matrix_sdk/event_cache/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular, this
18//! doesn't require subscribing to a specific room to get access to this
19//! information.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
27
28#![forbid(missing_docs)]
29
30use std::{
31    collections::BTreeMap,
32    fmt::Debug,
33    sync::{Arc, OnceLock},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40    deserialized_responses::{AmbiguityChange, TimelineEvent},
41    event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
42    linked_chunk::lazy_loader::LazyLoaderError,
43    store_locks::LockStoreError,
44    sync::RoomUpdates,
45};
46use matrix_sdk_common::executor::{spawn, JoinHandle};
47use room::RoomEventCacheState;
48use ruma::{
49    events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
50};
51use tokio::sync::{
52    broadcast::{error::RecvError, Receiver},
53    mpsc, Mutex, RwLock,
54};
55use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
56
57use self::paginator::PaginatorError;
58use crate::{client::WeakClient, Client};
59
60mod deduplicator;
61mod pagination;
62mod room;
63
64pub mod paginator;
65pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
66pub use room::{RoomEventCache, RoomEventCacheListener};
67
68/// An error observed in the [`EventCache`].
69#[derive(thiserror::Error, Debug)]
70pub enum EventCacheError {
71    /// The [`EventCache`] instance hasn't been initialized with
72    /// [`EventCache::subscribe`]
73    #[error(
74        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
75    )]
76    NotSubscribedYet,
77
78    /// An error has been observed while back-paginating.
79    #[error(transparent)]
80    BackpaginationError(#[from] PaginatorError),
81
82    /// Back-pagination was already happening in a given room, where we tried to
83    /// back-paginate again.
84    #[error("We were already back-paginating.")]
85    AlreadyBackpaginating,
86
87    /// An error happening when interacting with storage.
88    #[error(transparent)]
89    Storage(#[from] EventCacheStoreError),
90
91    /// An error happening when attempting to (cross-process) lock storage.
92    #[error(transparent)]
93    LockingStorage(#[from] LockStoreError),
94
95    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
96    /// to. It's possible this weak reference points to nothing anymore, at
97    /// times where we try to use the client.
98    #[error("The owning client of the event cache has been dropped.")]
99    ClientDropped,
100
101    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
102    /// loader.
103    ///
104    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
105    #[error(transparent)]
106    LinkedChunkLoader(#[from] LazyLoaderError),
107}
108
109/// A result using the [`EventCacheError`].
110pub type Result<T> = std::result::Result<T, EventCacheError>;
111
112/// Hold handles to the tasks spawn by a [`RoomEventCache`].
113pub struct EventCacheDropHandles {
114    /// Task that listens to room updates.
115    listen_updates_task: JoinHandle<()>,
116
117    /// Task that listens to updates to the user's ignored list.
118    ignore_user_list_update_task: JoinHandle<()>,
119
120    /// The task used to automatically shrink the linked chunks.
121    auto_shrink_linked_chunk_task: JoinHandle<()>,
122}
123
124impl Debug for EventCacheDropHandles {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
127    }
128}
129
130impl Drop for EventCacheDropHandles {
131    fn drop(&mut self) {
132        self.listen_updates_task.abort();
133        self.ignore_user_list_update_task.abort();
134        self.auto_shrink_linked_chunk_task.abort();
135    }
136}
137
138/// An event cache, providing lots of useful functionality for clients.
139///
140/// Cloning is shallow, and thus is cheap to do.
141///
142/// See also the module-level comment.
143#[derive(Clone)]
144pub struct EventCache {
145    /// Reference to the inner cache.
146    inner: Arc<EventCacheInner>,
147}
148
149impl Debug for EventCache {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        f.debug_struct("EventCache").finish_non_exhaustive()
152    }
153}
154
155impl EventCache {
156    /// Create a new [`EventCache`] for the given client.
157    pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
158        Self {
159            inner: Arc::new(EventCacheInner {
160                client,
161                store: event_cache_store,
162                multiple_room_updates_lock: Default::default(),
163                by_room: Default::default(),
164                drop_handles: Default::default(),
165                auto_shrink_sender: Default::default(),
166            }),
167        }
168    }
169
170    /// Starts subscribing the [`EventCache`] to sync responses, if not done
171    /// before.
172    ///
173    /// Re-running this has no effect if we already subscribed before, and is
174    /// cheap.
175    pub fn subscribe(&self) -> Result<()> {
176        let client = self.inner.client()?;
177
178        // Initialize the drop handles.
179        let _ = self.inner.drop_handles.get_or_init(|| {
180            // Spawn the task that will listen to all the room updates at once.
181            let listen_updates_task = spawn(Self::listen_task(
182                self.inner.clone(),
183                client.subscribe_to_all_room_updates(),
184            ));
185
186            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
187                self.inner.clone(),
188                client.subscribe_to_ignore_user_list_changes(),
189            ));
190
191            let (tx, rx) = mpsc::channel(32);
192
193            // Force-initialize the sender in the [`RoomEventCacheInner`].
194            self.inner.auto_shrink_sender.get_or_init(|| tx);
195
196            let auto_shrink_linked_chunk_tasks =
197                spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));
198
199            Arc::new(EventCacheDropHandles {
200                listen_updates_task,
201                ignore_user_list_update_task,
202                auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks,
203            })
204        });
205
206        Ok(())
207    }
208
209    #[instrument(skip_all)]
210    async fn ignore_user_list_update_task(
211        inner: Arc<EventCacheInner>,
212        mut ignore_user_list_stream: Subscriber<Vec<String>>,
213    ) {
214        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
215        span.follows_from(Span::current());
216
217        async move {
218            while ignore_user_list_stream.next().await.is_some() {
219                info!("Received an ignore user list change");
220                if let Err(err) = inner.clear_all_rooms().await {
221                    error!("when clearing room storage after ignore user list change: {err}");
222                }
223            }
224            info!("Ignore user list stream has closed");
225        }
226        .instrument(span)
227        .await;
228    }
229
230    #[instrument(skip_all)]
231    async fn listen_task(
232        inner: Arc<EventCacheInner>,
233        mut room_updates_feed: Receiver<RoomUpdates>,
234    ) {
235        trace!("Spawning the listen task");
236        loop {
237            match room_updates_feed.recv().await {
238                Ok(updates) => {
239                    if let Err(err) = inner.handle_room_updates(updates).await {
240                        match err {
241                            EventCacheError::ClientDropped => {
242                                // The client has dropped, exit the listen task.
243                                info!("Closing the event cache global listen task because client dropped");
244                                break;
245                            }
246                            err => {
247                                error!("Error when handling room updates: {err}");
248                            }
249                        }
250                    }
251                }
252
253                Err(RecvError::Lagged(num_skipped)) => {
254                    // Forget everything we know; we could have missed events, and we have
255                    // no way to reconcile at the moment!
256                    // TODO: implement Smart Matching™,
257                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
258                    if let Err(err) = inner.clear_all_rooms().await {
259                        error!("when clearing storage after lag in listen_task: {err}");
260                    }
261                }
262
263                Err(RecvError::Closed) => {
264                    // The sender has shut down, exit.
265                    info!("Closing the event cache global listen task because receiver closed");
266                    break;
267                }
268            }
269        }
270    }
271
272    /// Spawns the task that will listen to auto-shrink notifications.
273    ///
274    /// The auto-shrink mechanism works this way:
275    ///
276    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
277    ///   increment the active number of listeners to that room, aka
278    ///   [`RoomEventCacheState::listener_count`].
279    /// - When that subscriber is dropped, it will decrement that count; and
280    ///   notify the task below if it reached 0.
281    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
282    ///   to such notifications that a room may be shrunk. It will attempt an
283    ///   auto-shrink, by letting the inner state decide whether this is a good
284    ///   time to do so (new listeners might have spawned in the meanwhile).
285    #[instrument(skip_all)]
286    async fn auto_shrink_linked_chunk_task(
287        inner: Arc<EventCacheInner>,
288        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
289    ) {
290        while let Some(room_id) = rx.recv().await {
291            trace!(for_room = %room_id, "received notification to shrink");
292
293            let room = match inner.for_room(&room_id).await {
294                Ok(room) => room,
295                Err(err) => {
296                    warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
297                    continue;
298                }
299            };
300
301            trace!("waiting for state lock…");
302            let mut state = room.inner.state.write().await;
303
304            match state.auto_shrink_if_no_listeners().await {
305                Ok(diffs) => {
306                    if let Some(diffs) = diffs {
307                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
308                        // listeners, right? RIGHT? Especially because the state is guarded behind
309                        // a lock.
310                        //
311                        // However, better safe than sorry, and it's cheap to send an update here,
312                        // so let's do it!
313                        if !diffs.is_empty() {
314                            let _ = room.inner.sender.send(
315                                RoomEventCacheUpdate::UpdateTimelineEvents {
316                                    diffs,
317                                    origin: EventsOrigin::Cache,
318                                },
319                            );
320                        }
321                    } else {
322                        debug!("auto-shrinking didn't happen");
323                    }
324                }
325
326                Err(err) => {
327                    // There's not much we can do here, unfortunately.
328                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
329                }
330            }
331        }
332    }
333
334    /// Return a room-specific view over the [`EventCache`].
335    pub(crate) async fn for_room(
336        &self,
337        room_id: &RoomId,
338    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
339        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
340            return Err(EventCacheError::NotSubscribedYet);
341        };
342
343        let room = self.inner.for_room(room_id).await?;
344
345        Ok((room, drop_handles))
346    }
347
348    /// Cleanly clear all the rooms' event caches.
349    ///
350    /// This will notify any live observers that the room has been cleared.
351    pub async fn clear_all_rooms(&self) -> Result<()> {
352        self.inner.clear_all_rooms().await
353    }
354}
355
356struct EventCacheInner {
357    /// A weak reference to the inner client, useful when trying to get a handle
358    /// on the owning client.
359    client: WeakClient,
360
361    /// Reference to the underlying store.
362    store: EventCacheStoreLock,
363
364    /// A lock used when many rooms must be updated at once.
365    ///
366    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
367    /// ensure that multiple updates will be applied in the correct order, which
368    /// is enforced by taking this lock when handling an update.
369    // TODO: that's the place to add a cross-process lock!
370    multiple_room_updates_lock: Mutex<()>,
371
372    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
373    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
374
375    /// Handles to keep alive the task listening to updates.
376    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
377
378    /// A sender for notifications that a room *may* need to be auto-shrunk.
379    ///
380    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
381    /// instance.
382    ///
383    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
384    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
385}
386
387type AutoShrinkChannelPayload = OwnedRoomId;
388
389impl EventCacheInner {
390    fn client(&self) -> Result<Client> {
391        self.client.get().ok_or(EventCacheError::ClientDropped)
392    }
393
394    /// Clears all the room's data.
395    async fn clear_all_rooms(&self) -> Result<()> {
396        // Okay, here's where things get complicated.
397        //
398        // On the one hand, `by_room` may include storage for *some* rooms that we know
399        // about, but not *all* of them. Any room that hasn't been loaded in the
400        // client, or touched by a sync, will remain unloaded in memory, so it
401        // will be missing from `self.by_room`. As a result, we need to make
402        // sure that we're hitting the storage backend to *really* clear all the
403        // rooms, including those that haven't been loaded yet.
404        //
405        // On the other hand, one must NOT clear the `by_room` map, because if someone
406        // subscribed to a room update, they would never get any new update for
407        // that room, since re-creating the `RoomEventCache` would create a new,
408        // unrelated sender.
409        //
410        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
411        // store backend.
412        //
413        // As a result, for a short while, the in-memory linked chunks
414        // will be desynchronized from the storage. We need to be careful then. During
415        // that short while, we don't want *anyone* to touch the linked chunk
416        // (be it in memory or in the storage).
417        //
418        // And since that requirement applies to *any* room in `by_room` at the same
419        // time, we'll have to take the locks for *all* the live rooms, so as to
420        // properly clear the underlying storage.
421        //
422        // At this point, you might be scared about the potential for deadlocking. I am
423        // as well, but I'm convinced we're fine:
424        // 1. the lock for `by_room` is usually held only for a short while, and
425        //    independently of the other two kinds.
426        // 2. the state may acquire the store cross-process lock internally, but only
427        //    while the state's methods are called (so it's always transient). As a
428        //    result, as soon as we've acquired the state locks, the store lock ought to
429        //    be free.
430        // 3. The store lock is held explicitly only in a small scoped area below.
431        // 4. Then the store lock will be held internally when calling `reset()`, but at
432        //    this point it's only held for a short while each time, so rooms will take
433        //    turn to acquire it.
434
435        let rooms = self.by_room.write().await;
436
437        // Collect all the rooms' state locks, first: we can clear the storage only when
438        // nobody will touch it at the same time.
439        let room_locks = join_all(
440            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
441        )
442        .await;
443
444        // Clear the storage for all the rooms, using the storage facility.
445        self.store.lock().await?.clear_all_linked_chunks().await?;
446
447        // At this point, all the in-memory linked chunks are desynchronized from the
448        // storage. Resynchronize them manually by calling reset(), and
449        // propagate updates to observers.
450        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
451            let updates_as_vector_diffs = state_guard.reset().await?;
452            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
453                diffs: updates_as_vector_diffs,
454                origin: EventsOrigin::Cache,
455            });
456            Ok::<_, EventCacheError>(())
457        }))
458        .await?;
459
460        Ok(())
461    }
462
463    /// Handles a single set of room updates at once.
464    #[instrument(skip(self, updates))]
465    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
466        // First, take the lock that indicates we're processing updates, to avoid
467        // handling multiple updates concurrently.
468        let _lock = self.multiple_room_updates_lock.lock().await;
469
470        // Left rooms.
471        for (room_id, left_room_update) in updates.left {
472            let room = self.for_room(&room_id).await?;
473
474            if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
475                // Non-fatal error, try to continue to the next room.
476                error!("handling left room update: {err}");
477            }
478        }
479
480        // Joined rooms.
481        for (room_id, joined_room_update) in updates.joined {
482            let room = self.for_room(&room_id).await?;
483
484            if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
485                // Non-fatal error, try to continue to the next room.
486                error!(%room_id, "handling joined room update: {err}");
487            }
488        }
489
490        // Invited rooms.
491        // TODO: we don't anything with `updates.invite` at this point.
492
493        Ok(())
494    }
495
496    /// Return a room-specific view over the [`EventCache`].
497    ///
498    /// It may not be found, if the room isn't known to the client, in which
499    /// case it'll return None.
500    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
501        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
502        // write lock.
503        let by_room_guard = self.by_room.read().await;
504
505        match by_room_guard.get(room_id) {
506            Some(room) => Ok(room.clone()),
507
508            None => {
509                // Slow-path: the entry doesn't exist; let's acquire a write lock.
510                drop(by_room_guard);
511                let mut by_room_guard = self.by_room.write().await;
512
513                // In the meanwhile, some other caller might have obtained write access and done
514                // the same, so check for existence again.
515                if let Some(room) = by_room_guard.get(room_id) {
516                    return Ok(room.clone());
517                }
518
519                let pagination_status =
520                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
521
522                let room_version = self
523                    .client
524                    .get()
525                    .and_then(|client| client.get_room(room_id))
526                    .as_ref()
527                    .map(|room| room.clone_info().room_version_or_default())
528                    .unwrap_or_else(|| {
529                        warn!("unknown room version for {room_id}, using default V1");
530                        RoomVersionId::V1
531                    });
532
533                let room_state = RoomEventCacheState::new(
534                    room_id.to_owned(),
535                    room_version,
536                    self.store.clone(),
537                    pagination_status.clone(),
538                )
539                .await?;
540
541                // SAFETY: we must have subscribed before reaching this coed, otherwise
542                // something is very wrong.
543                let auto_shrink_sender =
544                    self.auto_shrink_sender.get().cloned().expect(
545                        "we must have called `EventCache::subscribe()` before calling here.",
546                    );
547
548                let room_event_cache = RoomEventCache::new(
549                    self.client.clone(),
550                    room_state,
551                    pagination_status,
552                    room_id.to_owned(),
553                    auto_shrink_sender,
554                );
555
556                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
557
558                Ok(room_event_cache)
559            }
560        }
561    }
562}
563
564/// The result of a single back-pagination request.
565#[derive(Debug)]
566pub struct BackPaginationOutcome {
567    /// Did the back-pagination reach the start of the timeline?
568    pub reached_start: bool,
569
570    /// All the events that have been returned in the back-pagination
571    /// request.
572    ///
573    /// Events are presented in reverse order: the first element of the vec,
574    /// if present, is the most "recent" event from the chunk (or
575    /// technically, the last one in the topological ordering).
576    pub events: Vec<TimelineEvent>,
577}
578
579/// An update related to events happened in a room.
580#[derive(Debug, Clone)]
581pub enum RoomEventCacheUpdate {
582    /// The fully read marker has moved to a different event.
583    MoveReadMarkerTo {
584        /// Event at which the read marker is now pointing.
585        event_id: OwnedEventId,
586    },
587
588    /// The members have changed.
589    UpdateMembers {
590        /// Collection of ambiguity changes that room member events trigger.
591        ///
592        /// This is a map of event ID of the `m.room.member` event to the
593        /// details of the ambiguity change.
594        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
595    },
596
597    /// The room has received updates for the timeline as _diffs_.
598    UpdateTimelineEvents {
599        /// Diffs to apply to the timeline.
600        diffs: Vec<VectorDiff<TimelineEvent>>,
601
602        /// Where the diffs are coming from.
603        origin: EventsOrigin,
604    },
605
606    /// The room has received new ephemeral events.
607    AddEphemeralEvents {
608        /// XXX: this is temporary, until read receipts are handled in the event
609        /// cache
610        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
611    },
612}
613
614/// Indicate where events are coming from.
615#[derive(Debug, Clone)]
616pub enum EventsOrigin {
617    /// Events are coming from a sync.
618    Sync,
619
620    /// Events are coming from pagination.
621    Pagination,
622
623    /// The cause of the change is purely internal to the cache.
624    Cache,
625}
626
627#[cfg(test)]
628mod tests {
629    use assert_matches::assert_matches;
630    use futures_util::FutureExt as _;
631    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
632    use matrix_sdk_test::{async_test, event_factory::EventFactory};
633    use ruma::{event_id, room_id, serde::Raw, user_id};
634    use serde_json::json;
635
636    use super::{EventCacheError, RoomEventCacheUpdate};
637    use crate::test_utils::{assert_event_matches_msg, logged_in_client};
638
639    #[async_test]
640    async fn test_must_explicitly_subscribe() {
641        let client = logged_in_client(None).await;
642
643        let event_cache = client.event_cache();
644
645        // If I create a room event subscriber for a room before subscribing the event
646        // cache,
647        let room_id = room_id!("!omelette:fromage.fr");
648        let result = event_cache.for_room(room_id).await;
649
650        // Then it fails, because one must explicitly call `.subscribe()` on the event
651        // cache.
652        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
653    }
654
655    #[async_test]
656    async fn test_uniq_read_marker() {
657        let client = logged_in_client(None).await;
658        let room_id = room_id!("!galette:saucisse.bzh");
659        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
660
661        let event_cache = client.event_cache();
662
663        event_cache.subscribe().unwrap();
664
665        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
666
667        let (events, mut stream) = room_event_cache.subscribe().await;
668
669        assert!(events.is_empty());
670
671        // When sending multiple times the same read marker event,…
672        let read_marker_event = Raw::from_json_string(
673            json!({
674                "content": {
675                    "event_id": "$crepe:saucisse.bzh"
676                },
677                "room_id": "!galette:saucisse.bzh",
678                "type": "m.fully_read"
679            })
680            .to_string(),
681        )
682        .unwrap();
683        let account_data = vec![read_marker_event; 100];
684
685        room_event_cache
686            .inner
687            .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
688            .await
689            .unwrap();
690
691        // … there's only one read marker update.
692        assert_matches!(
693            stream.recv().await.unwrap(),
694            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
695        );
696
697        assert!(stream.recv().now_or_never().is_none());
698    }
699
700    #[async_test]
701    async fn test_get_event_by_id() {
702        let client = logged_in_client(None).await;
703        let room_id1 = room_id!("!galette:saucisse.bzh");
704        let room_id2 = room_id!("!crepe:saucisse.bzh");
705
706        let event_cache = client.event_cache();
707        event_cache.subscribe().unwrap();
708
709        // Insert two rooms with a few events.
710        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
711
712        let eid1 = event_id!("$1");
713        let eid2 = event_id!("$2");
714        let eid3 = event_id!("$3");
715
716        let joined_room_update1 = JoinedRoomUpdate {
717            timeline: Timeline {
718                events: vec![
719                    f.text_msg("hey").event_id(eid1).into(),
720                    f.text_msg("you").event_id(eid2).into(),
721                ],
722                ..Default::default()
723            },
724            ..Default::default()
725        };
726
727        let joined_room_update2 = JoinedRoomUpdate {
728            timeline: Timeline {
729                events: vec![f.text_msg("bjr").event_id(eid3).into()],
730                ..Default::default()
731            },
732            ..Default::default()
733        };
734
735        let mut updates = RoomUpdates::default();
736        updates.joined.insert(room_id1.to_owned(), joined_room_update1);
737        updates.joined.insert(room_id2.to_owned(), joined_room_update2);
738
739        // Have the event cache handle them.
740        event_cache.inner.handle_room_updates(updates).await.unwrap();
741
742        // We can find the events in a single room.
743        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
744        let room1 = client.get_room(room_id1).unwrap();
745
746        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
747
748        let found1 = room_event_cache.event(eid1).await.unwrap();
749        assert_event_matches_msg(&found1, "hey");
750
751        let found2 = room_event_cache.event(eid2).await.unwrap();
752        assert_event_matches_msg(&found2, "you");
753
754        // Retrieving the event with id3 from the room which doesn't contain it will
755        // fail…
756        assert!(room_event_cache.event(eid3).await.is_none());
757    }
758
759    #[async_test]
760    async fn test_save_event() {
761        let client = logged_in_client(None).await;
762        let room_id = room_id!("!galette:saucisse.bzh");
763
764        let event_cache = client.event_cache();
765        event_cache.subscribe().unwrap();
766
767        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
768        let event_id = event_id!("$1");
769
770        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
771        let room = client.get_room(room_id).unwrap();
772
773        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
774        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
775
776        // Retrieving the event at the room-wide cache works.
777        assert!(room_event_cache.event(event_id).await.is_some());
778    }
779}