use std::{ops::Not, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use futures_util::FutureExt;
use imbl::Vector;
use matrix_sdk::{
assert_let_timeout, assert_next_matches_with_timeout,
deserialized_responses::TimelineEvent,
event_cache::{
BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate, RoomPaginationStatus,
},
linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
store::StoreConfig,
test_utils::{
assert_event_matches_msg,
mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
},
};
use matrix_sdk_base::event_cache::{
Gap,
store::{EventCacheStore, MemoryStore},
};
use matrix_sdk_test::{ALICE, BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use ruma::{
EventId, event_id,
events::{
AnySyncMessageLikeEvent, AnySyncTimelineEvent, TimelineEventType,
room::message::RoomMessageEventContentWithoutRelation,
},
room_id,
room_version_rules::RedactionRules,
user_id,
};
use tokio::{spawn, sync::broadcast, time::sleep};
mod threads;
macro_rules! assert_event_id {
($timeline_event:expr, $event_id:literal) => {
assert_eq!($timeline_event.event_id().unwrap().as_str(), $event_id);
};
}
#[async_test]
async fn test_must_explicitly_subscribe() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_id = room_id!("!omelette:fromage.fr");
let room = server.sync_joined_room(&client, room_id).await;
let result = room.event_cache().await;
assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
}
#[async_test]
async fn test_event_cache_receives_events() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
assert!(events.is_empty());
assert!(subscriber.is_empty());
let f = EventFactory::new().sender(user_id!("@dexter:lab.org"));
server
.mock_sync()
.ok_and_run(&client, |sync_builder| {
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id).add_timeline_event(f.text_msg("bonjour monde")),
);
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id!("!parallel:universe.uk"))
.add_timeline_event(f.text_msg("hi i'm learning French")),
);
})
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values: events } = &diffs[0]);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "bonjour monde");
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_ignored_unignored() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let other_room_id = room_id!("!galette:saucisse.bzh");
let dexter = user_id!("@dexter:lab.org");
let ivan = user_id!("@ivan:lab.ch");
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_bulk(vec![
f.text_msg("hey there").sender(dexter).into_raw_sync(),
f.text_msg("hoy!").sender(ivan).into_raw_sync(),
]),
)
.await;
server
.sync_room(
&client,
JoinedRoomBuilder::new(other_room_id)
.add_timeline_bulk(vec![f.text_msg("demat!").sender(ivan).into_raw_sync()]),
)
.await;
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
assert_eq!(events.len(), 2);
assert_event_matches_msg(&events[0], "hey there");
assert_event_matches_msg(&events[1], "hoy!");
server
.mock_sync()
.ok_and_run(&client, |sync_builder| {
sync_builder.add_global_account_data(f.ignored_user_list([dexter.to_owned()]));
})
.await;
server
.mock_sync()
.ok_and_run(&client, |sync_builder| {
sync_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("i don't like this dexter").sender(ivan)),
);
})
.await;
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Clear = &diffs[0]);
}
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values: events } = &diffs[0]);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "i don't like this dexter");
}
{
let room = client.get_room(other_room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let events = room_event_cache.events().await.unwrap();
assert!(events.is_empty());
}
assert!(room_stream.is_empty());
}
async fn wait_for_initial_events(
events: Vec<TimelineEvent>,
room_stream: &mut broadcast::Receiver<RoomEventCacheUpdate>,
) {
if events.is_empty() {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = room_stream.recv()
);
while let Some(Ok(_)) = room_stream.recv().now_or_never() {}
} else {
assert_eq!(events.len(), 1);
}
}
#[async_test]
async fn test_backpaginate_once() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("heyo").event_id(event_id!("$1")))
.set_timeline_prev_batch("prev_batch".to_owned())
.set_timeline_limited(),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
wait_for_initial_events(events, &mut room_stream).await;
let outcome = {
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default().events(vec![
f.text_msg("world").event_id(event_id!("$2")),
f.text_msg("hello").event_id(event_id!("$3")),
]))
.mock_once()
.mount()
.await;
room_event_cache.pagination().run_backwards_once(20).await.unwrap()
};
let BackPaginationOutcome { events, reached_start } = outcome;
assert!(reached_start);
assert_eq!(events.len(), 2);
assert_event_matches_msg(&events[0], "world");
assert_event_matches_msg(&events[1], "hello");
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 0);
assert_event_matches_msg(event, "hello");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 1);
assert_event_matches_msg(event, "world");
});
assert!(room_stream.is_empty());
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert!(outcome.events.is_empty());
assert!(outcome.reached_start);
}
#[async_test]
async fn test_backpaginate_many_times_with_many_iterations() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("heyo"))
.set_timeline_prev_batch("prev_batch".to_owned())
.set_timeline_limited(),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
wait_for_initial_events(events, &mut room_stream).await;
let mut num_iterations = 0;
let mut global_events = Vec::new();
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default().end_token("prev_batch2").events(vec![
f.text_msg("world").event_id(event_id!("$2")),
f.text_msg("hello").event_id(event_id!("$3")),
]))
.mock_once()
.mount()
.await;
server
.mock_room_messages()
.match_from("prev_batch2")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("oh well").event_id(event_id!("$4"))]))
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
loop {
let outcome = pagination.run_backwards_once(20).await.unwrap();
global_events.extend(outcome.events);
num_iterations += 1;
if outcome.reached_start {
break;
}
}
assert_eq!(num_iterations, 2);
assert_event_matches_msg(&global_events[0], "world");
assert_event_matches_msg(&global_events[1], "hello");
assert_event_matches_msg(&global_events[2], "oh well");
assert_eq!(global_events.len(), 3);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 0);
assert_event_matches_msg(event, "hello");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 1);
assert_event_matches_msg(event, "world");
});
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 0);
assert_event_matches_msg(event, "oh well");
});
assert!(room_stream.is_empty());
let (events, room_stream) = room_event_cache.subscribe().await.unwrap();
assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
assert_event_matches_msg(&events[2], "world");
assert_event_matches_msg(&events[3], "heyo");
assert_eq!(events.len(), 4);
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_backpaginate_many_times_with_one_iteration() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("heyo"))
.set_timeline_prev_batch("prev_batch".to_owned())
.set_timeline_limited(),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
wait_for_initial_events(events, &mut room_stream).await;
let mut num_iterations = 0;
let mut global_events = Vec::new();
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default().end_token("prev_batch2").events(vec![
f.text_msg("world").event_id(event_id!("$2")),
f.text_msg("hello").event_id(event_id!("$3")),
]))
.mock_once()
.mount()
.await;
server
.mock_room_messages()
.match_from("prev_batch2")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("oh well").event_id(event_id!("$4"))]))
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
loop {
let outcome = pagination.run_backwards_until(20).await.unwrap();
global_events.extend(outcome.events);
num_iterations += 1;
if outcome.reached_start {
break;
}
}
assert_eq!(num_iterations, 1);
assert_event_matches_msg(&global_events[0], "world");
assert_event_matches_msg(&global_events[1], "hello");
assert_event_matches_msg(&global_events[2], "oh well");
assert_eq!(global_events.len(), 3);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 0);
assert_event_matches_msg(event, "hello");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 1);
assert_event_matches_msg(event, "world");
});
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 0);
assert_event_matches_msg(event, "oh well");
});
let (events, room_stream) = room_event_cache.subscribe().await.unwrap();
assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
assert_event_matches_msg(&events[2], "world");
assert_event_matches_msg(&events[3], "heyo");
assert_eq!(events.len(), 4);
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_reset_while_backpaginating() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("heyo").into_raw_sync())
.set_timeline_prev_batch("first_backpagination".to_owned())
.set_timeline_limited(),
)
.await;
let (room_event_cache, _drop_handles) =
client.get_room(room_id).unwrap().event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
wait_for_initial_events(events, &mut room_stream).await;
server
.mock_room_messages()
.match_from("first_backpagination")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("lalala").into_raw_timeline()])
.with_delay(Duration::from_millis(500)))
.mock_once()
.mount()
.await;
server
.mock_room_messages()
.match_from("second_backpagination")
.ok(RoomMessagesResponseTemplate::default()
.end_token("third_backpagination")
.events(vec![f.text_msg("finally!").into_raw_timeline()]))
.mock_once()
.mount()
.await;
let backpagination = spawn({
let pagination = room_event_cache.pagination();
async move { pagination.run_backwards_once(20).await }
});
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("heyo").into_raw_sync())
.set_timeline_prev_batch("second_backpagination".to_owned())
.set_timeline_limited(),
)
.await;
let outcome = backpagination.await.expect("join failed").unwrap();
let BackPaginationOutcome { events, .. } = outcome;
assert!(!events.is_empty());
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(&diffs[1], VectorDiff::Append { values } => {
assert_eq!(values.len(), 1);
assert_event_matches_msg(&values[0], "heyo");
});
}
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => {
assert_eq!(*index, 0);
assert_event_matches_msg(event, "finally!");
});
}
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_backpaginating_without_token() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
assert!(events.is_empty());
assert!(room_stream.is_empty());
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("hi").event_id(event_id!("$2")).into_raw_timeline()]))
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
let BackPaginationOutcome { events, reached_start } =
pagination.run_backwards_once(20).await.unwrap();
assert!(reached_start);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hi");
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hi");
});
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_limited_timeline_resets_pagination() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap();
assert!(events.is_empty());
assert!(room_stream.is_empty());
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("hi").event_id(event_id!("$2")).into_raw_timeline()]))
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
let mut pagination_status = pagination.status();
assert_eq!(pagination_status.get(), RoomPaginationStatus::Idle { hit_timeline_start: false });
let BackPaginationOutcome { events, reached_start } =
pagination.run_backwards_once(20).await.unwrap();
assert_eq!(events.len(), 1);
assert!(reached_start);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hi");
});
assert_next_matches_with_timeout!(
pagination_status,
RoomPaginationStatus::Idle { hit_timeline_start: true }
);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_next_matches_with_timeout!(
pagination_status,
RoomPaginationStatus::Idle { hit_timeline_start: false }
);
assert!(room_stream.is_empty());
}
#[async_test]
async fn test_limited_timeline_with_storage() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(f.text_msg("hey yo")),
)
.await;
let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
if initial_events.is_empty() {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values: events } = &diffs[0]);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hey yo");
} else {
assert_eq!(initial_events.len(), 1);
assert_event_matches_msg(&initial_events[0], "hey yo");
}
assert!(subscriber.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("gappy!"))
.set_timeline_limited(),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values: events } = &diffs[0]);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "gappy!");
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_backpaginate_with_no_initial_events() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let wait_time = Duration::from_millis(500);
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default()
.end_token("prev_batch")
.events(vec![
f.text_msg("world").event_id(event_id!("$3")).into_raw_timeline(),
f.text_msg("hello").event_id(event_id!("$2")).into_raw_timeline(),
])
.with_delay(2 * wait_time))
.mock_once()
.mount()
.await;
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("oh well").event_id(event_id!("$1"))]))
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
let pagination_clone = pagination.clone();
let first_pagination = spawn(async move { pagination_clone.run_backwards_once(20).await });
sleep(Duration::from_millis(3000) + wait_time).await;
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("world").event_id(event_id!("$3"))),
)
.await;
first_pagination.await.expect("joining must work").expect("first backpagination must work");
pagination.run_backwards_once(20).await.unwrap();
let events = room_event_cache.events().await.unwrap();
assert_eq!(events.len(), 3, "{events:?}");
assert_event_matches_msg(&events[0], "oh well");
assert_event_matches_msg(&events[1], "hello");
assert_event_matches_msg(&events[2], "world");
}
#[async_test]
async fn test_backpaginate_replace_empty_gap() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("world").event_id(event_id!("$2")))
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
wait_for_initial_events(events, &mut stream).await;
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default().end_token("prev_batch"))
.mock_once()
.mount()
.await;
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("hello").event_id(event_id!("$1"))]))
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
pagination.run_backwards_once(20).await.unwrap();
pagination.run_backwards_once(20).await.unwrap();
let events = room_event_cache.events().await.unwrap();
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_eq!(events.len(), 2);
}
#[async_test]
async fn test_no_gap_stored_after_deduplicated_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let initial_events = vec![
f.text_msg("hello").event_id(event_id!("$1")).into_raw_sync(),
f.text_msg("world").event_id(event_id!("$2")).into_raw_sync(),
f.text_msg("sup").event_id(event_id!("$3")).into_raw_sync(),
];
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_bulk(initial_events.clone())
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
if events.is_empty() {
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv());
}
drop(events);
let pagination = room_event_cache.pagination();
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default())
.mock_once()
.mount()
.await;
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start);
assert!(outcome.events.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_bulk(initial_events)
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
assert!(stream.is_empty());
{
let events = room_event_cache.events().await.unwrap();
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
assert_eq!(events.len(), 3);
}
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.events.is_empty());
assert!(outcome.reached_start);
{
let events = room_event_cache.events().await.unwrap();
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
assert_eq!(events.len(), 3);
}
}
#[async_test]
async fn test_no_gap_stored_after_deduplicated_backpagination() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("sup").event_id(event_id!("$3")).into_raw_sync())
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
if events.is_empty() {
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv());
}
drop(events);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_bulk(vec![
f.text_msg("hello").event_id(event_id!("$1")).into_raw_sync(),
f.text_msg("world").event_id(event_id!("$2")).into_raw_sync(),
f.text_msg("sup").event_id(event_id!("$3")).into_raw_sync(),
])
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch2".to_owned()),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
);
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 3);
assert_eq!(events[0].event_id().unwrap().as_str(), "$1");
assert_eq!(events[1].event_id().unwrap().as_str(), "$2");
assert_eq!(events[2].event_id().unwrap().as_str(), "$3");
});
assert!(stream.is_empty());
server
.mock_room_messages()
.match_from("prev-batch2")
.ok(RoomMessagesResponseTemplate::default())
.mock_once()
.mount()
.await;
let pagination = room_event_cache.pagination();
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start.not());
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
server
.mock_room_messages()
.match_from("prev-batch")
.ok(RoomMessagesResponseTemplate::default().end_token("prev-batch3").events(vec![
f.text_msg("world").event_id(event_id!("$2")).into_raw_timeline(),
f.text_msg("hello").event_id(event_id!("$1")).into_raw_timeline(),
]))
.mock_once()
.mount()
.await;
let outcome = pagination.run_backwards_once(20).await.unwrap();
assert!(outcome.reached_start);
assert!(outcome.events.is_empty());
assert!(stream.is_empty());
let (events, stream) = room_event_cache.subscribe().await.unwrap();
assert_event_matches_msg(&events[0], "hello");
assert_event_matches_msg(&events[1], "world");
assert_event_matches_msg(&events[2], "sup");
assert_eq!(events.len(), 3);
assert!(stream.is_empty());
}
#[async_test]
async fn test_dont_delete_gap_that_wasnt_inserted() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("sup").event_id(event_id!("$3")).into_raw_sync())
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
if events.is_empty() {
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv());
}
drop(events);
server
.mock_room_messages()
.match_from("prev-batch")
.ok(RoomMessagesResponseTemplate::default())
.mock_once()
.mount()
.await;
room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert!(stream.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_bulk(vec![
f.text_msg("sup").event_id(event_id!("$3")).into_raw_sync(),
]),
)
.await;
assert!(stream.is_empty());
}
#[async_test]
async fn test_apply_redaction_when_redaction_comes_later() {
let server = MatrixMockServer::new().await;
let state_memory_store = matrix_sdk_base::store::MemoryStore::new();
let event_cache_store = Arc::new(MemoryStore::new());
let store_config = StoreConfig::new("hodlor".to_owned())
.state_store(state_memory_store)
.event_cache_store(event_cache_store);
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config.clone()))
.build()
.await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("inapprops").event_id(event_id!("$1")).into_raw_sync(),
),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
if events.is_empty() {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv()
);
}
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.redaction(event_id!("$1")).into_raw_sync()),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 2);
{
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
assert_eq!(new_events.len(), 1);
let ev = new_events[0].raw().deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) = ev
);
assert_eq!(ev.redacts(&RedactionRules::V1).unwrap(), event_id!("$1"));
}
{
assert_let!(VectorDiff::Set { index: 0, value: redacted_event } = &diffs[1]);
let ev = redacted_event.raw().deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev
);
assert_matches!(ev.as_original(), None);
}
assert!(subscriber.is_empty());
drop(client);
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config))
.build()
.await;
client.event_cache().subscribe().unwrap();
let room = client.get_room(room_id).unwrap();
let (cache, _drop_handles) = room.event_cache().await.unwrap();
let events = cache.events().await.unwrap();
assert_eq!(events.len(), 2);
let ev = events[0].raw().cast_ref_unchecked::<AnySyncMessageLikeEvent>().deserialize().unwrap();
assert!(ev.is_redacted());
assert_eq!(
events[1].raw().deserialize().unwrap().event_type(),
TimelineEventType::RoomRedaction
);
}
#[async_test]
async fn test_apply_redaction_on_an_in_store_event() {
let room_id = room_id!("!foo:bar.baz");
let event_factory = EventFactory::new().room(room_id).sender(&ALICE);
let mock_server = MatrixMockServer::new().await;
let client = mock_server.client_builder().build().await;
{
let event_cache_store = client.event_cache_store().lock().await.unwrap();
event_cache_store
.as_clean()
.unwrap()
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![
event_factory.text_msg("foo").event_id(event_id!("$ev0")).into_event(),
],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![
event_factory.text_msg("foo").event_id(event_id!("$ev1")).into_event(),
],
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap();
let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await.unwrap();
{
assert_eq!(initial_updates.len(), 1);
assert_event_id!(initial_updates[0], "$ev1");
assert!(updates_stream.is_empty());
}
mock_server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
event_factory
.redaction(event_id!("$ev0"))
.event_id(event_id!("$ev2"))
.into_raw_sync(),
),
)
.await;
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_event_id!(&events[0], "$ev2");
});
});
assert!(updates_stream.is_empty());
let outcome = room_event_cache.pagination().run_backwards_until(1).await.unwrap();
assert_eq!(outcome.events.len(), 1);
assert_event_id!(outcome.events[0], "$ev0");
assert_matches!(
outcome.events[0].raw().deserialize().unwrap(),
AnySyncTimelineEvent::MessageLike(event) => {
assert!(event.is_redacted());
}
);
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_event_id!(event, "$ev0");
assert_matches!(
event.raw().deserialize().unwrap(),
AnySyncTimelineEvent::MessageLike(event) => {
assert!(event.is_redacted());
}
);
});
});
assert!(updates_stream.is_empty());
}
#[async_test]
async fn test_apply_redaction_when_redacted_and_redaction_are_in_same_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("bleh").event_id(event_id!("$2")).into_raw_sync())
.add_timeline_event(f.redaction(event_id!("$2")).into_raw_sync()),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 2);
{
assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]);
assert_eq!(new_events.len(), 2);
let ev = new_events[0].raw().deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev
);
assert_eq!(ev.as_original().unwrap().content.body(), "bleh");
let ev = new_events[1].raw().deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) = ev
);
assert_eq!(ev.redacts(&RedactionRules::V1).unwrap(), event_id!("$2"));
}
{
assert_let!(VectorDiff::Set { index: 0, value: redacted_event } = &diffs[1]);
let ev = redacted_event.raw().deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev
);
assert_matches!(ev.as_original(), None);
}
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_lazy_loading() {
let room_id = room_id!("!foo:bar.baz");
let event_factory = EventFactory::new().room(room_id).sender(&ALICE);
let mock_server = MatrixMockServer::new().await;
let client = mock_server.client_builder().build().await;
{
let event_cache_store = client.event_cache_store().lock().await.unwrap();
event_cache_store
.as_clean()
.unwrap()
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: (0..6)
.map(|nth| {
event_factory
.text_msg("foo")
.event_id(&EventId::parse(format!("$ev0_{nth}")).unwrap())
.into_event()
})
.collect::<Vec<_>>(),
},
Update::NewGapChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
gap: Gap { prev_token: "raclette".to_owned() },
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(1)),
new: ChunkIdentifier::new(2),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(2), 0),
items: (0..5)
.map(|nth| {
event_factory
.text_msg("foo")
.event_id(&EventId::parse(format!("$ev2_{nth}")).unwrap())
.into_event()
})
.collect::<Vec<_>>(),
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(2)),
new: ChunkIdentifier::new(3),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(3), 0),
items: (0..7)
.map(|nth| {
event_factory
.text_msg("foo")
.event_id(&EventId::parse(format!("$ev3_{nth}")).unwrap())
.into_event()
})
.collect::<Vec<_>>(),
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap();
let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await.unwrap();
{
assert_eq!(initial_updates.len(), 7);
assert_event_id!(initial_updates[0], "$ev3_0");
assert_event_id!(initial_updates[1], "$ev3_1");
assert_event_id!(initial_updates[2], "$ev3_2");
assert_event_id!(initial_updates[3], "$ev3_3");
assert_event_id!(initial_updates[4], "$ev3_4");
assert_event_id!(initial_updates[5], "$ev3_5");
assert_event_id!(initial_updates[6], "$ev3_6");
assert!(updates_stream.is_empty());
}
{
let outcome = room_event_cache.pagination().run_backwards_until(1).await.unwrap();
assert_eq!(outcome.events.len(), 5);
assert_event_id!(outcome.events[0], "$ev2_4");
assert_event_id!(outcome.events[1], "$ev2_3");
assert_event_id!(outcome.events[2], "$ev2_2");
assert_event_id!(outcome.events[3], "$ev2_1");
assert_event_id!(outcome.events[4], "$ev2_0");
assert!(outcome.reached_start.not());
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 5);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_event_id!(event, "$ev2_0");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index: 1, value: event } => {
assert_event_id!(event, "$ev2_1");
});
assert_matches!(&diffs[2], VectorDiff::Insert { index: 2, value: event } => {
assert_event_id!(event, "$ev2_2");
});
assert_matches!(&diffs[3], VectorDiff::Insert { index: 3, value: event } => {
assert_event_id!(event, "$ev2_3");
});
assert_matches!(&diffs[4], VectorDiff::Insert { index: 4, value: event } => {
assert_event_id!(event, "$ev2_4");
});
});
assert!(updates_stream.is_empty());
}
{
let _network_pagination = mock_server
.mock_room_messages()
.match_from("raclette")
.ok(RoomMessagesResponseTemplate::default().end_token("numerobis").events(
(1..5) .rev()
.map(|nth| {
event_factory
.text_msg("foo")
.event_id(&EventId::parse(format!("$ev1_{nth}")).unwrap())
})
.collect::<Vec<_>>(),
))
.mock_once()
.mount_as_scoped()
.await;
let outcome = room_event_cache.pagination().run_backwards_until(1).await.unwrap();
assert_eq!(outcome.events.len(), 4);
assert_event_id!(outcome.events[0], "$ev1_4");
assert_event_id!(outcome.events[1], "$ev1_3");
assert_event_id!(outcome.events[2], "$ev1_2");
assert_event_id!(outcome.events[3], "$ev1_1");
assert!(outcome.reached_start.not());
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 4);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_event_id!(event, "$ev1_1");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index: 1, value: event } => {
assert_event_id!(event, "$ev1_2");
});
assert_matches!(&diffs[2], VectorDiff::Insert { index: 2, value: event } => {
assert_event_id!(event, "$ev1_3");
});
assert_matches!(&diffs[3], VectorDiff::Insert { index: 3, value: event } => {
assert_event_id!(event, "$ev1_4");
});
});
assert!(updates_stream.is_empty());
}
{
let _network_pagination = mock_server
.mock_room_messages()
.match_from("numerobis")
.ok(RoomMessagesResponseTemplate::default().end_token("trois").events(vec![
event_factory.text_msg("foo").event_id(event_id!("$ev1_0")),
event_factory.text_msg("foo").event_id(event_id!("$ev0_5")),
]))
.mock_once()
.mount_as_scoped()
.await;
let outcome = room_event_cache.pagination().run_backwards_until(1).await.unwrap();
assert_eq!(outcome.events.len(), 2);
assert_event_id!(outcome.events[0], "$ev1_0");
assert_event_id!(outcome.events[1], "$ev0_5");
assert!(outcome.reached_start.not());
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_event_id!(event, "$ev0_5");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index: 1, value: event } => {
assert_event_id!(event, "$ev1_0");
});
});
assert!(updates_stream.is_empty());
}
{
let _network_pagination = mock_server
.mock_room_messages()
.match_from("trois")
.ok(RoomMessagesResponseTemplate::default().end_token("quattuor").events(
(4..6)
.rev()
.map(|nth| {
event_factory
.text_msg("foo")
.event_id(&EventId::parse(format!("$ev0_{nth}")).unwrap())
})
.collect::<Vec<_>>(),
))
.mock_once() .mount_as_scoped()
.await;
let outcome = room_event_cache.pagination().run_backwards_until(1).await.unwrap();
assert_eq!(outcome.events.len(), 5);
assert_event_id!(outcome.events[0], "$ev0_4");
assert_event_id!(outcome.events[1], "$ev0_3");
assert_event_id!(outcome.events[2], "$ev0_2");
assert_event_id!(outcome.events[3], "$ev0_1");
assert_event_id!(outcome.events[4], "$ev0_0");
assert!(outcome.reached_start);
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 5);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_event_id!(event, "$ev0_0");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index: 1, value: event } => {
assert_event_id!(event, "$ev0_1");
});
assert_matches!(&diffs[2], VectorDiff::Insert { index: 2, value: event } => {
assert_event_id!(event, "$ev0_2");
});
assert_matches!(&diffs[3], VectorDiff::Insert { index: 3, value: event } => {
assert_event_id!(event, "$ev0_3");
});
assert_matches!(&diffs[4], VectorDiff::Insert { index: 4, value: event } => {
assert_event_id!(event, "$ev0_4");
});
});
}
assert!(updates_stream.is_empty());
}
#[async_test]
async fn test_deduplication() {
let room_id = room_id!("!foo:bar.baz");
let event_factory = EventFactory::new().room(room_id).sender(&ALICE);
let mock_server = MatrixMockServer::new().await;
let client = mock_server.client_builder().build().await;
{
let event_cache_store = client.event_cache_store().lock().await.unwrap();
event_cache_store
.as_clean()
.unwrap()
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: (0..4)
.map(|nth| {
event_factory
.text_msg("foo")
.event_id(&EventId::parse(format!("$ev0_{nth}")).unwrap())
.into_event()
})
.collect::<Vec<_>>(),
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: (0..3)
.map(|nth| {
event_factory
.text_msg("foo")
.event_id(&EventId::parse(format!("$ev1_{nth}")).unwrap())
.into_event()
})
.collect::<Vec<_>>(),
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room = mock_server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap();
let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await.unwrap();
{
assert_eq!(initial_updates.len(), 3);
assert_event_id!(initial_updates[0], "$ev1_0");
assert_event_id!(initial_updates[1], "$ev1_1");
assert_event_id!(initial_updates[2], "$ev1_2");
assert!(updates_stream.is_empty());
}
mock_server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev1_0")))
.add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev1_2")))
.add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev0_1")))
.add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev0_2")))
.add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev3_0")))
.add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev3_1"))),
)
.await;
{
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 3);
assert_matches!(&diffs[0], VectorDiff::Remove { index } => {
assert_eq!(*index, 2);
});
assert_matches!(&diffs[1], VectorDiff::Remove { index } => {
assert_eq!(*index, 0);
});
assert_matches!(&diffs[2], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 6);
assert_event_id!(&events[0], "$ev1_0");
assert_event_id!(&events[1], "$ev1_2");
assert_event_id!(&events[2], "$ev0_1");
assert_event_id!(&events[3], "$ev0_2");
assert_event_id!(&events[4], "$ev3_0");
assert_event_id!(&events[5], "$ev3_1");
});
});
}
{
let outcome = room_event_cache.pagination().run_backwards_until(1).await.unwrap();
assert_eq!(outcome.events.len(), 2);
assert_event_id!(outcome.events[0], "$ev0_3");
assert_event_id!(outcome.events[1], "$ev0_0");
let update = updates_stream.recv().await.unwrap();
assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_event_id!(event, "$ev0_0");
});
assert_matches!(&diffs[1], VectorDiff::Insert { index: 1, value: event } => {
assert_event_id!(event, "$ev0_3");
});
});
}
}
#[async_test]
async fn test_timeline_then_empty_timeline_then_deduplication_with_storage() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server.sync_joined_room(&client, room_id).await;
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let previous_events = [
f.text_msg("previous1").event_id(event_id!("$prev1")).into_raw_timeline(),
f.text_msg("previous2").event_id(event_id!("$prev2")).into_raw_timeline(),
f.text_msg("previous3").event_id(event_id!("$prev3")).into_raw_timeline(),
];
let latest_events = [
f.text_msg("latest3").event_id(event_id!("$latest3")).into_raw_timeline(),
f.text_msg("latest2").event_id(event_id!("$latest2")).into_raw_timeline(),
f.text_msg("latest1").event_id(event_id!("$latest1")).into_raw_timeline(),
];
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
assert!(initial_events.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("token-before-latest")
.add_timeline_bulk(latest_events.clone().into_iter().map(ruma::serde::Raw::cast)),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 2);
assert_let!(VectorDiff::Clear = &diffs[0]);
assert_let!(VectorDiff::Append { values } = &diffs[1]);
assert_eq!(values.len(), 3);
assert_event_matches_msg(&values[0], "latest3");
assert_event_matches_msg(&values[1], "latest2");
assert_event_matches_msg(&values[2], "latest1");
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("token-after-latest"),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Clear = &diffs[0]);
assert!(subscriber.is_empty());
let all_events = previous_events.into_iter().chain(latest_events).rev().collect::<Vec<_>>();
server
.mock_room_messages()
.match_from("token-after-latest")
.ok(RoomMessagesResponseTemplate::default().end_token("messages-end-2").events(all_events))
.named("messages-since-after-latest")
.mount()
.await;
let outcome = room_event_cache.pagination().run_backwards_once(10).await.unwrap();
assert!(outcome.reached_start.not());
assert_eq!(outcome.events.len(), 6);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values } = &diffs[0]);
assert_eq!(values.len(), 6);
assert_event_matches_msg(&values[0], "previous1");
assert_event_matches_msg(&values[1], "previous2");
assert_event_matches_msg(&values[2], "previous3");
assert_event_matches_msg(&values[3], "latest3");
assert_event_matches_msg(&values[4], "latest2");
assert_event_matches_msg(&values[5], "latest1");
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_dont_remove_only_gap() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("brillat-savarin"),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
server
.mock_room_messages()
.match_from("brillat-savarin")
.ok(RoomMessagesResponseTemplate::default())
.named("room/messages")
.mount()
.await;
let outcome = room_event_cache.pagination().run_backwards_once(16).await.unwrap();
assert!(outcome.reached_start);
}
#[async_test]
async fn test_clear_all_rooms() {
let sleeping_room_id = room_id!("!dodo:saucisse.bzh");
let event_cache_store = Arc::new(MemoryStore::new());
let f = EventFactory::new().room(sleeping_room_id);
let ev0 = f.text_msg("hi").sender(*ALICE).event_id(event_id!("$ev0")).into_event();
{
let cid = ChunkIdentifier::new(0);
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(sleeping_room_id),
vec![
Update::NewItemsChunk { previous: None, new: cid, next: None },
Update::PushItems { at: Position::new(cid, 0), items: vec![ev0] },
],
)
.await
.unwrap();
}
let server = MatrixMockServer::new().await;
let client = server
.client_builder()
.on_builder(|builder| {
builder.store_config(
StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!galette:saucisse.bzh");
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("bonchourhan").sender(*BOB).event_id(event_id!("$ev1")),
),
)
.await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial, mut room_updates) = room_event_cache.subscribe().await.unwrap();
let mut initial = Vector::from(initial);
if initial.is_empty() {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_updates.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(diffs[0], VectorDiff::Append { .. });
diffs[0].clone().apply(&mut initial);
}
assert_eq!(initial.len(), 1);
assert_event_id!(initial[0], "$ev1");
client.event_cache().clear_all_rooms().await.unwrap();
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_updates.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Clear = &diffs[0]);
let (maybe_last_chunk, _chunk_id_gen) =
event_cache_store.load_last_chunk(LinkedChunkId::Room(sleeping_room_id)).await.unwrap();
assert!(maybe_last_chunk.is_none());
}
#[async_test]
async fn test_sync_while_back_paginate() {
let server = MatrixMockServer::new().await;
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let prev_events = vec![
f.text_msg("messages3").event_id(event_id!("$messages3")).into_raw_timeline(),
f.text_msg("messages2").event_id(event_id!("$messages2")).into_raw_timeline(),
f.text_msg("messages1").event_id(event_id!("$messages1")).into_raw_timeline(),
];
let sync_events = [
f.text_msg("sync1").event_id(event_id!("$sync1")).into_raw_timeline(),
f.text_msg("sync2").event_id(event_id!("$sync2")).into_raw_timeline(),
f.text_msg("sync3").event_id(event_id!("$sync3")).into_raw_timeline(),
];
let state_memory_store = matrix_sdk_base::store::MemoryStore::new();
let store_config = StoreConfig::new("le_store".to_owned())
.event_cache_store(Arc::new(MemoryStore::new()))
.state_store(state_memory_store);
{
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config.clone()))
.build()
.await;
server.sync_joined_room(&client, room_id).await;
}
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config))
.build()
.await;
let room = client.get_room(room_id).unwrap();
client.event_cache().subscribe().unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap();
assert!(initial_events.is_empty());
server
.mock_room_messages()
.match_from("token-before-sync-from-sync")
.ok(RoomMessagesResponseTemplate::default()
.end_token("token-before-messages")
.events(prev_events))
.named("messages")
.mount()
.await;
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default()
.end_token("token-before-sync-from-messages")
.events(sync_events.clone().into_iter().rev().collect()))
.named("messages")
.mount()
.await;
let pagination = room_event_cache.pagination();
let back_pagination_handle =
spawn(async move { pagination.run_backwards_once(3).await.unwrap() });
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_prev_batch("token-before-sync-from-sync")
.add_timeline_bulk(sync_events.into_iter().map(ruma::serde::Raw::cast)),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Append { values } = &diffs[0]);
assert_eq!(values.len(), 3);
assert_event_matches_msg(&values[0], "sync1");
assert_event_matches_msg(&values[1], "sync2");
assert_event_matches_msg(&values[2], "sync3");
let outcome = back_pagination_handle.await.unwrap();
assert!(outcome.reached_start.not());
assert_eq!(outcome.events.len(), 3);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv()
);
assert_eq!(diffs.len(), 3);
assert_let!(VectorDiff::Insert { index: 0, value: _ } = &diffs[0]);
assert_let!(VectorDiff::Insert { index: 1, value: _ } = &diffs[1]);
assert_let!(VectorDiff::Insert { index: 2, value: _ } = &diffs[2]);
assert!(subscriber.is_empty());
}
#[async_test]
async fn test_relations_ordering() {
let server = MatrixMockServer::new().await;
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(*ALICE);
let target_event_id = event_id!("$1");
let ev1 = f.text_msg("bonjour monde").event_id(target_event_id).into_event();
let event_cache_store = Arc::new(MemoryStore::new());
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![ev1.clone()],
},
],
)
.await
.unwrap();
let client = server
.client_builder()
.on_builder(|builder| {
builder.store_config(
StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (initial_events, mut listener) = room_event_cache.subscribe().await.unwrap();
assert_eq!(initial_events.len(), 1);
assert!(listener.recv().now_or_never().is_none());
let (_, relations) =
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap();
assert!(relations.is_empty());
let edit2 = event_id!("$edit2");
let ev2 = f
.text_msg("* hola mundo")
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hola mundo"))
.event_id(edit2)
.into_raw();
let edit3 = event_id!("$edit3");
let ev3 = f
.text_msg("* ciao mondo")
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("ciao mondo"))
.event_id(edit3);
let edit4 = event_id!("$edit4");
let ev4 = f
.text_msg("* hello world")
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hello world"))
.event_id(edit4);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(ev3)
.add_timeline_event(ev4)
.set_timeline_limited()
.set_timeline_prev_batch("prev_batch"),
)
.await;
loop {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = listener.recv()
);
if diffs.iter().any(|diff| matches!(diff, VectorDiff::Clear)) {
break;
}
}
let (_, relations) =
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap();
assert_eq!(relations.len(), 2);
assert_eq!(relations[0].event_id().unwrap(), edit3);
assert_eq!(relations[1].event_id().unwrap(), edit4);
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default().events(vec![ev2.clone()]))
.named("room/messages")
.mock_once()
.mount()
.await;
let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(outcome.reached_start.not());
assert_eq!(outcome.events.len(), 1);
{
let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(outcome.reached_start);
}
let (_, relations) =
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap();
assert_eq!(relations.len(), 3);
assert_eq!(relations[0].event_id().unwrap(), edit2);
assert_eq!(relations[1].event_id().unwrap(), edit3);
assert_eq!(relations[2].event_id().unwrap(), edit4);
let edit5 = event_id!("$edit5");
let ev5 = f
.text_msg("* hallo Welt")
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hallo Welt"))
.event_id(edit5)
.into_event();
server.mock_room_event().ok(ev5).mock_once().mount().await;
room.event(edit5, None).await.unwrap();
let (_, relations) =
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap();
assert_eq!(relations.len(), 4);
assert_eq!(relations[0].event_id().unwrap(), edit5);
assert_eq!(relations[1].event_id().unwrap(), edit2);
assert_eq!(relations[2].event_id().unwrap(), edit3);
assert_eq!(relations[3].event_id().unwrap(), edit4);
}