pub mod pagination;
mod state;
mod subscriber;
mod updates;
use std::{
collections::BTreeMap,
fmt,
sync::{Arc, atomic::Ordering},
};
use eyeball::SharedObservable;
use matrix_sdk_base::{
deserialized_responses::AmbiguityChange,
event_cache::Event,
sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId,
events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
serde::Raw,
};
pub(super) use state::{LockedRoomEventCacheState, RoomEventCacheStateLockWriteGuard};
pub use subscriber::RoomEventCacheSubscriber;
use tokio::sync::{Notify, broadcast::Receiver, mpsc};
use tracing::{instrument, trace, warn};
pub use updates::{
RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate,
RoomEventCacheUpdateSender,
};
use super::{
super::{AutoShrinkChannelPayload, EventCacheError, EventsOrigin, Result, RoomPagination},
TimelineVectorDiffs,
event_linked_chunk::sort_positions_descending,
thread::pagination::ThreadPagination,
};
use crate::{
client::WeakClient,
event_cache::{
EventFocusThreadMode,
caches::{event_focused::EventFocusedCache, pagination::SharedPaginationStatus},
},
room::WeakRoom,
};
/// A handle to the event cache of a single room.
///
/// The handle only holds a reference-counted pointer to the inner state, so
/// cloning it yields another handle onto the same underlying value.
#[derive(Clone)]
pub struct RoomEventCache {
// Shared inner state; every clone of this handle points at the same value.
inner: Arc<RoomEventCacheInner>,
}
// Opaque `Debug` output: only the type name is printed, the inner state is
// deliberately left out.
impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RoomEventCache");
        builder.finish_non_exhaustive()
    }
}
impl RoomEventCache {
/// Builds a new handle by constructing the inner state from the given parts
/// and wrapping it in an [`Arc`].
///
/// All arguments are forwarded unchanged to `RoomEventCacheInner::new`.
pub(super) fn new(
    room_id: OwnedRoomId,
    weak_room: WeakRoom,
    state: LockedRoomEventCacheState,
    shared_pagination_status: SharedObservable<SharedPaginationStatus>,
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
    update_sender: RoomEventCacheUpdateSender,
) -> Self {
    let inner = RoomEventCacheInner::new(
        room_id,
        weak_room,
        state,
        shared_pagination_status,
        auto_shrink_sender,
        update_sender,
    );

    Self { inner: Arc::new(inner) }
}
pub fn room_id(&self) -> &RoomId {
&self.inner.room_id
}
/// Returns a copy of every event currently held in the room's linked
/// chunk, in the order the linked chunk yields them.
///
/// # Errors
///
/// Fails if the read lock on the state cannot be acquired.
pub async fn events(&self) -> Result<Vec<Event>> {
    let state = self.inner.state.read().await?;
    let snapshot = state.room_linked_chunk().events().map(|(_, event)| event.clone()).collect();
    Ok(snapshot)
}
/// Subscribes to this room's updates.
///
/// Returns the events currently held in the room's linked chunk, together
/// with a [`RoomEventCacheSubscriber`] built from a new room receiver. The
/// state's subscriber counter is incremented as part of the call.
///
/// # Errors
///
/// Fails if the read lock on the state cannot be acquired.
pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
    let inner = &self.inner;
    let state = inner.state.read().await?;

    // Snapshot the events while the read lock is held.
    let initial_events: Vec<Event> =
        state.room_linked_chunk().events().map(|(_, event)| event.clone()).collect();

    // `fetch_add` hands back the value *before* the increment.
    let counter = state.subscriber_count();
    let count_before = counter.fetch_add(1, Ordering::SeqCst);
    trace!("added a room event cache subscriber; new count: {}", count_before + 1);

    let subscriber = RoomEventCacheSubscriber::new(
        inner.update_sender.new_room_receiver(),
        inner.room_id.clone(),
        inner.auto_shrink_sender.clone(),
        counter.clone(),
    );

    Ok((initial_events, subscriber))
}
/// Subscribes to the thread rooted at `thread_root`.
///
/// The call is delegated to the state while holding its write lock; the
/// returned pair is the thread's events and a receiver of timeline diffs.
///
/// # Errors
///
/// Fails if the write lock cannot be acquired, or if the state's own
/// `subscribe_to_thread` fails.
pub async fn subscribe_to_thread(
    &self,
    thread_root: OwnedEventId,
) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
    self.inner.state.write().await?.subscribe_to_thread(thread_root).await
}
/// Subscribes to the pinned events of this room.
///
/// # Errors
///
/// Returns [`EventCacheError::ClientDropped`] when the weak room reference
/// can no longer be upgraded. Also fails if the read lock cannot be
/// acquired, or if the state's own `subscribe_to_pinned_events` fails.
pub async fn subscribe_to_pinned_events(
    &self,
) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
    // Resolve the room first, so a dropped client is reported before locking.
    let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

    self.inner.state.read().await?.subscribe_to_pinned_events(room).await
}
/// Returns the event-focused cache for `(event_id, thread_mode)`, creating
/// and registering it if it does not exist yet.
///
/// The lookup first happens under a read lock. If nothing is found, a new
/// cache is built and started *without* holding any lock, and is only then
/// inserted under the write lock. Another task may have inserted a cache in
/// the meantime; in that case the already-registered cache is returned.
///
/// # Errors
///
/// Returns [`EventCacheError::ClientDropped`] when the weak room reference
/// can no longer be upgraded. Also fails if a state lock cannot be acquired
/// or if starting the new cache fails.
#[instrument(skip(self), fields(room_id = %self.inner.room_id, event_id = %event_id, thread_mode = ?thread_mode))]
pub async fn get_or_create_event_focused_cache(
    &self,
    event_id: OwnedEventId,
    num_context_events: u16,
    thread_mode: EventFocusThreadMode,
) -> Result<EventFocusedCache> {
    let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

    // Fast path: the cache may already exist.
    let guard = self.inner.state.read().await?;
    if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
        trace!("the cache was already created, returning it");
        return Ok(cache);
    }

    // Grab what the new cache needs, then release the read lock before doing
    // the (awaited) initialisation below.
    let linked_chunk_update_sender = guard.state.linked_chunk_update_sender.clone();
    drop(guard);

    // The owned room ID is only needed here, so it is moved straight into the
    // weak room rather than being stored and cloned.
    let weak_room =
        WeakRoom::new(WeakClient::from_client(&room.client()), room.room_id().to_owned());

    trace!("creating a fresh event-focused cache");
    let cache = EventFocusedCache::new(weak_room, event_id.clone(), linked_chunk_update_sender);
    cache.start_from(room, num_context_events, thread_mode).await?;

    // Re-check under the write lock: the lock was released above, so another
    // caller may have registered a cache for the same key in the meantime.
    let mut guard = self.inner.state.write().await?;
    if let Some(cache) = guard.get_event_focused_cache(event_id.clone(), thread_mode) {
        trace!("another cache has been racily created, returning it");
        return Ok(cache);
    }

    guard.insert_event_focused_cache(event_id, thread_mode, cache.clone());
    Ok(cache)
}
/// Returns the event-focused cache registered for `(event_id,
/// thread_mode)`, if any. Never creates one.
///
/// # Errors
///
/// Fails if the read lock on the state cannot be acquired.
#[instrument(skip(self), fields(room_id = %self.inner.room_id))]
pub async fn get_event_focused_cache(
    &self,
    event_id: OwnedEventId,
    thread_mode: EventFocusThreadMode,
) -> Result<Option<EventFocusedCache>> {
    let state = self.inner.state.read().await?;
    let existing = state.get_event_focused_cache(event_id, thread_mode);
    Ok(existing)
}
pub fn pagination(&self) -> RoomPagination {
RoomPagination::new(self.inner.clone())
}
/// Returns the pagination API for the thread identified by `thread_id`.
///
/// The thread is obtained through the state's `get_or_reload_thread` while
/// the write lock is held.
///
/// # Errors
///
/// Fails if the write lock on the state cannot be acquired.
pub async fn thread_pagination(&self, thread_id: OwnedEventId) -> Result<ThreadPagination> {
    let pagination =
        self.inner.state.write().await?.get_or_reload_thread(thread_id).pagination();
    Ok(pagination)
}
/// Runs `predicate` through the state's `rfind_map_event_in_memory_by` under
/// the read lock and returns whatever it produced.
///
/// NOTE(review): judging by the name, this presumably scans in-memory events
/// from the most recent backwards and stops at the first `Some` — confirm
/// against the state implementation.
///
/// # Errors
///
/// Fails if the read lock on the state cannot be acquired.
pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
where
    P: FnMut(&Event) -> Option<O>,
{
    let state = self.inner.state.read().await?;
    let mapped = state.rfind_map_event_in_memory_by(predicate);
    Ok(mapped)
}
/// Looks up a single event by its ID.
///
/// Returns `Ok(None)` both when the event is unknown and when the
/// underlying lookup itself reports an error: lookup errors are discarded.
///
/// # Errors
///
/// Only fails if the read lock on the state cannot be acquired.
pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
    let state = self.inner.state.read().await?;

    Ok(match state.find_event(event_id).await {
        // The location of the event is of no interest to the caller.
        Ok(Some((_location, event))) => Some(event),
        Ok(None) | Err(_) => None,
    })
}
/// Looks up an event by its ID, together with its related events.
///
/// `filter` is handed to the state's lookup unchanged. Judging by the
/// tests in this file, it restricts the relation types that are returned
/// and `None` applies no restriction.
///
/// Returns `Ok(None)` both when the event is unknown and when the
/// underlying lookup itself reports an error: lookup errors are discarded.
///
/// # Errors
///
/// Only fails if the read lock on the state cannot be acquired.
pub async fn find_event_with_relations(
    &self,
    event_id: &EventId,
    filter: Option<Vec<RelationType>>,
) -> Result<Option<(Event, Vec<Event>)>> {
    let state = self.inner.state.read().await?;

    // `filter` is owned and not used afterwards: move it instead of cloning.
    Ok(state.find_event_with_relations(event_id, filter).await.ok().flatten())
}
/// Returns the events related to the event identified by `event_id`.
///
/// `filter` is handed to the state's lookup unchanged.
///
/// # Errors
///
/// Fails if the read lock on the state cannot be acquired, or if the
/// underlying lookup fails.
pub async fn find_event_relations(
    &self,
    event_id: &EventId,
    filter: Option<Vec<RelationType>>,
) -> Result<Vec<Event>> {
    // `filter` is owned and not used afterwards: move it instead of cloning.
    self.inner.state.read().await?.find_event_relations(event_id, filter).await
}
/// Resets the room's state and notifies subscribers about it.
///
/// The diffs produced by the reset are sent as a timeline update with the
/// [`EventsOrigin::Cache`] origin, along with a generic update for this
/// room.
///
/// # Errors
///
/// Fails if the write lock cannot be acquired or if the reset fails; in
/// both cases nothing is sent.
pub async fn clear(&self) -> Result<()> {
    let diffs = self.inner.state.write().await?.reset().await?;

    let timeline_update = RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
        diffs,
        origin: EventsOrigin::Cache,
    });
    let generic_update = RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() };

    self.inner.update_sender.send(timeline_update, Some(generic_update));

    Ok(())
}
pub(in super::super) fn state(&self) -> &LockedRoomEventCacheState {
&self.inner.state
}
/// Handles a sync update for a joined room.
///
/// The timeline, ephemeral events and ambiguity changes are processed
/// first; the account data is only handled if that step succeeded.
///
/// # Errors
///
/// Propagates any error raised while handling the timeline.
#[instrument(skip_all, fields(room_id = %self.room_id()))]
pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
    // `updates` is owned, so each field is moved out individually rather than
    // cloned.
    self.inner
        .handle_timeline(updates.timeline, updates.ephemeral, updates.ambiguity_changes)
        .await?;

    self.inner.handle_account_data(updates.account_data);

    Ok(())
}
/// Handles a sync update for a left room.
///
/// Only the timeline and the ambiguity changes are processed; no ephemeral
/// events are forwarded.
///
/// # Errors
///
/// Propagates any error raised while handling the timeline.
#[instrument(skip_all, fields(room_id = %self.room_id()))]
pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
    let no_ephemeral_events = Vec::new();

    self.inner
        .handle_timeline(updates.timeline, no_ephemeral_events, updates.ambiguity_changes)
        .await
}
pub(in super::super) fn update_sender(&self) -> &RoomEventCacheUpdateSender {
&self.inner.update_sender
}
/// Forwards an event coming from the send queue to the inner state, which
/// decides whether it gets inserted.
///
/// # Errors
///
/// Propagates any error reported by the inner implementation.
pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
    let inner = &self.inner;
    inner.insert_sent_event_from_send_queue(event).await
}
/// Saves the given events through the state, on a best-effort basis.
///
/// Failures (either acquiring the write lock or saving) are logged as
/// warnings and otherwise ignored; nothing is reported to the caller.
pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
    let mut state_guard = match self.inner.state.write().await {
        Ok(guard) => guard,
        Err(err) => {
            warn!("couldn't save event in the event cache: {err}");
            return;
        }
    };

    if let Err(err) = state_guard.save_events(events).await {
        warn!("couldn't save event in the event cache: {err}");
    }
}
/// Returns the debug representation of the room's linked chunk, as produced
/// by the linked chunk's own `debug_string`.
///
/// If the read lock cannot be acquired, a warning is logged and an empty
/// vector is returned.
pub async fn debug_string(&self) -> Vec<String> {
    let read_guard = match self.inner.state.read().await {
        Ok(guard) => guard,
        Err(err) => {
            warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
            return vec![];
        }
    };

    read_guard.room_linked_chunk().debug_string()
}
}
/// The non-cloneable state shared by every `RoomEventCache` handle of a
/// room.
pub(super) struct RoomEventCacheInner {
// ID of the room this cache belongs to.
room_id: OwnedRoomId,
// Weak reference to the room; `get()` yields nothing once it can no longer
// be upgraded, which callers report as `EventCacheError::ClientDropped`.
pub weak_room: WeakRoom,
// Lock-protected state; accessed through `read()` / `write()`.
pub state: LockedRoomEventCacheState,
// Notified (one waiter) whenever handling a sync stored a previous-batch
// token.
pub pagination_batch_token_notifier: Notify,
// Observable pagination status; only stored here.
// NOTE(review): presumably shared with the pagination code — confirm.
pub shared_pagination_status: SharedObservable<SharedPaginationStatus>,
// Cloned into every `RoomEventCacheSubscriber` created by `subscribe`.
auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
// Sender used to broadcast room updates and generic updates.
update_sender: RoomEventCacheUpdateSender,
}
impl RoomEventCacheInner {
fn new(
room_id: OwnedRoomId,
weak_room: WeakRoom,
state: LockedRoomEventCacheState,
shared_pagination_status: SharedObservable<SharedPaginationStatus>,
auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
update_sender: RoomEventCacheUpdateSender,
) -> Self {
Self {
room_id,
weak_room,
state,
update_sender,
pagination_batch_token_notifier: Default::default(),
auto_shrink_sender,
shared_pagination_status,
}
}
/// Processes the room account data received in a sync.
///
/// Only fully-read events are acted upon: the first one found is
/// forwarded to subscribers as a `MoveReadMarkerTo` update, and any
/// further one in the same batch is ignored. Events that fail to
/// deserialize are logged as warnings and skipped.
fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
    if account_data.is_empty() {
        return;
    }

    trace!("Handling account data");

    // Set once a read marker has been forwarded, so duplicates within the
    // same batch are not sent again.
    let mut read_marker_sent = false;

    for raw_event in account_data {
        match raw_event.deserialize() {
            Ok(AnyRoomAccountDataEvent::FullyRead(ev)) if !read_marker_sent => {
                read_marker_sent = true;

                let update =
                    RoomEventCacheUpdate::MoveReadMarkerTo { event_id: ev.content.event_id };
                self.update_sender.send(update, None);
            }

            // Duplicate read markers and every other account data type.
            Ok(_) => {}

            Err(e) => {
                let event_type = raw_event.get_field::<String>("type").ok().flatten();
                warn!(event_type, "Failed to deserialize account data: {e}");
            }
        }
    }
}
/// Acquires the write lock on the state and processes the given sync data
/// with it.
///
/// # Errors
///
/// Fails if the write lock cannot be acquired, or if processing fails.
async fn handle_timeline(
    &self,
    timeline: Timeline,
    ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
    let state = self.state.write().await?;

    self.handle_timeline_inner(state, timeline, ephemeral_events, ambiguity_changes).await
}
/// Inserts an event coming from the send queue, but only if the room's
/// linked chunk already holds at least one event.
///
/// When the linked chunk has no event, this is a no-op. Otherwise the
/// event is processed as a non-limited, single-event timeline with no
/// previous-batch token.
///
/// # Errors
///
/// Fails if the write lock cannot be acquired, or if processing fails.
async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
    let state = self.state.write().await?;

    let linked_chunk_has_no_event = state.room_linked_chunk().events().next().is_none();
    if linked_chunk_has_no_event {
        return Ok(());
    }

    let single_event_timeline = Timeline { limited: false, prev_batch: None, events: vec![event] };

    self.handle_timeline_inner(state, single_event_timeline, Vec::new(), BTreeMap::new()).await
}
/// Processes sync data using an already-acquired write guard, then
/// notifies subscribers.
///
/// Nothing happens when there are no events, no previous-batch token, no
/// ephemeral events and no ambiguity changes. Otherwise the state handles
/// the sync, the guard is released, and the following happen in order:
///
/// 1. the pagination notifier is triggered if a previous-batch token was
///    stored;
/// 2. a timeline update (origin: sync) and a generic update are sent if
///    there are timeline diffs;
/// 3. the ephemeral events are forwarded if there are any;
/// 4. the ambiguity changes are forwarded if there are any.
///
/// # Errors
///
/// Propagates any error from the state's `handle_sync`; nothing is sent in
/// that case.
async fn handle_timeline_inner(
    &self,
    mut state: RoomEventCacheStateLockWriteGuard<'_>,
    timeline: Timeline,
    ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
    let has_timeline_data = !timeline.events.is_empty() || timeline.prev_batch.is_some();
    let has_side_data = !ephemeral_events.is_empty() || !ambiguity_changes.is_empty();

    if !(has_timeline_data || has_side_data) {
        return Ok(());
    }

    trace!("adding new events");

    let (stored_prev_batch_token, timeline_event_diffs) =
        state.handle_sync(timeline, &ephemeral_events).await?;

    // Release the write lock before waking up or notifying anyone.
    drop(state);

    if stored_prev_batch_token {
        self.pagination_batch_token_notifier.notify_one();
    }

    if !timeline_event_diffs.is_empty() {
        let timeline_update = RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
            diffs: timeline_event_diffs,
            origin: EventsOrigin::Sync,
        });
        let generic_update = RoomEventCacheGenericUpdate { room_id: self.room_id.clone() };

        self.update_sender.send(timeline_update, Some(generic_update));
    }

    if !ephemeral_events.is_empty() {
        let ephemeral_update =
            RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events };
        self.update_sender.send(ephemeral_update, None);
    }

    if !ambiguity_changes.is_empty() {
        let members_update = RoomEventCacheUpdate::UpdateMembers { ambiguity_changes };
        self.update_sender.send(members_update, None);
    }

    Ok(())
}
}
/// Tag naming where a post-processing request comes from.
///
/// NOTE(review): not used within this part of the file; presumably consumed
/// by the event post-processing code to tell the triggering code path apart —
/// confirm at the use sites.
#[derive(Clone, Copy)]
pub(in super::super) enum PostProcessingOrigin {
/// Triggered by a sync.
Sync,
/// Triggered by a back-pagination.
Backpagination,
/// Triggered by a re-decryption; only exists with the `e2e-encryption`
/// feature.
#[cfg(feature = "e2e-encryption")]
Redecryption,
}
#[cfg(test)]
mod tests {
use matrix_sdk_base::{RoomState, event_cache::Event};
use matrix_sdk_test::{async_test, event_factory::EventFactory};
use ruma::{
RoomId, event_id,
events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
room_id, user_id,
};
use crate::test_utils::logged_in_client;
#[async_test]
async fn test_find_event_by_id_with_edit_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.text_msg("* An edited event")
.edit(
original_id,
RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
)
.event_id(related_id)
.into(),
f,
)
.await;
}
#[async_test]
async fn test_find_event_by_id_with_thread_reply_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_find_event_by_id_with_reaction_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.reaction(original_id, ":D").event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_find_event_by_id_with_poll_response_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.poll_start("Poll start event", "A poll question", vec!["An answer"])
.event_id(original_id)
.into(),
f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_find_event_by_id_with_poll_end_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.poll_start("Poll start event", "A poll question", vec!["An answer"])
.event_id(original_id)
.into(),
f.poll_end("Poll ended", original_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_find_event_by_id_with_filtered_relationships() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let associated_related_id = event_id!("$recursive_related");
let room_id = room_id!("!galette:saucisse.bzh");
let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
let related_event = event_factory
.text_msg("* Edited event")
.edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
.event_id(related_id)
.into();
let associated_related_event =
event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
let client = logged_in_client(None).await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
room_event_cache.save_events([original_event]).await;
room_event_cache.save_events([related_event]).await;
room_event_cache.save_events([associated_related_event]).await;
let filter = Some(vec![RelationType::Replacement]);
let (event, related_events) = room_event_cache
.find_event_with_relations(original_id, filter)
.await
.expect("Failed to find the event with relations")
.expect("Event has no relation");
let cached_event_id = event.event_id().unwrap();
assert_eq!(cached_event_id, original_id);
assert_eq!(related_events.len(), 1);
let related_event_id = related_events[0].event_id().unwrap();
assert_eq!(related_event_id, related_id);
let filter = Some(vec![RelationType::Thread]);
let (event, related_events) = room_event_cache
.find_event_with_relations(original_id, filter)
.await
.expect("Failed to find the event with relations")
.expect("Event has no relation");
let cached_event_id = event.event_id().unwrap();
assert_eq!(cached_event_id, original_id);
assert!(related_events.is_empty());
}
#[async_test]
async fn test_find_event_by_id_with_recursive_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let associated_related_id = event_id!("$recursive_related");
let room_id = room_id!("!galette:saucisse.bzh");
let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
let related_event = event_factory
.text_msg("* Edited event")
.edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
.event_id(related_id)
.into();
let associated_related_event =
event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
let client = logged_in_client(None).await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
room_event_cache.save_events([original_event]).await;
room_event_cache.save_events([related_event]).await;
room_event_cache.save_events([associated_related_event]).await;
let (event, related_events) = room_event_cache
.find_event_with_relations(original_id, None)
.await
.expect("Failed to find the event with relations")
.expect("Event has no relation");
let cached_event_id = event.event_id().unwrap();
assert_eq!(cached_event_id, original_id);
assert_eq!(related_events.len(), 2);
let related_event_id = related_events[0].event_id().unwrap();
assert_eq!(related_event_id, related_id);
let related_event_id = related_events[1].event_id().unwrap();
assert_eq!(related_event_id, associated_related_id);
}
// Shared body of the relation tests.
//
// Saves `original_event`, an unrelated message built with `event_factory`,
// and `related_event` (in that order) into a fresh room event cache. Then
// checks that looking up the original event without a filter returns it
// with `related_event` as its first relation.
async fn assert_relations(
    room_id: &RoomId,
    original_event: Event,
    related_event: Event,
    event_factory: EventFactory,
) {
    let client = logged_in_client(None).await;
    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let room = client.get_room(room_id).unwrap();
    let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

    // Remember the IDs before the events are moved into the cache.
    let original_event_id = original_event.event_id().unwrap();
    let related_id = related_event.event_id().unwrap();

    let unrelated_event: Event =
        event_factory.text_msg("An unrelated event").event_id(event_id!("$2")).into();

    room_event_cache.save_events([original_event]).await;
    room_event_cache.save_events([unrelated_event]).await;
    room_event_cache.save_events([related_event]).await;

    let (event, related_events) = room_event_cache
        .find_event_with_relations(&original_event_id, None)
        .await
        .expect("Failed to find the event with relations")
        .expect("Event has no relation");

    assert_eq!(event.event_id().unwrap(), original_event_id);
    assert_eq!(related_events[0].event_id().unwrap(), related_id);
}
}
#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
use std::{ops::Not, sync::Arc};
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use futures_util::FutureExt;
use matrix_sdk_base::{
RoomState,
event_cache::{
Gap,
store::{EventCacheStore as _, MemoryStore},
},
linked_chunk::{
ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
lazy_loader::from_all_chunks,
},
store::StoreConfig,
sync::{JoinedRoomUpdate, Timeline},
};
use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
use ruma::{
EventId, event_id,
events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
room_id,
serde::Raw,
user_id,
};
use serde_json::json;
use tokio::task::yield_now;
use super::{
super::{
super::TimelineVectorDiffs, lock::Reload as _,
pagination::LoadMoreEventsBackwardsOutcome,
},
RoomEventCache, RoomEventCacheGenericUpdate, RoomEventCacheUpdate,
};
use crate::{assert_let_timeout, test_utils::client::MockClientBuilder};
// Handling a joined-room update with a limited timeline, a previous-batch
// token and one event must persist a gap chunk followed by an items chunk in
// the event cache store.
#[async_test]
async fn test_write_to_storage() {
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
// Keep a handle on the store so its content can be inspected afterwards.
let event_cache_store = Arc::new(MemoryStore::new());
let client = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
.event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
// A limited timeline carrying a previous-batch token and a single event.
let timeline = Timeline {
limited: true,
prev_batch: Some("raclette".to_owned()),
events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
};
room_event_cache
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
// Exactly one generic update is expected, for this room.
assert_matches!(
generic_stream.recv().await,
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
assert_eq!(expected_room_id, room_id);
}
);
assert!(generic_stream.is_empty());
// Rebuild the linked chunk from what has been written to the store.
let linked_chunk = from_all_chunks::<3, _, _>(
event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
)
.unwrap()
.unwrap();
assert_eq!(linked_chunk.chunks().count(), 2);
let mut chunks = linked_chunk.chunks();
// First chunk: the gap holding the previous-batch token.
assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
assert_eq!(gap.token, "raclette");
});
// Second chunk: the single event received in the timeline.
assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
assert_eq!(events.len(), 1);
let deserialized = events[0].raw().deserialize().unwrap();
assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
});
assert!(chunks.next().is_none());
}
// An event received with a bundled edit keeps its bundled relations in the
// in-memory cache, but is persisted to the store without them.
#[async_test]
async fn test_write_to_storage_strips_bundled_relations() {
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
// Keep a handle on the store so its content can be inspected afterwards.
let event_cache_store = Arc::new(MemoryStore::new());
let client = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
.event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
// A message carrying a bundled edit.
let ev = f
.text_msg("hey yo")
.sender(*ALICE)
.with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
.into_event();
let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
room_event_cache
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
// Exactly one generic update is expected, for this room.
assert_matches!(
generic_stream.recv().await,
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
assert_eq!(expected_room_id, room_id);
}
);
assert!(generic_stream.is_empty());
// In memory, the bundled replacement is still attached to the event.
{
let events = room_event_cache.events().await.unwrap();
assert_eq!(events.len(), 1);
let ev = events[0].raw().deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
);
let original = msg.as_original().unwrap();
assert_eq!(original.content.body(), "hey yo");
assert!(original.unsigned.relations.replace.is_some());
}
// In the store, the same event has no bundled replacement anymore.
let linked_chunk = from_all_chunks::<3, _, _>(
event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
)
.unwrap()
.unwrap();
assert_eq!(linked_chunk.chunks().count(), 1);
let mut chunks = linked_chunk.chunks();
assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
assert_eq!(events.len(), 1);
let ev = events[0].raw().deserialize().unwrap();
assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
let original = msg.as_original().unwrap();
assert_eq!(original.content.body(), "hey yo");
assert!(original.unsigned.relations.replace.is_none());
});
assert!(chunks.next().is_none());
}
// Clearing a room event cache must emit a `Clear` diff and a generic update,
// and must leave no item in the in-memory and stored linked chunks, while
// individual events can still be found by ID.
#[async_test]
async fn test_clear() {
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let event_cache_store = Arc::new(MemoryStore::new());
let event_id1 = event_id!("$1");
let event_id2 = event_id!("$2");
let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
// Pre-fill the store with, in order: an empty items chunk (0), a gap (42),
// an items chunk holding `ev1` (1), and an items chunk holding `ev2` (2).
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::NewGapChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(42),
next: None,
gap: Gap { token: "comté".to_owned() },
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(42)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![ev1.clone()],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(1)),
new: ChunkIdentifier::new(2),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(2), 0),
items: vec![ev2.clone()],
},
],
)
.await
.unwrap();
let client = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
.event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
// Subscribed only now, i.e. after the room event cache has been obtained.
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
// Both events can be found by ID.
{
assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
}
// Only the event of the last chunk is part of the initial items.
{
assert_eq!(items.len(), 1);
assert_eq!(items[0].event_id().unwrap(), event_id2);
assert!(stream.is_empty());
}
// One back-pagination brings the other event in, at the front.
{
room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().unwrap(), event_id1);
});
assert!(stream.is_empty());
assert_let_timeout!(
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) =
generic_stream.recv()
);
assert_eq!(room_id, expected_room_id);
assert!(generic_stream.is_empty());
}
// Now clear the room event cache.
room_event_cache.clear().await.unwrap();
// Subscribers receive a single `Clear` diff…
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_let!(VectorDiff::Clear = &diffs[0]);
// … and a single generic update for this room.
assert_let_timeout!(
Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
);
assert_eq!(received_room_id, room_id);
assert!(generic_stream.is_empty());
// The event itself can still be found by ID after the clear.
assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
// But the room's list of events is now empty…
let items = room_event_cache.events().await.unwrap();
assert!(items.is_empty());
// … and so is the linked chunk reloaded from the store.
let linked_chunk = from_all_chunks::<3, _, _>(
event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
)
.unwrap()
.unwrap();
assert_eq!(linked_chunk.num_items(), 0);
}
// A room event cache backed by a pre-filled store starts with the events of
// the last chunk only, loads earlier ones on back-pagination, and does not
// duplicate an event it already knows when it is received again via sync.
#[async_test]
async fn test_load_from_storage() {
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let event_cache_store = Arc::new(MemoryStore::new());
let event_id1 = event_id!("$1");
let event_id2 = event_id!("$2");
let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
// Pre-fill the store with, in order: an empty items chunk (0), a gap (42),
// an items chunk holding `ev1` (1), and an items chunk holding `ev2` (2).
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::NewGapChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(42),
next: None,
gap: Gap { token: "cheddar".to_owned() },
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(42)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![ev1.clone()],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(1)),
new: ChunkIdentifier::new(2),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(2), 0),
items: vec![ev2.clone()],
},
],
)
.await
.unwrap();
let client = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("hodor"))
.event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
// Subscribed *before* the room event cache is obtained, so the generic
// update received right after obtaining it can be observed.
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
// Obtaining the room event cache yields one generic update for the room.
assert_matches!(
generic_stream.recv().await,
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
assert_eq!(room_id, expected_room_id);
}
);
assert!(generic_stream.is_empty());
// Only the event of the last chunk is part of the initial items.
let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
assert_eq!(items.len(), 1);
assert_eq!(items[0].event_id().unwrap(), event_id2);
assert!(stream.is_empty());
// Both events can nonetheless be found by ID.
assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
// One back-pagination brings the other event in, at the front.
room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().unwrap(), event_id1);
});
assert!(stream.is_empty());
assert_matches!(
generic_stream.recv().await,
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
assert_eq!(expected_room_id, room_id);
}
);
assert!(generic_stream.is_empty());
// Receive, via sync, an event that is already known.
let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
room_event_cache
.handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
.await
.unwrap();
// No generic update is pending for it.
assert!(generic_stream.recv().now_or_never().is_none());
// And the list of events still holds each event exactly once.
let items = room_event_cache.events().await.unwrap();
assert_eq!(items.len(), 2);
assert_eq!(items[0].event_id().unwrap(), event_id1);
assert_eq!(items[1].event_id().unwrap(), event_id2);
}
// When the store holds inconsistent chunk links, obtaining the room event
// cache must still succeed: it starts empty, and the store ends up with no
// chunk at all for the room.
#[async_test]
async fn test_load_from_storage_resilient_to_failure() {
let room_id = room_id!("!fondue:patate.ch");
let event_cache_store = Arc::new(MemoryStore::new());
let event = EventFactory::new()
.room(room_id)
.sender(user_id!("@ben:saucisse.bzh"))
.text_msg("foo")
.event_id(event_id!("$42"))
.into_event();
// Pre-fill the store with two chunks that reference each other in a cycle:
// chunk 1 has chunk 0 both as its previous and as its next chunk.
event_cache_store
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![event],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: Some(ChunkIdentifier::new(0)),
},
],
)
.await
.unwrap();
let client = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("holder"))
.event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
// Obtaining the room event cache does not fail despite the bad data.
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
// The cache holds no event…
let items = room_event_cache.events().await.unwrap();
assert!(items.is_empty());
// … and the store no longer holds any chunk for this room.
let raw_chunks =
event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
assert!(raw_chunks.is_empty());
}
#[async_test]
async fn test_no_useless_gaps() {
    // Follows the number of gap chunks and of events held by the in-memory linked chunk
    // across a limited sync, a manual backwards load, and a non-limited sync.
    let room_id = room_id!("!galette:saucisse.bzh");
    let client = MockClientBuilder::new(None).build().await;

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let joined_room = client.get_room(room_id).unwrap();
    let (room_event_cache, _drop_handles) = joined_room.event_cache().await.unwrap();
    let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

    let factory = EventFactory::new().room(room_id).sender(*ALICE);

    // First update: a limited timeline carrying one message and the "raclette" token.
    let limited_timeline = Timeline {
        limited: true,
        prev_batch: Some("raclette".to_owned()),
        events: vec![factory.text_msg("hey yo").into_event()],
    };
    room_event_cache
        .handle_joined_room_update(JoinedRoomUpdate {
            timeline: limited_timeline,
            ..Default::default()
        })
        .await
        .unwrap();

    assert_matches!(
        generic_stream.recv().await,
        Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
            assert_eq!(expected_room_id, room_id);
        }
    );
    assert!(generic_stream.is_empty());

    // Right after the limited sync, the in-memory chunks show one event and no gap.
    {
        let state = room_event_cache.inner.state.read().await.unwrap();
        let (gap_count, event_count) = state.room_linked_chunk().chunks().into_iter().fold(
            (0, 0),
            |(gaps, events), chunk| match chunk.content() {
                ChunkContent::Items(items) => (gaps, events + items.len()),
                ChunkContent::Gap(_) => (gaps + 1, events),
            },
        );
        assert_eq!(gap_count, 0);
        assert_eq!(event_count, 1);
    }

    // Loading more events backwards must report a gap.
    let load_outcome = room_event_cache.pagination().load_more_events_backwards().await.unwrap();
    assert_matches!(load_outcome, LoadMoreEventsBackwardsOutcome::Gap { .. });

    // The gap is now visible in memory, next to the single event.
    {
        let state = room_event_cache.inner.state.read().await.unwrap();
        let (gap_count, event_count) = state.room_linked_chunk().chunks().into_iter().fold(
            (0, 0),
            |(gaps, events), chunk| match chunk.content() {
                ChunkContent::Items(items) => (gaps, events + items.len()),
                ChunkContent::Gap(_) => (gaps + 1, events),
            },
        );
        assert_eq!(gap_count, 1);
        assert_eq!(event_count, 1);
    }

    // Second update: a non-limited timeline carrying another message and the "fondue"
    // token.
    let unlimited_timeline = Timeline {
        limited: false,
        prev_batch: Some("fondue".to_owned()),
        events: vec![factory.text_msg("sup").into_event()],
    };
    room_event_cache
        .handle_joined_room_update(JoinedRoomUpdate {
            timeline: unlimited_timeline,
            ..Default::default()
        })
        .await
        .unwrap();

    assert_matches!(
        generic_stream.recv().await,
        Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
            assert_eq!(expected_room_id, room_id);
        }
    );
    assert!(generic_stream.is_empty());

    // Still a single gap, and it is the "raclette" one: "fondue" did not add another.
    {
        let state = room_event_cache.inner.state.read().await.unwrap();
        let (gap_count, event_count) = state.room_linked_chunk().chunks().into_iter().fold(
            (0, 0),
            |(gaps, events), chunk| match chunk.content() {
                ChunkContent::Items(items) => (gaps, events + items.len()),
                ChunkContent::Gap(gap) => {
                    assert_eq!(gap.token, "raclette");
                    (gaps + 1, events)
                }
            },
        );
        assert_eq!(gap_count, 1);
        assert_eq!(event_count, 2);
    }
}
#[async_test]
async fn test_shrink_to_last_chunk() {
let room_id = room_id!("!galette:saucisse.bzh");
let client = MockClientBuilder::new(None).build().await;
let f = EventFactory::new().room(room_id);
let evid1 = event_id!("$1");
let evid2 = event_id!("$2");
let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
{
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![ev1],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![ev2],
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(evid2));
assert!(stream.is_empty());
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_eq!(outcome.events.len(), 1);
assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
assert!(outcome.reached_start);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
assert_eq!(value.event_id().as_deref(), Some(evid1));
});
assert!(stream.is_empty());
assert_let_timeout!(
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
);
assert_eq!(expected_room_id, room_id);
assert!(generic_stream.is_empty());
room_event_cache
.inner
.state
.write()
.await
.unwrap()
.reload()
.await
.expect("shrinking should succeed");
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
stream.recv()
);
assert_eq!(diffs.len(), 2);
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(&diffs[1], VectorDiff::Append { values} => {
assert_eq!(values.len(), 1);
assert_eq!(values[0].event_id().as_deref(), Some(evid2));
});
assert!(stream.is_empty());
assert_let_timeout!(Ok(RoomEventCacheGenericUpdate { .. }) = generic_stream.recv());
assert!(generic_stream.is_empty());
let events = room_event_cache.events().await.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(evid2));
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_eq!(outcome.events.len(), 1);
assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
assert!(outcome.reached_start);
}
#[async_test]
async fn test_room_ordering() {
    // Checks the values returned by `event_order` for loaded and non-loaded positions,
    // before and after a back-pagination and a limited sync containing a known event.
    let room_id = room_id!("!galette:saucisse.bzh");
    let client = MockClientBuilder::new(None).build().await;
    let factory = EventFactory::new().room(room_id).sender(*ALICE);

    let first_id = event_id!("$1");
    let second_id = event_id!("$2");
    let third_id = event_id!("$3");
    let first_event = factory.text_msg("hello world").event_id(first_id).into_event();
    let second_event = factory.text_msg("howdy").sender(*BOB).event_id(second_id).into_event();
    let third_event = factory.text_msg("yo").event_id(third_id).into_event();

    // Chunk 0 holds `$1` and `$2`; chunk 1 (linked after it) holds `$3`.
    let seeded_updates = vec![
        Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
        Update::PushItems {
            at: Position::new(ChunkIdentifier::new(0), 0),
            items: vec![first_event, second_event],
        },
        Update::NewItemsChunk {
            previous: Some(ChunkIdentifier::new(0)),
            new: ChunkIdentifier::new(1),
            next: None,
        },
        Update::PushItems {
            at: Position::new(ChunkIdentifier::new(1), 0),
            items: vec![third_event.clone()],
        },
    ];
    {
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(LinkedChunkId::Room(room_id), seeded_updates)
            .await
            .unwrap();
    }

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let joined_room = client.get_room(room_id).unwrap();
    let (room_event_cache, _drop_handles) = joined_room.event_cache().await.unwrap();

    // Initial state: only `$3` is iterable, yet the two positions of chunk 0 already
    // have an order.
    {
        let state = room_event_cache.inner.state.read().await.unwrap();
        let linked_chunk = state.room_linked_chunk();

        let order_of_first = linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0));
        assert_eq!(order_of_first, Some(0));

        let order_of_second = linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1));
        assert_eq!(order_of_second, Some(1));

        let mut loaded = linked_chunk.events();
        let (position, event) = loaded.next().unwrap();
        assert_eq!(position, Position::new(ChunkIdentifier::new(1), 0));
        assert_eq!(event.event_id().as_deref(), Some(third_id));
        assert_eq!(linked_chunk.event_order(position), Some(2));
        assert!(loaded.next().is_none());
    }

    // Back-paginate until the start of the timeline is reached.
    let pagination = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
    assert!(pagination.reached_start);

    // Every iterable event now has an order equal to its rank in the iteration.
    {
        let state = room_event_cache.inner.state.read().await.unwrap();
        let linked_chunk = state.room_linked_chunk();
        for (rank, (position, _)) in linked_chunk.events().enumerate() {
            assert_eq!(linked_chunk.event_order(position), Some(rank));
        }
    }

    // Limited sync carrying the already-known `$3` plus a brand new `$4`.
    let fourth_id = event_id!("$4");
    let fourth_event = factory.text_msg("sup").event_id(fourth_id).into_event();
    let limited_timeline = Timeline {
        limited: true,
        prev_batch: Some("fondue".to_owned()),
        events: vec![third_event, fourth_event],
    };
    room_event_cache
        .handle_joined_room_update(JoinedRoomUpdate {
            timeline: limited_timeline,
            ..Default::default()
        })
        .await
        .unwrap();

    {
        let state = room_event_cache.inner.state.read().await.unwrap();
        let linked_chunk = state.room_linked_chunk();

        // Only `$3` and `$4` are iterable, with orders 2 and 3.
        let mut loaded = linked_chunk.events();

        let (position, event) = loaded.next().unwrap();
        assert_eq!(event.event_id().as_deref(), Some(third_id));
        assert_eq!(linked_chunk.event_order(position), Some(2));

        let (position, event) = loaded.next().unwrap();
        assert_eq!(event.event_id().as_deref(), Some(fourth_id));
        assert_eq!(linked_chunk.event_order(position), Some(3));

        assert!(loaded.next().is_none());

        // The positions of chunk 0 keep their order...
        let order_of_first = linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0));
        assert_eq!(order_of_first, Some(0));

        let order_of_second = linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1));
        assert_eq!(order_of_second, Some(1));

        // ... whereas the position `$3` used to occupy has none anymore.
        let order_of_old_third =
            linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0));
        assert_eq!(order_of_old_third, None);
    }
}
#[async_test]
async fn test_auto_shrink_after_all_subscribers_are_gone() {
let room_id = room_id!("!galette:saucisse.bzh");
let client = MockClientBuilder::new(None).build().await;
let f = EventFactory::new().room(room_id);
let evid1 = event_id!("$1");
let evid2 = event_id!("$2");
let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
{
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![ev1],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![ev2],
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
assert_eq!(events1.len(), 1);
assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
assert!(stream1.is_empty());
let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
assert_eq!(outcome.events.len(), 1);
assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
assert!(outcome.reached_start);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) =
stream1.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
assert_eq!(value.event_id().as_deref(), Some(evid1));
});
assert!(stream1.is_empty());
assert_let_timeout!(
Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) = generic_stream.recv()
);
assert_eq!(expected_room_id, room_id);
assert!(generic_stream.is_empty());
let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
assert_eq!(events2.len(), 2);
assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
assert!(stream2.is_empty());
drop(stream1);
yield_now().await;
assert!(stream2.is_empty());
drop(stream2);
yield_now().await;
{
let state = room_event_cache.inner.state.read().await.unwrap();
assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
}
let events3 = room_event_cache.events().await.unwrap();
assert_eq!(events3.len(), 1);
assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
}
#[async_test]
async fn test_rfind_map_event_in_memory_by() {
    // Exercises `rfind_map_event_in_memory_by` with four predicates: two that match an
    // event of the last stored chunk, one that only matches an event of the first stored
    // chunk, and one that never matches.
    let user_id = user_id!("@mnt_io:matrix.org");
    let room_id = room_id!("!raclette:patate.ch");
    let client = MockClientBuilder::new(None).build().await;
    let factory = EventFactory::new().room(room_id);

    let id_0 = event_id!("$ev0");
    let id_1 = event_id!("$ev1");
    let id_2 = event_id!("$ev2");
    let id_3 = event_id!("$ev3");

    let from_bob = factory.text_msg("hello").sender(*BOB).event_id(id_0).into_event();
    let from_alice_first = factory.text_msg("world").sender(*ALICE).event_id(id_1).into_event();
    let from_alice_last = factory.text_msg("!").sender(*ALICE).event_id(id_2).into_event();
    let from_user = factory.text_msg("eh!").sender(user_id).event_id(id_3).into_event();

    // Chunk 0 holds `$ev3`; chunk 1 (linked after it) holds `$ev0`, `$ev1`, `$ev2`.
    let seeded_updates = vec![
        Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
        Update::PushItems {
            at: Position::new(ChunkIdentifier::new(0), 0),
            items: vec![from_user],
        },
        Update::NewItemsChunk {
            previous: Some(ChunkIdentifier::new(0)),
            new: ChunkIdentifier::new(1),
            next: None,
        },
        Update::PushItems {
            at: Position::new(ChunkIdentifier::new(1), 0),
            items: vec![from_bob, from_alice_first, from_alice_last],
        },
    ];
    {
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(LinkedChunkId::Room(room_id), seeded_updates)
            .await
            .unwrap();
    }

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    client.base_client().get_or_create_room(room_id, RoomState::Joined);
    let joined_room = client.get_room(room_id).unwrap();
    let (room_event_cache, _drop_handles) = joined_room.event_cache().await.unwrap();

    // An event sent by `BOB`: the result must be `$ev0`.
    let bob_lookup = room_event_cache
        .rfind_map_event_in_memory_by(|event| {
            if event.sender().as_deref() == Some(*BOB) { Some(event.event_id()) } else { None }
        })
        .await;
    assert_matches!(bob_lookup, Ok(Some(event_id)) => {
        assert_eq!(event_id.as_deref(), Some(id_0));
    });

    // An event sent by `ALICE`: of her two events, the result must be `$ev2`, the one
    // stored last.
    let alice_lookup = room_event_cache
        .rfind_map_event_in_memory_by(|event| {
            if event.sender().as_deref() == Some(*ALICE) { Some(event.event_id()) } else { None }
        })
        .await;
    assert_matches!(alice_lookup, Ok(Some(event_id)) => {
        assert_eq!(event_id.as_deref(), Some(id_2));
    });

    // An event sent by `user_id`: it only exists in chunk 0, and nothing must be found.
    let user_lookup = room_event_cache
        .rfind_map_event_in_memory_by(|event| {
            if event.sender().as_deref() == Some(user_id) { Some(event.event_id()) } else { None }
        })
        .await
        .unwrap();
    assert!(user_lookup.is_none());

    // A predicate that rejects everything finds nothing.
    let empty_lookup =
        room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap();
    assert!(empty_lookup.is_none());
}
// Two clients (`p0` and `p1`), configured with distinct cross-process lock holder
// names, share one event cache store. The test alternates between the two and checks
// that, each time one of them takes its turn after the other, its subscriber observes
// a `Clear` + `Append` pair leaving only `ev_1` in memory, and that back-paginating
// brings `ev_0` back.
#[async_test]
async fn test_reload_when_dirty() {
let user_id = user_id!("@mnt_io:matrix.org");
let room_id = room_id!("!raclette:patate.ch");
// The store shared by both clients.
let event_cache_store = MemoryStore::new();
// Client for "process #0".
let client_p0 = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
.event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
// Client for "process #1".
let client_p1 = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
.event_cache_store(event_cache_store),
)
})
.build()
.await;
let event_factory = EventFactory::new().room(room_id).sender(user_id);
let ev_id_0 = event_id!("$ev_0");
let ev_id_1 = event_id!("$ev_1");
let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
// Seed the store through `p0`: chunk 0 holds `ev_0`, chunk 1 (linked after it) holds
// `ev_1`.
client_p0
.event_cache_store()
.lock()
.await
.expect("[p0] Could not acquire the event cache lock")
.as_clean()
.expect("[p0] Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![ev_0],
},
Update::NewItemsChunk {
previous: Some(ChunkIdentifier::new(0)),
new: ChunkIdentifier::new(1),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(1), 0),
items: vec![ev_1],
},
],
)
.await
.unwrap();
// Subscribe both event caches, create the room on both clients, and fetch one
// `RoomEventCache` per process. Both `_drop_handles` bindings go out of scope at the
// end of this block.
let (room_event_cache_p0, room_event_cache_p1) = {
let event_cache_p0 = client_p0.event_cache();
event_cache_p0.subscribe().unwrap();
let event_cache_p1 = client_p1.event_cache();
event_cache_p1.subscribe().unwrap();
client_p0.base_client().get_or_create_room(room_id, RoomState::Joined);
client_p1.base_client().get_or_create_room(room_id, RoomState::Joined);
let (room_event_cache_p0, _drop_handles) =
client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
let (room_event_cache_p1, _drop_handles) =
client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
(room_event_cache_p0, room_event_cache_p1)
};
// `p0` subscribes: initially only `ev_1` is in memory; paginating backwards by one
// event inserts `ev_0` at index 0.
let mut updates_stream_p0 = {
let room_event_cache = &room_event_cache_p0;
let (initial_updates, mut updates_stream) =
room_event_cache_p0.subscribe().await.unwrap();
assert_eq!(initial_updates.len(), 1);
assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
assert!(updates_stream.is_empty());
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
assert!(event_loaded(room_event_cache, ev_id_0).await);
updates_stream
};
// `p1` goes through the very same sequence, with the very same expectations.
let mut updates_stream_p1 = {
let room_event_cache = &room_event_cache_p1;
let (initial_updates, mut updates_stream) =
room_event_cache_p1.subscribe().await.unwrap();
assert_eq!(initial_updates.len(), 1);
assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
assert!(updates_stream.is_empty());
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
assert!(event_loaded(room_event_cache, ev_id_0).await);
updates_stream
};
// Round 1, repeated three times: the state is only reached through `event_loaded`
// and the pagination API.
for _ in 0..3 {
// `p0`'s turn, right after `p1` used the store. `ev_0` was loaded in `p0` earlier,
// but must not be in memory anymore.
// NOTE(review): presumably `p0` sees the cross-process lock as dirty at this point
// and reloads its state — confirm against the state lock implementation.
{
let room_event_cache = &room_event_cache_p0;
let updates_stream = &mut updates_stream_p0;
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
// The subscriber is told to clear the timeline and to append `ev_1` only.
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 2, "{diffs:#?}");
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(
&diffs[1],
VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
}
);
}
);
// Paginating backwards loads `ev_0` again, and the subscriber sees the insertion.
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(event_loaded(room_event_cache, ev_id_0).await);
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
}
// `p1`'s turn, right after `p0`: same expectations.
{
let room_event_cache = &room_event_cache_p1;
let updates_stream = &mut updates_stream_p1;
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 2, "{diffs:#?}");
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(
&diffs[1],
VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
}
);
}
);
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(event_loaded(room_event_cache, ev_id_0).await);
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
}
}
// Round 2, repeated three times: same scenario, but each turn starts by taking a
// read guard on the state and keeps it alive while `event_loaded` runs.
for _ in 0..3 {
// `p0`'s turn.
{
let room_event_cache = &room_event_cache_p0;
let updates_stream = &mut updates_stream_p0;
let guard = room_event_cache.inner.state.read().await.unwrap();
// The guard that was handed out must not be flagged as dirty.
assert!(guard.is_dirty().not());
// The subscriber is told to clear the timeline and to append `ev_1` only.
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 2, "{diffs:#?}");
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(
&diffs[1],
VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
}
);
}
);
// These lookups run while the read guard above is still held.
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
// Release the read guard before paginating.
drop(guard);
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(event_loaded(room_event_cache, ev_id_0).await);
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
}
// `p1`'s turn: same expectations.
{
let room_event_cache = &room_event_cache_p1;
let updates_stream = &mut updates_stream_p1;
let guard = room_event_cache.inner.state.read().await.unwrap();
assert!(guard.is_dirty().not());
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 2, "{diffs:#?}");
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(
&diffs[1],
VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
}
);
}
);
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
drop(guard);
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(event_loaded(room_event_cache, ev_id_0).await);
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
}
}
// Round 3, repeated three times: same scenario, but each turn starts by taking a
// write guard on the state.
for _ in 0..3 {
// `p0`'s turn.
{
let room_event_cache = &room_event_cache_p0;
let updates_stream = &mut updates_stream_p0;
let guard = room_event_cache.inner.state.write().await.unwrap();
// The guard that was handed out must not be flagged as dirty.
assert!(guard.is_dirty().not());
// The subscriber is told to clear the timeline and to append `ev_1` only.
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 2, "{diffs:#?}");
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(
&diffs[1],
VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
}
);
}
);
// Unlike in round 2, the guard is released before calling `event_loaded`.
// NOTE(review): presumably because `event_loaded` needs its own access to the state
// and a live write guard would block it — confirm.
drop(guard);
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(event_loaded(room_event_cache, ev_id_0).await);
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
}
// `p1`'s turn: same expectations.
{
let room_event_cache = &room_event_cache_p1;
let updates_stream = &mut updates_stream_p1;
let guard = room_event_cache.inner.state.write().await.unwrap();
assert!(guard.is_dirty().not());
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 2, "{diffs:#?}");
assert_matches!(&diffs[0], VectorDiff::Clear);
assert_matches!(
&diffs[1],
VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
}
);
}
);
drop(guard);
assert!(event_loaded(room_event_cache, ev_id_1).await);
assert!(event_loaded(room_event_cache, ev_id_0).await.not());
room_event_cache.pagination().run_backwards_once(1).await.unwrap();
assert!(event_loaded(room_event_cache, ev_id_0).await);
assert_matches!(
updates_stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. }) => {
assert_eq!(diffs.len(), 1, "{diffs:#?}");
assert_matches!(
&diffs[0],
VectorDiff::Insert { index: 0, value: event } => {
assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
}
);
}
);
}
}
}
// Two clients (`p0` and `p1`), configured with distinct cross-process lock holder
// names, share one event cache store. After both have acquired the state lock of a
// first room, a `RoomEventCache` is created in `p0` for a second room, and the test
// checks that the state guard of that new cache is not flagged as dirty.
#[async_test]
async fn test_load_when_dirty() {
let room_id_0 = room_id!("!raclette:patate.ch");
let room_id_1 = room_id!("!morbiflette:patate.ch");
// The store shared by both clients.
let event_cache_store = MemoryStore::new();
// Client for "process #0".
let client_p0 = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("process #0"))
.event_cache_store(event_cache_store.clone()),
)
})
.build()
.await;
// Client for "process #1".
let client_p1 = MockClientBuilder::new(None)
.on_builder(|builder| {
builder.store_config(
StoreConfig::new(CrossProcessLockConfig::multi_process("process #1"))
.event_cache_store(event_cache_store),
)
})
.build()
.await;
// Subscribe both event caches, create both rooms on both clients, and fetch the
// `RoomEventCache` of `room_id_0` for each process. Both `_drop_handles` bindings go
// out of scope at the end of this block.
let (room_event_cache_0_p0, room_event_cache_0_p1) = {
let event_cache_p0 = client_p0.event_cache();
event_cache_p0.subscribe().unwrap();
let event_cache_p1 = client_p1.event_cache();
event_cache_p1.subscribe().unwrap();
client_p0.base_client().get_or_create_room(room_id_0, RoomState::Joined);
client_p0.base_client().get_or_create_room(room_id_1, RoomState::Joined);
client_p1.base_client().get_or_create_room(room_id_0, RoomState::Joined);
client_p1.base_client().get_or_create_room(room_id_1, RoomState::Joined);
let (room_event_cache_0_p0, _drop_handles) =
client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
let (room_event_cache_0_p1, _drop_handles) =
client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
(room_event_cache_0_p0, room_event_cache_0_p1)
};
// Acquire and immediately release the state read lock in `p0`, then in `p1`.
// NOTE(review): presumably this ordering is what makes `p0` see the cross-process
// lock as dirty on its next acquisition — confirm against the lock implementation.
{
drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
}
// Create the `RoomEventCache` of `room_id_1` in `p0`. The second tuple element is
// bound to `_`, so it is dropped right away.
let (room_event_cache_1_p0, _) =
client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
// The state guard of the freshly created cache must not be flagged as dirty.
{
let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
assert!(guard.is_dirty().not());
}
}
#[async_test]
async fn test_uniq_read_marker() {
    // Feeds one hundred identical `m.fully_read` account data events in a single room
    // update and checks that the room subscriber receives exactly one
    // `MoveReadMarkerTo` update, while the generic stream receives nothing.
    let client = MockClientBuilder::new(None).build().await;
    let room_id = room_id!("!galette:saucisse.bzh");
    client.base_client().get_or_create_room(room_id, RoomState::Joined);

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

    let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
    let (initial_events, mut stream) = room_event_cache.subscribe().await.unwrap();
    assert!(initial_events.is_empty());

    // The raw read marker, duplicated a hundred times.
    let read_marker_json = json!({
        "content": {
            "event_id": "$crepe:saucisse.bzh"
        },
        "room_id": "!galette:saucisse.bzh",
        "type": "m.fully_read"
    });
    let read_marker_event = Raw::from_json_string(read_marker_json.to_string()).unwrap();
    let account_data = vec![read_marker_event; 100];

    let room_update = JoinedRoomUpdate { account_data, ..Default::default() };
    room_event_cache.handle_joined_room_update(room_update).await.unwrap();

    // One read marker update is delivered...
    let first_update = stream.recv().await.unwrap();
    assert_matches!(first_update, RoomEventCacheUpdate::MoveReadMarkerTo { .. });

    // ... and nothing else is immediately available on either stream.
    assert!(stream.recv().now_or_never().is_none());
    assert!(generic_stream.recv().now_or_never().is_none());
}
/// Tells whether `rfind_map_event_in_memory_by` finds an event whose ID is `event_id`
/// in `room_event_cache`.
///
/// Panics if the lookup itself returns an error.
async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
    let lookup = room_event_cache
        .rfind_map_event_in_memory_by(|event| {
            if event.event_id().as_deref() == Some(event_id) { Some(()) } else { None }
        })
        .await
        .unwrap();

    lookup.is_some()
}
}