matrix_sdk/event_cache/mod.rs
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The event cache is an abstraction layer, sitting between the Rust SDK and a
//! final client, that acts as a global observer of all the rooms, gathering and
//! inferring some extra useful information about each room. In particular, this
//! doesn't require subscribing to a specific room to get access to this
//! information.
//!
//! It's intended to be fast, robust and easy to maintain, having learned from
//! previous endeavours at implementing middle to high level features elsewhere
//! in the SDK, notably in the UI's Timeline object.
//!
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
//! details about the historical reasons that led us to start writing this.
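//!
//! A rough usage sketch based on the API exposed below (hedged: the `client`
//! and `room` values are assumed to come from an already logged-in, syncing
//! [`Client`], and error handling is elided):
//!
//! ```ignore
//! // Opt the client into the event cache, once.
//! let event_cache = client.event_cache();
//! event_cache.subscribe()?;
//!
//! // Get a room-wide view; keep the drop handles alive as long as needed.
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//!
//! // Read the current events and listen to further updates.
//! let (events, mut listener) = room_event_cache.subscribe().await;
//! while let Ok(update) = listener.recv().await {
//!     // React to `RoomEventCacheUpdate`s here.
//! }
//! ```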

#![forbid(missing_docs)]

use std::{
    collections::BTreeMap,
    fmt::Debug,
    sync::{Arc, OnceLock},
};

use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
    linked_chunk::lazy_loader::LazyLoaderError,
    store_locks::LockStoreError,
    sync::RoomUpdates,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use once_cell::sync::OnceCell;
use room::RoomEventCacheState;
use ruma::{
    events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
};
use tokio::sync::{
    broadcast::{error::RecvError, Receiver},
    mpsc, Mutex, RwLock,
};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};

use self::paginator::PaginatorError;
use crate::{client::WeakClient, Client};

mod deduplicator;
mod pagination;
mod room;

pub mod paginator;
pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
pub use room::{RoomEventCache, RoomEventCacheListener};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`].
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// An error has been observed while back-paginating.
    #[error(transparent)]
    BackpaginationError(#[from] PaginatorError),

    /// Back-pagination was already happening in a given room, where we tried to
    /// back-paginate again.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible that this weak reference points to nothing anymore by
    /// the time we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Holds handles to the tasks spawned by the [`EventCache`].
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// The task used to automatically shrink the linked chunks.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}

impl Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
        self.auto_shrink_linked_chunk_task.abort();
    }
}

/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl Debug for EventCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client.
    pub(crate) fn new(client: WeakClient) -> Self {
        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: Default::default(),
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                auto_shrink_sender: Default::default(),
            }),
        }
    }

    /// Enable storing updates to storage, and reload events from storage.
    ///
    /// Has an effect only the first time it's called. It's safe to call it
    /// multiple times.
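    ///
    /// A minimal sketch of the intended call order (hedged: error handling is
    /// elided and the `client` value is assumed to exist):
    ///
    /// ```ignore
    /// let event_cache = client.event_cache();
    /// // Idempotent: only the first call has an effect.
    /// event_cache.enable_storage()?;
    /// assert!(event_cache.has_storage());
    /// ```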
    pub fn enable_storage(&self) -> Result<()> {
        let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
            let client = self.inner.client()?;
            Ok(client.event_cache_store().clone())
        })?;
        Ok(())
    }

    /// Check whether the storage is enabled or not.
    pub fn has_storage(&self) -> bool {
        self.inner.has_storage()
    }

    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-running this has no effect if we already subscribed before, and is
    /// cheap.
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            let (tx, rx) = mpsc::channel(32);

            // Force-initialize the sender in the [`EventCacheInner`], so it can be
            // shared with each [`RoomEventCache`].
            self.inner.auto_shrink_sender.get_or_init(|| tx);

            let auto_shrink_linked_chunk_task =
                spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));

            Arc::new(EventCacheDropHandles {
                listen_updates_task,
                ignore_user_list_update_task,
                auto_shrink_linked_chunk_task,
            })
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has dropped, exit the listen task.
                                info!("Closing the event cache global listen task because client dropped");
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // Forget everything we know; we could have missed events, and we have
                    // no way to reconcile at the moment!
                    // TODO: implement Smart Matching™,
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    // The sender has shut down, exit.
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

    /// Spawns the task that will listen to auto-shrink notifications.
    ///
    /// The auto-shrink mechanism works this way:
    ///
    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
    ///   increment the active number of listeners to that room, aka
    ///   [`RoomEventCacheState::listener_count`].
    /// - When that subscriber is dropped, it will decrement that count; and
    ///   notify the task below if it reached 0.
    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
    ///   to such notifications that a room may be shrunk. It will attempt an
    ///   auto-shrink, by letting the inner state decide whether this is a good
    ///   time to do so (new listeners might have spawned in the meanwhile).
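    ///
    /// From a consumer's point of view, the trigger is simply dropping the last
    /// listener of a room. A hedged sketch (the `room` value is an assumption,
    /// and the shrink itself is asynchronous and best-effort):
    ///
    /// ```ignore
    /// let (room_event_cache, _drop_handles) = room.event_cache().await?;
    /// let (_events, listener) = room_event_cache.subscribe().await;
    ///
    /// // …use the listener…
    ///
    /// // Dropping the last listener notifies this task, which may then shrink
    /// // the room's in-memory linked chunk.
    /// drop(listener);
    /// ```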
    #[instrument(skip_all)]
    async fn auto_shrink_linked_chunk_task(
        inner: Arc<EventCacheInner>,
        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
    ) {
        while let Some(room_id) = rx.recv().await {
            trace!(for_room = %room_id, "received notification to shrink");

            let room = match inner.for_room(&room_id).await {
                Ok(room) => room,
                Err(err) => {
                    warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
                    continue;
                }
            };

            trace!("waiting for state lock…");
            let mut state = room.inner.state.write().await;

            match state.auto_shrink_if_no_listeners().await {
                Ok(diffs) => {
                    if let Some(diffs) = diffs {
                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
                        // listeners, right? RIGHT? Especially because the state is guarded behind
                        // a lock.
                        //
                        // However, better safe than sorry, and it's cheap to send an update here,
                        // so let's do it!
                        if !diffs.is_empty() {
                            let _ = room.inner.sender.send(
                                RoomEventCacheUpdate::UpdateTimelineEvents {
                                    diffs,
                                    origin: EventsOrigin::Cache,
                                },
                            );
                        }
                    } else {
                        debug!("auto-shrinking didn't happen");
                    }
                }

                Err(err) => {
                    // There's not much we can do here, unfortunately.
                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
                }
            }
        }
    }

    /// Return a room-specific view over the [`EventCache`].
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

    /// Cleanly clear all the rooms' event caches.
    ///
    /// This will notify any live observers that the room has been cleared.
    pub async fn clear_all_rooms(&self) -> Result<()> {
        self.inner.clear_all_rooms().await
    }

    /// Add an initial set of events to the event cache, reloaded from a cache.
    ///
    /// TODO: temporary for API compat, as the event cache should take care of
    /// its own store.
    #[instrument(skip(self, events))]
    pub async fn add_initial_events(
        &self,
        room_id: &RoomId,
        events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
    ) -> Result<()> {
        // If the event cache's storage has been enabled, do nothing.
        if self.inner.has_storage() {
            return Ok(());
        }

        let room_cache = self.inner.for_room(room_id).await?;

        // If the linked chunk already has at least one event, ignore this request, as
        // it should happen at most once per room.
        if !room_cache.inner.state.read().await.events().is_empty() {
            return Ok(());
        }

        // We could have received events during a previous sync; remove them all, since
        // we can't know where to insert the "initial events" with respect to
        // them.

        room_cache
            .inner
            .replace_all_events_by(
                events,
                prev_batch,
                Default::default(),
                Default::default(),
                EventsOrigin::Cache,
            )
            .await?;

        Ok(())
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    ///
    /// Set to none if we shouldn't use storage for reading / writing linked
    /// chunks.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Handles to keep alive the task listening to updates.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
    /// instance.
    ///
    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
}

type AutoShrinkChannelPayload = OwnedRoomId;

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Has persistent storage been enabled for the event cache?
    fn has_storage(&self) -> bool {
        self.store.get().is_some()
    }

    /// Clears all the rooms' data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Okay, here's where things get complicated.
        //
        // On the one hand, `by_room` may include storage for *some* rooms that we know
        // about, but not *all* of them. Any room that hasn't been loaded in the
        // client, or touched by a sync, will remain unloaded in memory, so it
        // will be missing from `self.by_room`. As a result, we need to make
        // sure that we're hitting the storage backend to *really* clear all the
        // rooms, including those that haven't been loaded yet.
        //
        // On the other hand, one must NOT clear the `by_room` map, because if someone
        // subscribed to a room update, they would never get any new update for
        // that room, since re-creating the `RoomEventCache` would create a new,
        // unrelated sender.
        //
        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
        // store backend.
        //
        // As a result, for a short while, the in-memory linked chunks
        // will be desynchronized from the storage. We need to be careful then. During
        // that short while, we don't want *anyone* to touch the linked chunk
        // (be it in memory or in the storage).
        //
        // And since that requirement applies to *any* room in `by_room` at the same
        // time, we'll have to take the locks for *all* the live rooms, so as to
        // properly clear the underlying storage.
        //
        // At this point, you might be scared about the potential for deadlocking. I am
        // as well, but I'm convinced we're fine:
        // 1. the lock for `by_room` is usually held only for a short while, and
        //    independently of the other two kinds.
        // 2. the state may acquire the store cross-process lock internally, but only
        //    while the state's methods are called (so it's always transient). As a
        //    result, as soon as we've acquired the state locks, the store lock ought to
        //    be free.
        // 3. The store lock is held explicitly only in a small scoped area below.
        // 4. Then the store lock will be held internally when calling `reset()`, but at
        //    this point it's only held for a short while each time, so rooms will take
        //    turns to acquire it.

        let rooms = self.by_room.write().await;

        // Collect all the rooms' state locks, first: we can clear the storage only when
        // nobody will touch it at the same time.
        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        // Clear the storage for all the rooms, using the storage facility.
        if let Some(store) = self.store.get() {
            let store_guard = store.lock().await?;
            store_guard.clear_all_rooms_chunks().await?;
        }

        // At this point, all the in-memory linked chunks are desynchronized from the
        // storage. Resynchronize them manually by calling reset(), and
        // propagate updates to observers.
        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
            let updates_as_vector_diffs = state_guard.reset().await?;
            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });
            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = self.multiple_room_updates_lock.lock().await;

        // Left rooms.
        for (room_id, left_room_update) in updates.leave {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.join {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!(%room_id, "handling joined room update: {err}");
            }
        }

        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`].
    ///
    /// The [`RoomEventCache`] is created on the fly if it doesn't exist for
    /// this room yet.
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow-path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meanwhile, some other caller might have obtained write access and done
                // the same, so check for existence again.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let pagination_status =
                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });

                let room_version = self
                    .client
                    .get()
                    .and_then(|client| client.get_room(room_id))
                    .as_ref()
                    .map(|room| room.clone_info().room_version_or_default())
                    .unwrap_or_else(|| {
                        warn!("unknown room version for {room_id}, using default V1");
                        RoomVersionId::V1
                    });

                let room_state = RoomEventCacheState::new(
                    room_id.to_owned(),
                    room_version,
                    self.store.clone(),
                    pagination_status.clone(),
                )
                .await?;

                // SAFETY: we must have subscribed before reaching this code, otherwise
                // something is very wrong.
                let auto_shrink_sender =
                    self.auto_shrink_sender.get().cloned().expect(
                        "we must have called `EventCache::subscribe()` before calling here.",
                    );

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    pagination_status,
                    room_id.to_owned(),
                    auto_shrink_sender,
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                Ok(room_event_cache)
            }
        }
    }
}

/// The result of a single back-pagination request.
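///
/// A hedged sketch of consuming the ordering documented on
/// [`BackPaginationOutcome::events`] (the `outcome` value is assumed to come
/// from a back-pagination call):
///
/// ```ignore
/// // `outcome.events` goes from the most recent to the oldest event, so reverse
/// // it to get back to topological (oldest-first) order.
/// let oldest_first: Vec<_> = outcome.events.into_iter().rev().collect();
/// ```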
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or
    /// technically, the last one in the topological ordering).
    pub events: Vec<TimelineEvent>,
}

/// An update related to events that happened in a room.
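///
/// A hedged sketch of handling these updates (the `listener` value is assumed
/// to come from [`RoomEventCache::subscribe`]):
///
/// ```ignore
/// while let Ok(update) = listener.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the `VectorDiff`s to a local copy of the timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
///             // Move the fully-read marker.
///         }
///         RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
///             // Refresh display names affected by the ambiguity changes.
///         }
///         RoomEventCacheUpdate::AddEphemeralEvents { events } => {
///             // Handle ephemeral events (e.g. read receipts) directly.
///         }
///     }
/// }
/// ```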
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger.
        ///
        /// This is a map of event ID of the `m.room.member` event to the
        /// details of the ambiguity change.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// XXX: this is temporary, until read receipts are handled in the event
        /// cache
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

/// Indicate where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;

    use super::{EventCacheError, RoomEventCacheUpdate};
    use crate::test_utils::{assert_event_matches_msg, logged_in_client};

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // If I create a room event subscriber for a room before subscribing the event
        // cache,
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        // Then it fails, because one must explicitly call `.subscribe()` on the event
        // cache.
        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await;

        assert!(events.is_empty());

        // When sending multiple times the same read marker event,…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(
                event_cache.inner.has_storage(),
                JoinedRoomUpdate { account_data, ..Default::default() },
            )
            .await
            .unwrap();

        // … there's only one read marker update.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.join.insert(room_id1.to_owned(), joined_room_update1);
        updates.join.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // We can find the events in a single room.
        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // Retrieving the event with `eid3` from a room which doesn't contain it will
        // fail…
        assert!(room_event_cache.event(eid3).await.is_none());
    }

    #[async_test]
    async fn test_save_event() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;

        // Retrieving the event at the room-wide cache works.
        assert!(room_event_cache.event(event_id).await.is_some());
    }

    #[async_test]
    async fn test_add_initial_events() {
        // TODO: remove this test when the event cache uses its own persistent storage.
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        event_cache
            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
            .await
            .unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let (initial_events, _) = room_event_cache.subscribe().await;
        // `add_initial_events` had an effect.
        assert_eq!(initial_events.len(), 1);
    }
}