matrix_sdk/event_cache/mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The event cache is an abstraction layer, sitting between the Rust SDK and a
//! final client, that acts as a global observer of all the rooms, gathering and
//! inferring some extra useful information about each room. In particular, a
//! client doesn't need to subscribe to a specific room to get access to this
//! information.
//!
//! It's intended to be fast, robust and easy to maintain, having learned from
//! previous endeavours at implementing middle- to high-level features elsewhere
//! in the SDK, notably in the UI's Timeline object.
//!
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
//! details about the historical reasons that led us to start writing this.
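//!
//! A minimal usage sketch, not a complete API reference: it assumes `client` is
//! an already logged-in `Client` and `room_id` a room known to it, and only
//! illustrates the subscription flow described above.
//!
//! ```ignore
//! let event_cache = client.event_cache();
//!
//! // The event cache must explicitly subscribe to sync responses first.
//! event_cache.subscribe()?;
//!
//! // A per-room view can then be obtained from a known room.
//! let room = client.get_room(room_id).expect("room should be known to the client");
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//!
//! // Current events, plus a stream of live `RoomEventCacheUpdate`s.
//! let (initial_events, mut updates) = room_event_cache.subscribe().await?;
//! ```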

#![forbid(missing_docs)]

use std::{
    collections::BTreeMap,
    fmt::Debug,
    sync::{Arc, OnceLock},
};

use eyeball::Subscriber;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
    store_locks::LockStoreError,
    sync::RoomUpdates,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use once_cell::sync::OnceCell;
use room::RoomEventCacheState;
use ruma::{
    events::{
        relation::RelationType,
        room::{message::Relation, redaction::SyncRoomRedactionEvent},
        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
        AnySyncTimelineEvent,
    },
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
};
use tokio::sync::{
    broadcast::{error::RecvError, Receiver},
    Mutex, RwLock,
};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument as _, Span};

use self::paginator::PaginatorError;
use crate::{client::WeakClient, Client};

mod deduplicator;
mod pagination;
mod room;

pub mod paginator;
pub use pagination::{PaginationToken, RoomPagination, TimelineHasBeenResetWhilePaginating};
pub use room::RoomEventCache;

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] instance hasn't been initialized with
    /// [`EventCache::subscribe`].
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room hasn't been found in the client.
    ///
    /// Technically, it's possible to request a [`RoomEventCache`] for a room
    /// that is not known to the client, leading to this error.
    #[error("Room {0} hasn't been found in the Client.")]
    RoomNotFound(OwnedRoomId),

    /// The given back-pagination token is unknown to the event cache.
    #[error("The given back-pagination token is unknown to the event cache.")]
    UnknownBackpaginationToken,

    /// An error has been observed while back-paginating.
    #[error("Error observed while back-paginating: {0}")]
    BackpaginationError(#[from] PaginatorError),

    /// An error happening when interacting with storage.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error happening when attempting to (cross-process) lock storage.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
    /// to. It's possible this weak reference points to nothing anymore, at
    /// times when we try to use the client.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,
}

/// A result using the [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Holds handles to the tasks spawned by the [`EventCache`].
pub struct EventCacheDropHandles {
    /// Task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Task that listens to updates to the user's ignored list.
    ignore_user_list_update_task: JoinHandle<()>,
}

impl Debug for EventCacheDropHandles {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
    }
}

impl Drop for EventCacheDropHandles {
    fn drop(&mut self) {
        self.listen_updates_task.abort();
        self.ignore_user_list_update_task.abort();
    }
}

/// An event cache, providing lots of useful functionality for clients.
///
/// Cloning is shallow, and thus is cheap to do.
///
/// See also the module-level comment.
#[derive(Clone)]
pub struct EventCache {
    /// Reference to the inner cache.
    inner: Arc<EventCacheInner>,
}

impl Debug for EventCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventCache").finish_non_exhaustive()
    }
}

impl EventCache {
    /// Create a new [`EventCache`] for the given client.
    pub(crate) fn new(client: WeakClient) -> Self {
        Self {
            inner: Arc::new(EventCacheInner {
                client,
                store: Default::default(),
                multiple_room_updates_lock: Default::default(),
                by_room: Default::default(),
                drop_handles: Default::default(),
                all_events: Default::default(),
            }),
        }
    }

    /// Enable storing updates to storage, and reload events from storage.
    ///
    /// Has an effect only the first time it's called. It's safe to call it
    /// multiple times.
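    ///
    /// A minimal sketch of how this is expected to be called; the storage
    /// backend itself is the one configured on the owning [`Client`]:
    ///
    /// ```ignore
    /// let event_cache = client.event_cache();
    /// event_cache.subscribe()?;
    /// // Opt into persisting and reloading linked chunks from the store.
    /// event_cache.enable_storage()?;
    /// ```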
    pub fn enable_storage(&self) -> Result<()> {
        let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
            let client = self.inner.client()?;
            Ok(client.event_cache_store().clone())
        })?;
        Ok(())
    }

    /// Check whether the storage is enabled or not.
    pub fn has_storage(&self) -> bool {
        self.inner.has_storage()
    }

    /// Starts subscribing the [`EventCache`] to sync responses, if not done
    /// before.
    ///
    /// Re-running this has no effect if we already subscribed before, and is
    /// cheap.
    pub fn subscribe(&self) -> Result<()> {
        let client = self.inner.client()?;

        let _ = self.inner.drop_handles.get_or_init(|| {
            // Spawn the task that will listen to all the room updates at once.
            let listen_updates_task = spawn(Self::listen_task(
                self.inner.clone(),
                client.subscribe_to_all_room_updates(),
            ));

            let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
                self.inner.clone(),
                client.subscribe_to_ignore_user_list_changes(),
            ));

            Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task })
        });

        Ok(())
    }

    /// Try to find an event by its ID in all the rooms.
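    ///
    /// A small lookup sketch (`event_cache` is assumed to be a subscribed
    /// [`EventCache`] and `event_id` an id seen in a previous sync):
    ///
    /// ```ignore
    /// if let Some(event) = event_cache.event(event_id).await {
    ///     // The event is known to one of the rooms.
    /// }
    /// ```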
    // Note: replace this with a select-by-id query when this is implemented in a
    // store.
    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
        self.inner
            .all_events
            .read()
            .await
            .events
            .get(event_id)
            .map(|(_room_id, event)| event.clone())
    }

    /// Clear all the events from the immutable event cache.
    ///
    /// This keeps all the rooms along with their internal event linked chunks,
    /// but it clears the separate, immutable cache of events.
    ///
    /// As such, it doesn't emit any [`RoomEventCacheUpdate`], and it's expected
    /// to be only useful in testing contexts.
    // Note: replace this with a remove query when this is implemented in a
    // store.
    #[cfg(any(test, feature = "testing"))]
    pub async fn empty_immutable_cache(&self) {
        self.inner.all_events.write().await.events.clear();
    }

    #[instrument(skip_all)]
    async fn ignore_user_list_update_task(
        inner: Arc<EventCacheInner>,
        mut ignore_user_list_stream: Subscriber<Vec<String>>,
    ) {
        let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
        span.follows_from(Span::current());

        async move {
            while ignore_user_list_stream.next().await.is_some() {
                info!("Received an ignore user list change");
                if let Err(err) = inner.clear_all_rooms().await {
                    error!("when clearing room storage after ignore user list change: {err}");
                }
            }
            info!("Ignore user list stream has closed");
        }
        .instrument(span)
        .await;
    }

    #[instrument(skip_all)]
    async fn listen_task(
        inner: Arc<EventCacheInner>,
        mut room_updates_feed: Receiver<RoomUpdates>,
    ) {
        trace!("Spawning the listen task");
        loop {
            match room_updates_feed.recv().await {
                Ok(updates) => {
                    if let Err(err) = inner.handle_room_updates(updates).await {
                        match err {
                            EventCacheError::ClientDropped => {
                                // The client has dropped, exit the listen task.
                                info!("Closing the event cache global listen task because client dropped");
                                break;
                            }
                            err => {
                                error!("Error when handling room updates: {err}");
                            }
                        }
                    }
                }

                Err(RecvError::Lagged(num_skipped)) => {
                    // Forget everything we know; we could have missed events, and we have
                    // no way to reconcile at the moment!
                    // TODO: implement Smart Matching™.
                    warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
                    if let Err(err) = inner.clear_all_rooms().await {
                        error!("when clearing storage after lag in listen_task: {err}");
                    }
                }

                Err(RecvError::Closed) => {
                    // The sender has shut down, exit.
                    info!("Closing the event cache global listen task because receiver closed");
                    break;
                }
            }
        }
    }

    /// Return a room-specific view over the [`EventCache`].
    pub(crate) async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
        let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
            return Err(EventCacheError::NotSubscribedYet);
        };

        let room = self.inner.for_room(room_id).await?;

        Ok((room, drop_handles))
    }

    /// Add an initial set of events to the event cache, reloaded from a cache.
    ///
    /// TODO: temporary for API compat, as the event cache should take care of
    /// its own store.
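    ///
    /// A usage sketch; `reloaded_events` and `prev_batch_token` are placeholders
    /// for whatever the client reloaded from its own cache:
    ///
    /// ```ignore
    /// event_cache
    ///     .add_initial_events(room_id, reloaded_events, prev_batch_token)
    ///     .await?;
    /// ```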
    #[instrument(skip(self, events))]
    pub async fn add_initial_events(
        &self,
        room_id: &RoomId,
        events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
    ) -> Result<()> {
        // If the event cache's storage has been enabled, do nothing.
        if self.inner.has_storage() {
            return Ok(());
        }

        let room_cache = self.inner.for_room(room_id).await?;

        // If the linked chunk already has at least one event, ignore this request, as
        // it should happen at most once per room.
        if !room_cache.inner.state.read().await.events().is_empty() {
            return Ok(());
        }

        // We could have received events during a previous sync; remove them all, since
        // we can't know where to insert the "initial events" with respect to
        // them.

        room_cache
            .inner
            .replace_all_events_by(events, prev_batch, Default::default(), Default::default())
            .await?;

        Ok(())
    }
}

type AllEventsMap = BTreeMap<OwnedEventId, (OwnedRoomId, TimelineEvent)>;
type RelationsMap = BTreeMap<OwnedEventId, BTreeMap<OwnedEventId, RelationType>>;

/// Cache wrapper containing both copies of received events and lists of event
/// ids related to them.
#[derive(Default, Clone)]
struct AllEventsCache {
    /// A cache of received events mapped by their event id.
    events: AllEventsMap,
    /// A cache of related event ids for an event id. The key is the original
    /// event id and the value is a list of event ids related to it.
    relations: RelationsMap,
}

impl AllEventsCache {
    fn clear(&mut self) {
        self.events.clear();
        self.relations.clear();
    }

    /// If the event is related to another one, its id is added to the relations
    /// map.
    fn append_related_event(&mut self, event: &TimelineEvent) {
        // Handle and cache events and relations.
        let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
            return;
        };

        // Handle redactions separately, as their logic is slightly different.
        if let AnySyncMessageLikeEvent::RoomRedaction(room_redaction) = &ev {
            let redacted_event_id = match room_redaction {
                SyncRoomRedactionEvent::Original(ev) => {
                    ev.content.redacts.as_ref().or(ev.redacts.as_ref())
                }
                SyncRoomRedactionEvent::Redacted(redacted_redaction) => {
                    redacted_redaction.content.redacts.as_ref()
                }
            };

            if let Some(redacted_event_id) = redacted_event_id {
                self.relations
                    .entry(redacted_event_id.to_owned())
                    .or_default()
                    .insert(ev.event_id().to_owned(), RelationType::Replacement);
            }

            return;
        }

        let relationship = match ev.original_content() {
            Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
                if let Some(relation) = c.relates_to {
                    match relation {
                        Relation::Replacement(replacement) => {
                            Some((replacement.event_id, RelationType::Replacement))
                        }
                        Relation::Reply { in_reply_to } => {
                            Some((in_reply_to.event_id, RelationType::Reference))
                        }
                        Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
                        // Do nothing for custom
                        _ => None,
                    }
                } else {
                    None
                }
            }
            Some(AnyMessageLikeEventContent::PollResponse(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::PollEnd(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
                Some((c.relates_to.event_id, RelationType::Reference))
            }
            Some(AnyMessageLikeEventContent::Reaction(c)) => {
                Some((c.relates_to.event_id, RelationType::Annotation))
            }
            _ => None,
        };

        if let Some(relationship) = relationship {
            self.relations
                .entry(relationship.0)
                .or_default()
                .insert(ev.event_id().to_owned(), relationship.1);
        }
    }

    /// Looks for events related to the given event id, and collects them into
    /// the returned list. It then recursively collects the events related to
    /// those as well.
    fn collect_related_events(
        &self,
        event_id: &EventId,
        filter: Option<&[RelationType]>,
    ) -> Vec<TimelineEvent> {
        let mut results = Vec::new();
        self.collect_related_events_rec(event_id, filter, &mut results);
        results
    }

    fn collect_related_events_rec(
        &self,
        event_id: &EventId,
        filter: Option<&[RelationType]>,
        results: &mut Vec<TimelineEvent>,
    ) {
        let Some(related_event_ids) = self.relations.get(event_id) else {
            return;
        };

        for (related_event_id, relation_type) in related_event_ids {
            if let Some(filter) = filter {
                if !filter.contains(relation_type) {
                    continue;
                }
            }

            // If the event was already added to the related ones, skip it.
            if results.iter().any(|event| {
                event.event_id().is_some_and(|added_related_event_id| {
                    added_related_event_id == *related_event_id
                })
            }) {
                continue;
            }

            if let Some((_, ev)) = self.events.get(related_event_id) {
                results.push(ev.clone());
                self.collect_related_events_rec(related_event_id, filter, results);
            }
        }
    }
}

struct EventCacheInner {
    /// A weak reference to the inner client, useful when trying to get a handle
    /// on the owning client.
    client: WeakClient,

    /// Reference to the underlying store.
    ///
    /// Set to `None` if we shouldn't use storage for reading / writing linked
    /// chunks.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// A lock used when many rooms must be updated at once.
    ///
    /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
    /// ensure that multiple updates will be applied in the correct order, which
    /// is enforced by taking this lock when handling an update.
    // TODO: that's the place to add a cross-process lock!
    multiple_room_updates_lock: Mutex<()>,

    /// Lazily-filled cache of live [`RoomEventCache`], once per room.
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// All events, keyed by event id.
    ///
    /// Since events are immutable in Matrix, this is append-only; an event's
    /// entry can still be updated, though (e.g. if it was encrypted before, and
    /// successfully decrypted later).
    ///
    /// This is shared between the [`EventCacheInner`] singleton and all
    /// [`RoomEventCacheInner`] instances.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// Handles to keep alive the tasks listening to updates.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
}

impl EventCacheInner {
    fn client(&self) -> Result<Client> {
        self.client.get().ok_or(EventCacheError::ClientDropped)
    }

    /// Has persistent storage been enabled for the event cache?
    fn has_storage(&self) -> bool {
        self.store.get().is_some()
    }

    /// Clears all the rooms' data.
    async fn clear_all_rooms(&self) -> Result<()> {
        // Note: one must NOT clear the `by_room` map, because if something subscribed
        // to a room update, they would never get any new update for that room, since
        // re-creating the `RoomEventCache` would create a new unrelated sender.

        // Note 2: we don't need to clear the [`Self::all_events`] map, because events
        // are immutable in the Matrix protocol.

        let rooms = self.by_room.write().await;
        for room in rooms.values() {
            // Notify all the observers that we've lost track of state. (We ignore the
            // error if there aren't any.)
            let _ = room.inner.sender.send(RoomEventCacheUpdate::Clear);
            // Clear all the room state.
            room.inner.state.write().await.reset().await?;
        }

        Ok(())
    }

    /// Handles a single set of room updates at once.
    #[instrument(skip(self, updates))]
    async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
        // First, take the lock that indicates we're processing updates, to avoid
        // handling multiple updates concurrently.
        let _lock = self.multiple_room_updates_lock.lock().await;

        // Left rooms.
        for (room_id, left_room_update) in updates.leave {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!("handling left room update: {err}");
            }
        }

        // Joined rooms.
        for (room_id, joined_room_update) in updates.join {
            let room = self.for_room(&room_id).await?;

            if let Err(err) =
                room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
            {
                // Non-fatal error, try to continue to the next room.
                error!("handling joined room update: {err}");
            }
        }

        // Invited rooms.
        // TODO: we don't do anything with `updates.invite` at this point.

        Ok(())
    }

    /// Return a room-specific view over the [`EventCache`].
    ///
    /// If the room isn't known to the client, its room version can't be
    /// determined, and the default (V1) will be used.
    async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
        // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
        // write lock.
        let by_room_guard = self.by_room.read().await;

        match by_room_guard.get(room_id) {
            Some(room) => Ok(room.clone()),

            None => {
                // Slow-path: the entry doesn't exist; let's acquire a write lock.
                drop(by_room_guard);
                let mut by_room_guard = self.by_room.write().await;

                // In the meantime, some other caller might have obtained write access and done
                // the same, so check for existence again.
                if let Some(room) = by_room_guard.get(room_id) {
                    return Ok(room.clone());
                }

                let room_state =
                    RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?;

                let room_version = self
                    .client
                    .get()
                    .and_then(|client| client.get_room(room_id))
                    .map(|room| room.clone_info().room_version_or_default())
                    .unwrap_or_else(|| {
                        warn!("unknown room version for {room_id}, using default V1");
                        RoomVersionId::V1
                    });

                let room_event_cache = RoomEventCache::new(
                    self.client.clone(),
                    room_state,
                    room_id.to_owned(),
                    room_version,
                    self.all_events.clone(),
                );

                by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());

                Ok(room_event_cache)
            }
        }
    }
}

/// The result of a single back-pagination request.
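///
/// Since [`BackPaginationOutcome::events`] is in reverse topological order, a
/// consumer that wants chronological order can simply reverse it; a tiny sketch,
/// assuming an `outcome` value obtained from a back-pagination:
///
/// ```ignore
/// let mut chronological = outcome.events;
/// chronological.reverse();
/// ```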
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Did the back-pagination reach the start of the timeline?
    pub reached_start: bool,

    /// All the events that have been returned in the back-pagination
    /// request.
    ///
    /// Events are presented in reverse order: the first element of the vec,
    /// if present, is the most "recent" event from the chunk (or
    /// technically, the last one in the topological ordering).
    ///
    /// Note: they're not deduplicated (TODO: smart reconciliation).
    pub events: Vec<TimelineEvent>,
}

/// An update related to events that happened in a room.
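///
/// A sketch of how a consumer might drain these updates from the stream
/// returned by `RoomEventCache::subscribe`; the handling shown here is
/// illustrative, not prescriptive:
///
/// ```ignore
/// while let Ok(update) = updates.recv().await {
///     match update {
///         RoomEventCacheUpdate::Clear => {
///             // Drop any locally rendered events for this room.
///         }
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
///             // Apply each `VectorDiff` to the local copy of the timeline.
///         }
///         _ => {
///             // Read marker, member updates, ephemeral events, …
///         }
///     }
/// }
/// ```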
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The room's events have been cleared.
    Clear,

    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes that room member events trigger.
        ///
        /// This is a map from the event ID of the `m.room.member` event to the
        /// details of the ambiguity change.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The room has received updates for the timeline as _diffs_.
    UpdateTimelineEvents {
        /// Diffs to apply to the timeline.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// XXX: this is temporary, until read receipts are handled in the event
        /// cache.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}

/// Indicates where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;

    use super::{EventCacheError, RoomEventCacheUpdate};
    use crate::test_utils::{assert_event_matches_msg, logged_in_client};

    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // If I create a room event subscriber for a room before subscribing the event
        // cache,
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        // Then it fails, because one must explicitly call `.subscribe()` on the event
        // cache.
        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();

        assert!(events.is_empty());

        // When sending the same read marker event multiple times,…
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(
                event_cache.inner.has_storage(),
                JoinedRoomUpdate { account_data, ..Default::default() },
            )
            .await
            .unwrap();

        // … there's only one read marker update.
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        assert!(stream.recv().now_or_never().is_none());
    }

    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Insert two rooms with a few events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.join.insert(room_id1.to_owned(), joined_room_update1);
        updates.join.insert(room_id2.to_owned(), joined_room_update2);

        // Have the event cache handle them.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // Now retrieve all the events one by one.
        let found1 = event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        let found3 = event_cache.event(eid3).await.unwrap();
        assert_event_matches_msg(&found3, "bjr");

        // An unknown event won't be found.
        assert!(event_cache.event(event_id!("$unknown")).await.is_none());

        // Can also find events in a single room.
        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // Retrieving the event with id3 from the room which doesn't contain it will
        // fail…
        assert!(room_event_cache.event(eid3).await.is_none());
        // …but it doesn't fail at the client-wide level.
        assert!(event_cache.event(eid3).await.is_some());
    }

    #[async_test]
    async fn test_save_event_and_clear() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_event(f.text_msg("hey there").event_id(event_id).into()).await;

        // Retrieving the event at the room-wide cache works.
        assert!(room_event_cache.event(event_id).await.is_some());
        // Also at the client level.
        assert!(event_cache.event(event_id).await.is_some());

        event_cache.empty_immutable_cache().await;

        // After clearing, both fail to find the event.
        assert!(room_event_cache.event(event_id).await.is_none());
        assert!(event_cache.event(event_id).await.is_none());
    }

    #[async_test]
    async fn test_add_initial_events() {
        // TODO: remove this test when the event cache uses its own persistent storage.
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        event_cache
            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
            .await
            .unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let (initial_events, _) = room_event_cache.subscribe().await.unwrap();
        // `add_initial_events` had an effect.
        assert_eq!(initial_events.len(), 1);
    }
}