matrix_sdk/event_cache/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular, this
18//! doesn't require subscribing to a specific room to get access to this
19//! information.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
27
28#![forbid(missing_docs)]
29
30use std::{
31    collections::BTreeMap,
32    fmt::Debug,
33    sync::{Arc, OnceLock},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40    deserialized_responses::{AmbiguityChange, TimelineEvent},
41    event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
42    linked_chunk::lazy_loader::LazyLoaderError,
43    store_locks::LockStoreError,
44    sync::RoomUpdates,
45};
46use matrix_sdk_common::executor::{spawn, JoinHandle};
47use once_cell::sync::OnceCell;
48use room::RoomEventCacheState;
49use ruma::{
50    events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
51};
52use tokio::sync::{
53    broadcast::{error::RecvError, Receiver},
54    mpsc, Mutex, RwLock,
55};
56use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
57
58use self::paginator::PaginatorError;
59use crate::{client::WeakClient, Client};
60
61mod deduplicator;
62mod pagination;
63mod room;
64
65pub mod paginator;
66pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
67pub use room::{RoomEventCache, RoomEventCacheListener};
68
/// An error observed in the [`EventCache`].
///
/// Variants annotated with `#[error(transparent)]` forward their display
/// message and source to the wrapped error; the `#[from]` attribute lets them
/// be created from the wrapped error with the `?` operator.
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`].
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// An error has been observed while back-paginating.
    #[error(transparent)]
    BackpaginationError(#[from] PaginatorError),

    /// Back-pagination was already happening in a given room, where we tried to
    /// back-paginate again.
    #[error("We were already back-paginating.")]
    AlreadyBackpaginating,

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times where we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,

    /// An error happening when interacting with the [`LinkedChunk`]'s lazy
    /// loader.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    #[error(transparent)]
    LinkedChunkLoader(#[from] LazyLoaderError),
}
109
/// A specialized `Result` whose error type is always [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;
112
/// Holds the handles to the background tasks spawned by
/// [`EventCache::subscribe`].
///
/// Dropping this value aborts all the tasks it holds.
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: JoinHandle<()>,

    /// The task used to automatically shrink the linked chunks.
    auto_shrink_linked_chunk_task: JoinHandle<()>,
}
124
125impl Debug for EventCacheDropHandles {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
128    }
129}
130
131impl Drop for EventCacheDropHandles {
132    fn drop(&mut self) {
133        self.listen_updates_task.abort();
134        self.ignore_user_list_update_task.abort();
135        self.auto_shrink_linked_chunk_task.abort();
136    }
137}
138
/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do: all the clones share the same
/// inner state, held behind an [`Arc`].
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}
149
150impl Debug for EventCache {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        f.debug_struct("EventCache").finish_non_exhaustive()
153    }
154}
155
156impl EventCache {
157    /// Create a new [`EventCache`] for the given client.
158    pub(crate) fn new(client: WeakClient) -> Self {
159        Self {
160            inner: Arc::new(EventCacheInner {
161                client,
162                store: Default::default(),
163                multiple_room_updates_lock: Default::default(),
164                by_room: Default::default(),
165                drop_handles: Default::default(),
166                auto_shrink_sender: Default::default(),
167            }),
168        }
169    }
170
171    /// Enable storing updates to storage, and reload events from storage.
172    ///
173    /// Has an effect only the first time it's called. It's safe to call it
174    /// multiple times.
175    pub fn enable_storage(&self) -> Result<()> {
176        let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
177            let client = self.inner.client()?;
178            Ok(client.event_cache_store().clone())
179        })?;
180        Ok(())
181    }
182
183    /// Check whether the storage is enabled or not.
184    pub fn has_storage(&self) -> bool {
185        self.inner.has_storage()
186    }
187
188    /// Starts subscribing the [`EventCache`] to sync responses, if not done
189    /// before.
190    ///
191    /// Re-running this has no effect if we already subscribed before, and is
192    /// cheap.
193    pub fn subscribe(&self) -> Result<()> {
194        let client = self.inner.client()?;
195
196        let _ = self.inner.drop_handles.get_or_init(|| {
197            // Spawn the task that will listen to all the room updates at once.
198            let listen_updates_task = spawn(Self::listen_task(
199                self.inner.clone(),
200                client.subscribe_to_all_room_updates(),
201            ));
202
203            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
204                self.inner.clone(),
205                client.subscribe_to_ignore_user_list_changes(),
206            ));
207
208            let (tx, rx) = mpsc::channel(32);
209
210            // Force-initialize the sender in the [`RoomEventCacheInner`].
211            self.inner.auto_shrink_sender.get_or_init(|| tx);
212
213            let auto_shrink_linked_chunk_tasks =
214                spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));
215
216            Arc::new(EventCacheDropHandles {
217                listen_updates_task,
218                ignore_user_list_update_task,
219                auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_tasks,
220            })
221        });
222
223        Ok(())
224    }
225
226    #[instrument(skip_all)]
227    async fn ignore_user_list_update_task(
228        inner: Arc<EventCacheInner>,
229        mut ignore_user_list_stream: Subscriber<Vec<String>>,
230    ) {
231        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
232        span.follows_from(Span::current());
233
234        async move {
235            while ignore_user_list_stream.next().await.is_some() {
236                info!("Received an ignore user list change");
237                if let Err(err) = inner.clear_all_rooms().await {
238                    error!("when clearing room storage after ignore user list change: {err}");
239                }
240            }
241            info!("Ignore user list stream has closed");
242        }
243        .instrument(span)
244        .await;
245    }
246
247    #[instrument(skip_all)]
248    async fn listen_task(
249        inner: Arc<EventCacheInner>,
250        mut room_updates_feed: Receiver<RoomUpdates>,
251    ) {
252        trace!("Spawning the listen task");
253        loop {
254            match room_updates_feed.recv().await {
255                Ok(updates) => {
256                    if let Err(err) = inner.handle_room_updates(updates).await {
257                        match err {
258                            EventCacheError::ClientDropped => {
259                                // The client has dropped, exit the listen task.
260                                info!("Closing the event cache global listen task because client dropped");
261                                break;
262                            }
263                            err => {
264                                error!("Error when handling room updates: {err}");
265                            }
266                        }
267                    }
268                }
269
270                Err(RecvError::Lagged(num_skipped)) => {
271                    // Forget everything we know; we could have missed events, and we have
272                    // no way to reconcile at the moment!
273                    // TODO: implement Smart Matching™,
274                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
275                    if let Err(err) = inner.clear_all_rooms().await {
276                        error!("when clearing storage after lag in listen_task: {err}");
277                    }
278                }
279
280                Err(RecvError::Closed) => {
281                    // The sender has shut down, exit.
282                    info!("Closing the event cache global listen task because receiver closed");
283                    break;
284                }
285            }
286        }
287    }
288
289    /// Spawns the task that will listen to auto-shrink notifications.
290    ///
291    /// The auto-shrink mechanism works this way:
292    ///
293    /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
294    ///   increment the active number of listeners to that room, aka
295    ///   [`RoomEventCacheState::listener_count`].
296    /// - When that subscriber is dropped, it will decrement that count; and
297    ///   notify the task below if it reached 0.
298    /// - The task spawned here, owned by the [`EventCacheInner`], will listen
299    ///   to such notifications that a room may be shrunk. It will attempt an
300    ///   auto-shrink, by letting the inner state decide whether this is a good
301    ///   time to do so (new listeners might have spawned in the meanwhile).
302    #[instrument(skip_all)]
303    async fn auto_shrink_linked_chunk_task(
304        inner: Arc<EventCacheInner>,
305        mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
306    ) {
307        while let Some(room_id) = rx.recv().await {
308            trace!(for_room = %room_id, "received notification to shrink");
309
310            let room = match inner.for_room(&room_id).await {
311                Ok(room) => room,
312                Err(err) => {
313                    warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
314                    continue;
315                }
316            };
317
318            trace!("waiting for state lock…");
319            let mut state = room.inner.state.write().await;
320
321            match state.auto_shrink_if_no_listeners().await {
322                Ok(diffs) => {
323                    if let Some(diffs) = diffs {
324                        // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
325                        // listeners, right? RIGHT? Especially because the state is guarded behind
326                        // a lock.
327                        //
328                        // However, better safe than sorry, and it's cheap to send an update here,
329                        // so let's do it!
330                        if !diffs.is_empty() {
331                            let _ = room.inner.sender.send(
332                                RoomEventCacheUpdate::UpdateTimelineEvents {
333                                    diffs,
334                                    origin: EventsOrigin::Cache,
335                                },
336                            );
337                        }
338                    } else {
339                        debug!("auto-shrinking didn't happen");
340                    }
341                }
342
343                Err(err) => {
344                    // There's not much we can do here, unfortunately.
345                    warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
346                }
347            }
348        }
349    }
350
351    /// Return a room-specific view over the [`EventCache`].
352    pub(crate) async fn for_room(
353        &self,
354        room_id: &RoomId,
355    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
356        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
357            return Err(EventCacheError::NotSubscribedYet);
358        };
359
360        let room = self.inner.for_room(room_id).await?;
361
362        Ok((room, drop_handles))
363    }
364
365    /// Cleanly clear all the rooms' event caches.
366    ///
367    /// This will notify any live observers that the room has been cleared.
368    pub async fn clear_all_rooms(&self) -> Result<()> {
369        self.inner.clear_all_rooms().await
370    }
371
372    /// Add an initial set of events to the event cache, reloaded from a cache.
373    ///
374    /// TODO: temporary for API compat, as the event cache should take care of
375    /// its own store.
376    #[instrument(skip(self, events))]
377    pub async fn add_initial_events(
378        &self,
379        room_id: &RoomId,
380        events: Vec<TimelineEvent>,
381        prev_batch: Option<String>,
382    ) -> Result<()> {
383        // If the event cache's storage has been enabled, do nothing.
384        if self.inner.has_storage() {
385            return Ok(());
386        }
387
388        let room_cache = self.inner.for_room(room_id).await?;
389
390        // If the linked chunked already has at least one event, ignore this request, as
391        // it should happen at most once per room.
392        if !room_cache.inner.state.read().await.events().is_empty() {
393            return Ok(());
394        }
395
396        // We could have received events during a previous sync; remove them all, since
397        // we can't know where to insert the "initial events" with respect to
398        // them.
399
400        room_cache
401            .inner
402            .replace_all_events_by(
403                events,
404                prev_batch,
405                Default::default(),
406                Default::default(),
407                EventsOrigin::Cache,
408            )
409            .await?;
410
411        Ok(())
412    }
413}
414
/// The state shared by all the clones of an [`EventCache`].
struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    ///
    /// Remains uninitialized until [`EventCache::enable_storage`] is called,
    /// in which case we shouldn't use storage for reading / writing linked
    /// chunks.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], one per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Handles to keep alive the tasks spawned by [`EventCache::subscribe`].
    ///
    /// Remains uninitialized until [`EventCache::subscribe`] is called.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,

    /// A sender for notifications that a room *may* need to be auto-shrunk.
    ///
    /// Needs to live here, so it may be passed to each [`RoomEventCache`]
    /// instance.
    ///
    /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
    auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
}
448
/// Payload sent over the auto-shrink channel: the ID of the room that may need
/// to be shrunk.
type AutoShrinkChannelPayload = OwnedRoomId;
450
451impl EventCacheInner {
452    fn client(&self) -> Result<Client> {
453        self.client.get().ok_or(EventCacheError::ClientDropped)
454    }
455
456    /// Has persistent storage been enabled for the event cache?
457    fn has_storage(&self) -> bool {
458        self.store.get().is_some()
459    }
460
    /// Clears all the rooms' data, both in the store (if enabled) and in
    /// memory, and notifies each live room's observers with the resulting
    /// timeline diffs.
    ///
    /// # Errors
    ///
    /// Fails if the store can't be locked or cleared, or if resetting the
    /// in-memory state of a room fails.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Okay, here's where things get complicated.
        //
        // On the one hand, `by_room` may include storage for *some* rooms that we know
        // about, but not *all* of them. Any room that hasn't been loaded in the
        // client, or touched by a sync, will remain unloaded in memory, so it
        // will be missing from `self.by_room`. As a result, we need to make
        // sure that we're hitting the storage backend to *really* clear all the
        // rooms, including those that haven't been loaded yet.
        //
        // On the other hand, one must NOT clear the `by_room` map, because if someone
        // subscribed to a room update, they would never get any new update for
        // that room, since re-creating the `RoomEventCache` would create a new,
        // unrelated sender.
        //
        // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
        // store backend.
        //
        // As a result, for a short while, the in-memory linked chunks
        // will be desynchronized from the storage. We need to be careful then. During
        // that short while, we don't want *anyone* to touch the linked chunk
        // (be it in memory or in the storage).
        //
        // And since that requirement applies to *any* room in `by_room` at the same
        // time, we'll have to take the locks for *all* the live rooms, so as to
        // properly clear the underlying storage.
        //
        // At this point, you might be scared about the potential for deadlocking. I am
        // as well, but I'm convinced we're fine:
        // 1. the lock for `by_room` is usually held only for a short while, and
        //    independently of the other two kinds.
        // 2. the state may acquire the store cross-process lock internally, but only
        //    while the state's methods are called (so it's always transient). As a
        //    result, as soon as we've acquired the state locks, the store lock ought to
        //    be free.
        // 3. The store lock is held explicitly only in a small scoped area below.
        // 4. Then the store lock will be held internally when calling `reset()`, but at
        //    this point it's only held for a short while each time, so rooms will take
        //    turns to acquire it.

        // NOTE(review): a write lock is taken although the map is only read below;
        // presumably to prevent `for_room()` from inserting new rooms in the
        // meanwhile — confirm.
        let rooms = self.by_room.write().await;

        // Collect all the rooms' state locks, first: we can clear the storage only when
        // nobody will touch it at the same time.
        let room_locks = join_all(
            rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
        )
        .await;

        // Clear the storage for all the rooms, using the storage facility. The store
        // guard is released at the end of this scope.
        if let Some(store) = self.store.get() {
            let store_guard = store.lock().await?;
            store_guard.clear_all_rooms_chunks().await?;
        }

        // At this point, all the in-memory linked chunks are desynchronized from the
        // storage. Resynchronize them manually by calling reset(), and
        // propagate updates to observers.
        try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
            let updates_as_vector_diffs = state_guard.reset().await?;
            // The result of sending is deliberately ignored.
            let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: updates_as_vector_diffs,
                origin: EventsOrigin::Cache,
            });
            Ok::<_, EventCacheError>(())
        }))
        .await?;

        Ok(())
    }
532
533    /// Handles a single set of room updates at once.
534    #[instrument(skip(self, updates))]
535    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
536        // First, take the lock that indicates we're processing updates, to avoid
537        // handling multiple updates concurrently.
538        let _lock = self.multiple_room_updates_lock.lock().await;
539
540        // Left rooms.
541        for (room_id, left_room_update) in updates.leave {
542            let room = self.for_room(&room_id).await?;
543
544            if let Err(err) =
545                room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
546            {
547                // Non-fatal error, try to continue to the next room.
548                error!("handling left room update: {err}");
549            }
550        }
551
552        // Joined rooms.
553        for (room_id, joined_room_update) in updates.join {
554            let room = self.for_room(&room_id).await?;
555
556            if let Err(err) =
557                room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
558            {
559                // Non-fatal error, try to continue to the next room.
560                error!(%room_id, "handling joined room update: {err}");
561            }
562        }
563
564        // Invited rooms.
565        // TODO: we don't anything with `updates.invite` at this point.
566
567        Ok(())
568    }
569
570    /// Return a room-specific view over the [`EventCache`].
571    ///
572    /// It may not be found, if the room isn't known to the client, in which
573    /// case it'll return None.
574    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
575        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
576        // write lock.
577        let by_room_guard = self.by_room.read().await;
578
579        match by_room_guard.get(room_id) {
580            Some(room) => Ok(room.clone()),
581
582            None => {
583                // Slow-path: the entry doesn't exist; let's acquire a write lock.
584                drop(by_room_guard);
585                let mut by_room_guard = self.by_room.write().await;
586
587                // In the meanwhile, some other caller might have obtained write access and done
588                // the same, so check for existence again.
589                if let Some(room) = by_room_guard.get(room_id) {
590                    return Ok(room.clone());
591                }
592
593                let pagination_status =
594                    SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
595
596                let room_version = self
597                    .client
598                    .get()
599                    .and_then(|client| client.get_room(room_id))
600                    .as_ref()
601                    .map(|room| room.clone_info().room_version_or_default())
602                    .unwrap_or_else(|| {
603                        warn!("unknown room version for {room_id}, using default V1");
604                        RoomVersionId::V1
605                    });
606
607                let room_state = RoomEventCacheState::new(
608                    room_id.to_owned(),
609                    room_version,
610                    self.store.clone(),
611                    pagination_status.clone(),
612                )
613                .await?;
614
615                // SAFETY: we must have subscribed before reaching this coed, otherwise
616                // something is very wrong.
617                let auto_shrink_sender =
618                    self.auto_shrink_sender.get().cloned().expect(
619                        "we must have called `EventCache::subscribe()` before calling here.",
620                    );
621
622                let room_event_cache = RoomEventCache::new(
623                    self.client.clone(),
624                    room_state,
625                    pagination_status,
626                    room_id.to_owned(),
627                    auto_shrink_sender,
628                );
629
630                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
631
632                Ok(room_event_cache)
633            }
634        }
635    }
636}
637
/// The result of a single back-pagination request.
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or,
    /// technically, the last one in the topological ordering).
    pub events: Vec<TimelineEvent>,
}
652
/// An update related to events that happened in a room.
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger.
        ///
        /// This is a map of event ID of the `m.room.member` event to the
        /// details of the ambiguity change.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// The raw ephemeral events that have been received.
        ///
        /// XXX: this is temporary, until read receipts are handled in the event
        /// cache.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}
687
/// Indicates where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,

    /// The cause of the change is purely internal to the cache.
    Cache,
}
700
701#[cfg(test)]
702mod tests {
703    use assert_matches::assert_matches;
704    use futures_util::FutureExt as _;
705    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
706    use matrix_sdk_test::{async_test, event_factory::EventFactory};
707    use ruma::{event_id, room_id, serde::Raw, user_id};
708    use serde_json::json;
709
710    use super::{EventCacheError, RoomEventCacheUpdate};
711    use crate::test_utils::{assert_event_matches_msg, logged_in_client};
712
713    #[async_test]
714    async fn test_must_explicitly_subscribe() {
715        let client = logged_in_client(None).await;
716
717        let event_cache = client.event_cache();
718
719        // If I create a room event subscriber for a room before subscribing the event
720        // cache,
721        let room_id = room_id!("!omelette:fromage.fr");
722        let result = event_cache.for_room(room_id).await;
723
724        // Then it fails, because one must explicitly call `.subscribe()` on the event
725        // cache.
726        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
727    }
728
729    #[async_test]
730    async fn test_uniq_read_marker() {
731        let client = logged_in_client(None).await;
732        let room_id = room_id!("!galette:saucisse.bzh");
733        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
734
735        let event_cache = client.event_cache();
736
737        event_cache.subscribe().unwrap();
738
739        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
740
741        let (events, mut stream) = room_event_cache.subscribe().await;
742
743        assert!(events.is_empty());
744
745        // When sending multiple times the same read marker event,…
746        let read_marker_event = Raw::from_json_string(
747            json!({
748                "content": {
749                    "event_id": "$crepe:saucisse.bzh"
750                },
751                "room_id": "!galette:saucisse.bzh",
752                "type": "m.fully_read"
753            })
754            .to_string(),
755        )
756        .unwrap();
757        let account_data = vec![read_marker_event; 100];
758
759        room_event_cache
760            .inner
761            .handle_joined_room_update(
762                event_cache.inner.has_storage(),
763                JoinedRoomUpdate { account_data, ..Default::default() },
764            )
765            .await
766            .unwrap();
767
768        // … there's only one read marker update.
769        assert_matches!(
770            stream.recv().await.unwrap(),
771            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
772        );
773
774        assert!(stream.recv().now_or_never().is_none());
775    }
776
777    #[async_test]
778    async fn test_get_event_by_id() {
779        let client = logged_in_client(None).await;
780        let room_id1 = room_id!("!galette:saucisse.bzh");
781        let room_id2 = room_id!("!crepe:saucisse.bzh");
782
783        let event_cache = client.event_cache();
784        event_cache.subscribe().unwrap();
785
786        // Insert two rooms with a few events.
787        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
788
789        let eid1 = event_id!("$1");
790        let eid2 = event_id!("$2");
791        let eid3 = event_id!("$3");
792
793        let joined_room_update1 = JoinedRoomUpdate {
794            timeline: Timeline {
795                events: vec![
796                    f.text_msg("hey").event_id(eid1).into(),
797                    f.text_msg("you").event_id(eid2).into(),
798                ],
799                ..Default::default()
800            },
801            ..Default::default()
802        };
803
804        let joined_room_update2 = JoinedRoomUpdate {
805            timeline: Timeline {
806                events: vec![f.text_msg("bjr").event_id(eid3).into()],
807                ..Default::default()
808            },
809            ..Default::default()
810        };
811
812        let mut updates = RoomUpdates::default();
813        updates.join.insert(room_id1.to_owned(), joined_room_update1);
814        updates.join.insert(room_id2.to_owned(), joined_room_update2);
815
816        // Have the event cache handle them.
817        event_cache.inner.handle_room_updates(updates).await.unwrap();
818
819        // We can find the events in a single room.
820        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
821        let room1 = client.get_room(room_id1).unwrap();
822
823        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
824
825        let found1 = room_event_cache.event(eid1).await.unwrap();
826        assert_event_matches_msg(&found1, "hey");
827
828        let found2 = room_event_cache.event(eid2).await.unwrap();
829        assert_event_matches_msg(&found2, "you");
830
831        // Retrieving the event with id3 from the room which doesn't contain it will
832        // fail…
833        assert!(room_event_cache.event(eid3).await.is_none());
834    }
835
836    #[async_test]
837    async fn test_save_event() {
838        let client = logged_in_client(None).await;
839        let room_id = room_id!("!galette:saucisse.bzh");
840
841        let event_cache = client.event_cache();
842        event_cache.subscribe().unwrap();
843        event_cache.enable_storage().unwrap();
844
845        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
846        let event_id = event_id!("$1");
847
848        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
849        let room = client.get_room(room_id).unwrap();
850
851        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
852        room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
853
854        // Retrieving the event at the room-wide cache works.
855        assert!(room_event_cache.event(event_id).await.is_some());
856    }
857
858    #[async_test]
859    async fn test_add_initial_events() {
860        // TODO: remove this test when the event cache uses its own persistent storage.
861        let client = logged_in_client(None).await;
862        let room_id = room_id!("!galette:saucisse.bzh");
863
864        let event_cache = client.event_cache();
865        event_cache.subscribe().unwrap();
866
867        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
868        event_cache
869            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
870            .await
871            .unwrap();
872
873        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
874        let room = client.get_room(room_id).unwrap();
875
876        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
877        let (initial_events, _) = room_event_cache.subscribe().await;
878        // `add_initial_events` had an effect.
879        assert_eq!(initial_events.len(), 1);
880    }
881}