use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use futures_util::StreamExt;
use matrix_sdk::{Error, assert_let_timeout, test_utils::mocks::MatrixMockServer};
use matrix_sdk_base::store::QueueWedgeError;
use matrix_sdk_test::{ALICE, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use matrix_sdk_ui::timeline::{EventItemOrigin, EventSendState, RoomExt};
use ruma::{
MilliSecondsSinceUnixEpoch, event_id, events::room::message::RoomMessageEventContent, room_id,
};
use serde_json::json;
use stream_assert::{assert_next_matches, assert_pending};
use tokio::{task::yield_now, time::sleep};
/// Two messages sent back-to-back must appear in the timeline, and later be
/// replaced by their remote echoes, in the order in which they were sent.
///
/// The mock for the first message is mounted with a longer delay (200 ms) than
/// the mock for the second one (100 ms); the assertions nevertheless expect the
/// update for index 0 to be observed before the update for index 1.
#[async_test]
async fn test_message_order() {
    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    server.mock_room_state_encryption().plain().mount().await;

    let room = server.sync_joined_room(&client, room_id!("!a98sd12bjh:example.org")).await;
    let timeline = room.timeline().await.unwrap();
    let timeline = Arc::new(timeline);

    // Only event items are of interest; everything else is filtered out of the stream.
    let (_, mut echoes) = timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;

    // Mock responses: "First!" is acknowledged as `$ev0`, "Second." as `$ev1`.
    let first_delay = Duration::from_millis(200);
    let second_delay = Duration::from_millis(100);
    server
        .mock_room_send()
        .body_matches_partial_json(json!({ "body": "First!" }))
        .ok_with_delay(event_id!("$ev0"), first_delay)
        .mount()
        .await;
    server
        .mock_room_send()
        .body_matches_partial_json(json!({ "body": "Second." }))
        .ok_with_delay(event_id!("$ev1"), second_delay)
        .mount()
        .await;

    let first_content = RoomMessageEventContent::text_plain("First!");
    let second_content = RoomMessageEventContent::text_plain("Second.");
    timeline.send(first_content.into()).await.unwrap();
    timeline.send(second_content.into()).await.unwrap();

    // Give other tasks a chance to run before inspecting the stream.
    yield_now().await;

    // Both local echoes are appended, in send order.
    assert_next_matches!(echoes, VectorDiff::PushBack { value: echo } => {
        assert!(echo.is_editable(), "local echo of first can be edited");
        assert_eq!(echo.content().as_message().unwrap().body(), "First!");
    });
    assert_next_matches!(echoes, VectorDiff::PushBack { value: echo } => {
        assert!(echo.is_editable(), "local echo of second can be edited");
        assert_eq!(echo.content().as_message().unwrap().body(), "Second.");
    });

    // 500 ms is longer than both mocked delays added together.
    let settle_time = Duration::from_millis(500);
    sleep(settle_time).await;

    // The items are updated in place, index 0 first, and now carry the event
    // IDs returned by the mocks.
    assert_next_matches!(echoes, VectorDiff::Set { index: 0, value: echo } => {
        assert!(echo.is_editable(), "remote echo of first can be edited");
        assert_eq!(echo.content().as_message().unwrap().body(), "First!");
        assert_eq!(echo.event_id().unwrap(), "$ev0");
    });
    assert_next_matches!(echoes, VectorDiff::Set { index: 1, value: echo } => {
        assert!(echo.is_editable(), "remote echo of second can be edited");
        assert_eq!(echo.content().as_message().unwrap().body(), "Second.");
        assert_eq!(echo.event_id().unwrap(), "$ev1");
    });

    // Nothing else must be queued on the stream.
    assert_pending!(echoes);
}
/// Messages whose first send attempt failed must still be marked as sent in
/// their original order once sending is retried.
///
/// The retry mocks answer "First!" after 100 ms and "Second." after 200 ms; the
/// test expects the update for index 0 before the update for index 1.
#[async_test]
async fn test_retry_order() {
    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    server.mock_room_state_encryption().plain().mount().await;

    let room = server.sync_joined_room(&client, room_id!("!a98sd12bjh:example.org")).await;
    let timeline = room.timeline().await.unwrap();
    let timeline = Arc::new(timeline);
    let (_, mut echoes) = timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;

    // While this scoped mock is alive, send requests get a 500 response; it is
    // set up to expect 3 requests.
    let failing_send = server.mock_room_send().error500().expect(3).mount_as_scoped().await;

    let first_content = RoomMessageEventContent::text_plain("First!");
    let second_content = RoomMessageEventContent::text_plain("Second.");
    timeline.send(first_content.into()).await.unwrap();
    timeline.send(second_content.into()).await.unwrap();

    // Give other tasks a chance to run before inspecting the stream.
    yield_now().await;

    // Both local echoes are appended, in send order.
    assert_next_matches!(echoes, VectorDiff::PushBack { value: echo } => {
        assert_eq!(echo.content().as_message().unwrap().body(), "First!");
    });
    assert_next_matches!(echoes, VectorDiff::PushBack { value: echo } => {
        assert_eq!(echo.content().as_message().unwrap().body(), "Second.");
    });

    // The first item eventually flips to the failed state.
    assert_let_timeout!(
        Duration::from_secs(5),
        Some(VectorDiff::Set { index: 0, value: failed_echo }) = echoes.next()
    );
    assert_matches!(failed_echo.send_state().unwrap(), EventSendState::SendingFailed { .. });

    // Swap the failing mock for successful ones before retrying.
    drop(failing_send);
    let first_delay = Duration::from_millis(100);
    let second_delay = Duration::from_millis(200);
    server
        .mock_room_send()
        .body_matches_partial_json(json!({ "body": "First!" }))
        .ok_with_delay(event_id!("$ev0"), first_delay)
        .mount()
        .await;
    server
        .mock_room_send()
        .body_matches_partial_json(json!({ "body": "Second." }))
        .ok_with_delay(event_id!("$ev1"), second_delay)
        .mount()
        .await;

    // NOTE(review): presumably the send queue disabled itself after the failure
    // above, which is why it is re-enabled here — confirm against the send queue docs.
    client.send_queue().set_enabled(true).await;

    // 600 ms is longer than both mocked delays added together.
    let settle_time = Duration::from_millis(600);
    sleep(settle_time).await;

    // Both items are now reported as sent, index 0 first.
    assert_next_matches!(echoes, VectorDiff::Set { index: 0, value: echo } => {
        assert_eq!(echo.content().as_message().unwrap().body(), "First!");
        assert_matches!(echo.send_state().unwrap(), EventSendState::Sent { .. });
        assert_eq!(echo.event_id().unwrap(), "$ev0");
    });
    assert_next_matches!(echoes, VectorDiff::Set { index: 1, value: echo } => {
        assert_eq!(echo.content().as_message().unwrap().body(), "Second.");
        assert_matches!(echo.send_state().unwrap(), EventSendState::Sent { .. });
        assert_eq!(echo.event_id().unwrap(), "$ev1");
    });

    // Nothing else must be queued on the stream.
    assert_pending!(echoes);
}
#[async_test]
async fn test_reloaded_failed_local_echoes_are_marked_as_failed() {
let room_id = room_id!("!a98sd12bjh:example.org");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
server.mock_room_state_encryption().plain().mount().await;
let room = server.sync_joined_room(&client, room_id).await;
let timeline = Arc::new(room.timeline().await.unwrap());
let (_, mut timeline_stream) =
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
server.mock_room_send().error_too_large().expect(1).mount().await;
timeline.send(RoomMessageEventContent::text_plain("wall of text").into()).await.unwrap();
yield_now().await;
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_eq!(value.content().as_message().unwrap().body(), "wall of text");
});
assert_let_timeout!(Some(VectorDiff::Set { index: 0, value: first }) = timeline_stream.next());
let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable } => (error, is_recoverable));
assert!(!is_recoverable);
assert_matches!(
error.as_client_api_error().unwrap().error_kind(),
Some(ruma::api::error::ErrorKind::TooLarge)
);
assert_pending!(timeline_stream);
let timeline = Arc::new(room.timeline().await.unwrap());
let (initial, _) = timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
assert_eq!(initial.len(), 1);
assert_eq!(initial[0].content().as_message().unwrap().body(), "wall of text");
assert_let!(
Some(EventSendState::SendingFailed { error, is_recoverable }) = initial[0].send_state()
);
assert!(!is_recoverable);
let msg = assert_matches!(
&**error,
Error::SendQueueWedgeError(error) => {
assert_matches!(&**error, QueueWedgeError::GenericApiError { msg } => { msg })
}
);
assert_eq!(msg, "the server returned an error: [413 / M_TOO_LARGE] Request body too large");
}
#[async_test]
async fn test_clear_with_echoes() {
let room_id = room_id!("!a98sd12bjh:example.org");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let f = EventFactory::new();
server.mock_room_state_encryption().plain().mount().await;
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room.timeline().await.unwrap();
{
let (_, mut timeline_stream) = timeline.subscribe().await;
timeline.send(RoomMessageEventContent::text_plain("Send failure").into()).await.unwrap();
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 1);
}
server
.mock_room_send()
.ok_with_delay(event_id!("$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP"), Duration::from_secs(3600))
.mount()
.await;
timeline.send(RoomMessageEventContent::text_plain("Pending").into()).await.unwrap();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("another message").sender(&ALICE)),
)
.await;
let timeline_items = timeline.items().await;
let event_items: Vec<_> = timeline_items.iter().filter_map(|item| item.as_event()).collect();
assert_eq!(event_items.len(), 3);
assert_matches!(event_items[0].origin(), Some(EventItemOrigin::Sync));
assert_matches!(event_items[1].send_state(), Some(EventSendState::SendingFailed { .. }));
assert_matches!(
event_items[2].send_state(),
Some(EventSendState::NotSentYet { progress: None })
);
timeline.clear().await;
let timeline_items = timeline.items().await;
let event_items: Vec<_> = timeline_items.iter().filter_map(|item| item.as_event()).collect();
assert_eq!(event_items.len(), 2);
assert_matches!(event_items[0].send_state(), Some(EventSendState::SendingFailed { .. }));
assert_matches!(
event_items[1].send_state(),
Some(EventSendState::NotSentYet { progress: None })
);
}
#[async_test]
async fn test_no_duplicate_date_divider() {
let room_id = room_id!("!a98sd12bjh:example.org");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
server.mock_room_state_encryption().plain().mount().await;
let room = server.sync_joined_room(&client, room_id).await;
let timeline = Arc::new(room.timeline().await.unwrap());
let (_, mut timeline_stream) = timeline.subscribe().await;
server
.mock_room_send()
.body_matches_partial_json(json!({ "body": "First!" }))
.ok_with_delay(event_id!("$ev0"), Duration::from_millis(200))
.mount()
.await;
server
.mock_room_send()
.body_matches_partial_json(json!({ "body": "Second." }))
.ok_with_delay(event_id!("$ev1"), Duration::from_millis(100))
.mount()
.await;
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap();
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap();
yield_now().await;
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 3);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
assert_eq!(value.as_event().unwrap().content().as_message().unwrap().body(), "First!");
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
assert!(value.is_date_divider());
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[2]);
assert_eq!(value.as_event().unwrap().content().as_message().unwrap().body(), "Second.");
sleep(Duration::from_millis(500)).await;
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let value = value.as_event().unwrap();
assert_matches!(value.send_state(), Some(EventSendState::Sent { event_id }) => {
assert_eq!(event_id, "$ev0");
});
assert_eq!(value.content().as_message().unwrap().body(), "First!");
assert_eq!(value.event_id().unwrap(), "$ev0");
assert_let!(VectorDiff::Set { index: 2, value: remote_event } = &timeline_updates[1]);
assert_eq!(remote_event.as_event().unwrap().event_id().unwrap(), "$ev1");
assert_pending!(timeline_stream);
let f = EventFactory::new();
let now = MilliSecondsSinceUnixEpoch::now();
f.set_next_ts(now.0.into());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("First!")
.sender(client.user_id().unwrap())
.event_id(event_id!("$ev2")),
)
.add_timeline_event(
f.text_msg("Second.")
.sender(client.user_id().unwrap())
.event_id(event_id!("$ev3")),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 4);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[0]);
let value = value.as_event().unwrap();
assert_eq!(value.event_id().unwrap(), "$ev2");
assert_let!(VectorDiff::Insert { index: 1, value } = &timeline_updates[1]);
let value = value.as_event().unwrap();
assert_eq!(value.event_id().unwrap(), "$ev3");
assert_pending!(timeline_stream);
}