use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use futures_util::StreamExt;
use matrix_sdk::{assert_let_timeout, executor::spawn, test_utils::mocks::MatrixMockServer};
use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
use matrix_sdk_ui::timeline::{EventSendState, RoomExt};
use ruma::{
event_id,
events::room::message::{MessageType, RoomMessageEventContent},
room_id, user_id,
};
use stream_assert::{assert_next_matches, assert_pending};
use tokio::task::yield_now;
#[async_test]
async fn test_echo() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_id = room_id!("!a98sd12bjh:example.org");
let room = server.sync_joined_room(&client, room_id).await;
server.mock_room_state_encryption().plain().mount().await;
let timeline = Arc::new(
room.timeline_builder()
.with_internal_id_prefix("le_prefix".to_owned())
.build()
.await
.unwrap(),
);
let (_, mut timeline_stream) = timeline.subscribe().await;
let event_id = event_id!("$ev");
server.mock_room_send().ok(event_id).mock_once().mount().await;
let timeline = timeline.clone();
#[allow(unknown_lints, clippy::redundant_async_block)] let send_hdl = spawn(async move {
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await
});
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::PushBack { value: local_echo } = &timeline_updates[0]);
let item = local_echo.as_event().unwrap();
assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None }));
assert_let!(Some(msg) = item.content().as_message());
assert_let!(MessageType::Text(text) = msg.msgtype());
assert_eq!(text.body, "Hello, World!");
assert!(item.event_id().is_none());
let txn_id = item.transaction_id().unwrap();
assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]);
assert!(date_divider.is_date_divider());
send_hdl.await.unwrap().unwrap();
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 1, value: sent_confirmation } = &timeline_updates[0]);
let item = sent_confirmation.as_event().unwrap();
assert_matches!(item.send_state(), Some(EventSendState::Sent { .. }));
assert_eq!(item.event_id(), Some(event_id));
assert_pending!(timeline_stream);
let another_event_id = event_id!("$ev1");
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("Hello, World!")
.sender(user_id!("@example:localhost"))
.event_id(event_id)
.server_ts(152038280)
.unsigned_transaction_id(txn_id),
)
.add_timeline_event(
f.text_msg("Raclette")
.sender(user_id!("@example:localhost"))
.event_id(another_event_id)
.server_ts(152038281),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 5);
assert_matches!(&timeline_updates[0], VectorDiff::Remove { index: 1 });
assert_let!(VectorDiff::PushFront { value: first_event } = &timeline_updates[1]);
assert_eq!(first_event.as_event().unwrap().event_id(), Some(event_id));
assert_let!(VectorDiff::Insert { index: 1, value: second_event } = &timeline_updates[2]);
assert_eq!(second_event.as_event().unwrap().event_id(), Some(another_event_id));
assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[3]);
assert!(date_divider.is_date_divider());
assert_matches!(&timeline_updates[4], VectorDiff::Remove { index: 3 });
assert_pending!(timeline_stream);
}
/// Checks that a message whose sending fails with HTTP 500 is reported as a
/// recoverable failure, and that it ends up `Sent` once the endpoint succeeds
/// and the room's send queue has been re-enabled.
#[async_test]
async fn test_retry_failed() {
    let room_id = room_id!("!a98sd12bjh:example.org");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    let room = server.sync_joined_room(&client, room_id).await;

    client.send_queue().set_enabled(true).await;
    server.mock_room_state_encryption().plain().mount().await;

    let timeline = Arc::new(room.timeline().await.unwrap());
    // Only event items are kept in the observed stream; other timeline items
    // are filtered out by the closure.
    let (_, mut event_diffs) =
        timeline.subscribe_filter_map(|entry| entry.as_event().cloned()).await;

    // While this guard is alive the send endpoint answers with a 500 error;
    // the mock is set up to expect 3 requests.
    let failing_send_guard = server.mock_room_send().error500().expect(3).mount_as_scoped().await;

    let content = RoomMessageEventContent::text_plain("Hello, World!");
    timeline.send(content.into()).await.unwrap();

    // Give other tasks a chance to run before inspecting the stream.
    yield_now().await;

    // The local echo shows up first, not sent yet.
    assert_next_matches!(event_diffs, VectorDiff::PushBack { value: echo } => {
        assert_matches!(echo.send_state(), Some(EventSendState::NotSentYet { progress: None }));
    });

    // Within 5 seconds the echo is updated to a recoverable failure.
    assert_let_timeout!(
        Duration::from_secs(5),
        Some(VectorDiff::Set { index: 0, value: failed }) = event_diffs.next()
    );
    assert_matches!(
        failed.send_state(),
        Some(EventSendState::SendingFailed { is_recoverable: true, .. })
    );

    // The client-wide queue is still enabled, the room-level one is not.
    assert!(client.send_queue().is_enabled());
    assert!(!room.send_queue().is_enabled());

    // Swap the failing mock for a successful one, then re-enable the room
    // queue.
    drop(failing_send_guard);
    server.mock_room_send().ok(event_id!("$wWgymRfo7ri1uQx0NXO40vLJ")).mount().await;
    room.send_queue().set_enabled(true);

    tokio::time::sleep(Duration::from_millis(300)).await;

    // The same item is finally marked as sent.
    assert_let_timeout!(Some(VectorDiff::Set { index: 0, value: sent }) = event_diffs.next());
    assert_matches!(sent.send_state(), Some(EventSendState::Sent { .. }));
}
#[async_test]
async fn test_dedup_by_event_id_late() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_id = room_id!("!a98sd12bjh:example.org");
let room = server.sync_joined_room(&client, room_id).await;
server.mock_room_state_encryption().plain().mount().await;
let timeline = Arc::new(room.timeline().await.unwrap());
let (_, mut timeline_stream) = timeline.subscribe().await;
let event_id = event_id!("$wWgymRfo7ri1uQx0NXO40vLJ");
server
.mock_room_send()
.ok_with_delay(event_id, Duration::from_millis(500))
.mount()
.await;
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap();
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::PushBack { value: local_echo } = &timeline_updates[0]);
let item = local_echo.as_event().unwrap();
assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { progress: None }));
assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]);
assert!(date_divider.is_date_divider());
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("Hello, World!")
.sender(client.user_id().unwrap())
.event_id(event_id)
.server_ts(123456),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::PushFront { value: remote_echo } = &timeline_updates[0]);
let item = remote_echo.as_event().unwrap();
assert_eq!(item.event_id(), Some(event_id));
assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]);
assert!(date_divider.is_date_divider());
assert_let_timeout!(Duration::from_secs(2), Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 6);
assert_let!(VectorDiff::Remove { index: 3 } = &timeline_updates[0]);
assert_let!(VectorDiff::Remove { index: 2 } = &timeline_updates[1]);
assert_let!(VectorDiff::Remove { index: 1 } = &timeline_updates[2]);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
assert_eq!(value.as_event().unwrap().event_id(), Some(event_id));
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[4]);
assert!(value.is_date_divider());
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[4]);
assert!(value.is_date_divider());
assert_let!(VectorDiff::Remove { index: 2 } = &timeline_updates[5]);
assert_pending!(timeline_stream);
}
/// Checks that a message whose sending failed can be aborted through its send
/// handle, and that aborting removes the local echo from the timeline.
#[async_test]
async fn test_cancel_failed() {
    let room_id = room_id!("!a98sd12bjh:example.org");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    let room = server.sync_joined_room(&client, room_id).await;
    server.mock_room_state_encryption().plain().mount().await;

    let timeline = Arc::new(room.timeline().await.unwrap());
    // Only event items are kept in the observed stream.
    let (_, mut event_diffs) =
        timeline.subscribe_filter_map(|entry| entry.as_event().cloned()).await;

    // No send endpoint is mocked in this test, so the request cannot succeed.
    let content = RoomMessageEventContent::text_plain("Hello, World!");
    let handle = timeline.send(content.into()).await.unwrap();

    // Give other tasks a chance to run before inspecting the stream.
    yield_now().await;

    // The local echo is added first, not sent yet.
    assert_next_matches!(event_diffs, VectorDiff::PushBack { value: echo } => {
        assert_matches!(echo.send_state(), Some(EventSendState::NotSentYet { progress: None }));
    });

    // Then it is updated in place to a failed state.
    assert_let_timeout!(Some(VectorDiff::Set { index: 0, value: failed }) = event_diffs.next());
    assert_matches!(failed.send_state(), Some(EventSendState::SendingFailed { .. }));

    // Aborting must report `true`.
    assert!(handle.abort().await.unwrap());

    // The local echo is removed. The wait is bounded, like every other wait in
    // this file, so that a missing update fails the test instead of hanging it.
    assert_let_timeout!(
        Duration::from_secs(2),
        Some(VectorDiff::Remove { index: 0 }) = event_diffs.next()
    );
}