matrix_sdk/event_cache/room/mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! All event cache types for a single room.

use std::{collections::BTreeMap, fmt, sync::Arc};

use events::Gap;
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    linked_chunk::ChunkContent,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
};
use tokio::sync::{
    broadcast::{Receiver, Sender},
    Notify, RwLock,
};
use tracing::{debug, trace, warn};

use super::{
    paginator::{Paginator, PaginatorState},
    AllEventsCache, EventsOrigin, Result, RoomEventCacheUpdate, RoomPagination,
};
use crate::{client::WeakClient, room::WeakRoom};

pub(super) mod events;

/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

impl RoomEventCache {
    /// Create a new [`RoomEventCache`] using the given room and store.
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        room_id: OwnedRoomId,
        room_version: RoomVersionId,
        all_events_cache: Arc<RwLock<AllEventsCache>>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                room_id,
                room_version,
                all_events_cache,
            )),
        }
    }

    /// Subscribe to this room's updates, after getting the initial list of
    /// events.
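    ///
    /// # Example
    ///
    /// A minimal usage sketch (it assumes a `room_event_cache` handle and the
    /// usual `matrix_sdk::event_cache` re-export paths):
    ///
    /// ```no_run
    /// # async fn example(room_event_cache: matrix_sdk::event_cache::RoomEventCache) {
    /// let (_initial_events, mut updates) = room_event_cache.subscribe().await.unwrap();
    /// // `_initial_events` is the current view of the room's timeline; later
    /// // changes arrive as `RoomEventCacheUpdate`s on the `updates` receiver.
    /// while let Ok(_update) = updates.recv().await {
    ///     // React to the update, e.g. by re-rendering the timeline.
    /// }
    /// # }
    /// ```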
    pub async fn subscribe(&self) -> Result<(Vec<TimelineEvent>, Receiver<RoomEventCacheUpdate>)> {
        let state = self.inner.state.read().await;
        let events = state.events().events().map(|(_position, item)| item.clone()).collect();

        Ok((events, self.inner.sender.subscribe()))
    }

    /// Return a [`RoomPagination`] API object useful for running
    /// back-pagination queries in the current room.
    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

    /// Try to find an event by id in this room.
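    ///
    /// # Example
    ///
    /// A minimal sketch (the event id is a placeholder; paths assume the
    /// usual re-exports):
    ///
    /// ```no_run
    /// # use matrix_sdk::ruma::event_id;
    /// # async fn example(room_event_cache: matrix_sdk::event_cache::RoomEventCache) {
    /// if let Some(_event) = room_event_cache.event(event_id!("$some_event_id")).await {
    ///     // The event was found, either in the room's linked chunk or in the
    ///     // all-events cache.
    /// }
    /// # }
    /// ```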
    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
        if let Some((room_id, event)) =
            self.inner.all_events.read().await.events.get(event_id).cloned()
        {
            if room_id == self.inner.room_id {
                return Some(event);
            }
        }

        let state = self.inner.state.read().await;
        for (_pos, event) in state.events().revents() {
            if event.event_id().as_deref() == Some(event_id) {
                return Some(event.clone());
            }
        }
        None
    }

    /// Try to find an event by id in this room, along with its related events.
    ///
    /// You can filter which types of related events to retrieve using
    /// `filter`. `None` will retrieve related events of any type.
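    ///
    /// # Example
    ///
    /// A minimal sketch that only retrieves edits of an event (the event id
    /// is a placeholder):
    ///
    /// ```no_run
    /// # use matrix_sdk::ruma::{event_id, events::relation::RelationType};
    /// # async fn example(room_event_cache: matrix_sdk::event_cache::RoomEventCache) {
    /// let filter = Some(vec![RelationType::Replacement]);
    /// if let Some((_event, _related)) = room_event_cache
    ///     .event_with_relations(event_id!("$some_event_id"), filter)
    ///     .await
    /// {
    ///     // `_related` contains the edits of `_event`, gathered recursively.
    /// }
    /// # }
    /// ```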
    pub async fn event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
        let cache = self.inner.all_events.read().await;
        if let Some((_, event)) = cache.events.get(event_id) {
            let related_events = cache.collect_related_events(event_id, filter.as_deref());
            Some((event.clone(), related_events))
        } else {
            None
        }
    }

    /// Clear all the storage for this [`RoomEventCache`].
    ///
    /// This will get rid of all the events from the linked chunk and persisted
    /// storage.
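    ///
    /// # Example
    ///
    /// A minimal sketch; observers subscribed to this room receive a
    /// [`RoomEventCacheUpdate::Clear`] afterwards:
    ///
    /// ```no_run
    /// # async fn example(room_event_cache: matrix_sdk::event_cache::RoomEventCache) {
    /// room_event_cache.clear().await.unwrap();
    /// # }
    /// ```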
    pub async fn clear(&self) -> Result<()> {
        // Clear the linked chunk and persisted storage.
        self.inner.state.write().await.reset().await?;

        // Clear the (temporary) events mappings.
        self.inner.all_events.write().await.clear();

        // Reset the paginator.
        // TODO: properly stop any ongoing back-pagination.
        let _ = self.inner.paginator.set_idle_state(PaginatorState::Initial, None, None);

        // Notify observers about the update.
        let _ = self.inner.sender.send(RoomEventCacheUpdate::Clear);

        Ok(())
    }

    /// Save a single event in the event cache, for further retrieval with
    /// [`Self::event`].
    // TODO: This doesn't insert the event into the linked chunk. In the future
    // there'll be no distinction between the linked chunk and the separate
    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
    pub(crate) async fn save_event(&self, event: TimelineEvent) {
        if let Some(event_id) = event.event_id() {
            let mut cache = self.inner.all_events.write().await;

            cache.append_related_event(&event);
            cache.events.insert(event_id, (self.inner.room_id.clone(), event));
        } else {
            warn!("couldn't save event without event id in the event cache");
        }
    }

    /// Save some events in the event cache, for further retrieval with
    /// [`Self::event`]. This function will save them using a single lock,
    /// as opposed to [`Self::save_event`].
    // TODO: This doesn't insert the event into the linked chunk. In the future
    // there'll be no distinction between the linked chunk and the separate
    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
        let mut cache = self.inner.all_events.write().await;
        for event in events {
            if let Some(event_id) = event.event_id() {
                cache.append_related_event(&event);
                cache.events.insert(event_id, (self.inner.room_id.clone(), event));
            } else {
                warn!("couldn't save event without event id in the event cache");
            }
        }
    }

    /// Return a nice debug string (a vector of lines) for the linked chunk of
    /// events for this room.
    pub async fn debug_string(&self) -> Vec<String> {
        self.inner.state.read().await.events().debug_string()
    }
}

/// The (non-cloneable) details of the `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    room_id: OwnedRoomId,

    /// The room version for this room.
    pub(crate) room_version: RoomVersionId,

    /// Sender part for subscribers to this room.
    pub sender: Sender<RoomEventCacheUpdate>,

    /// State for this room's event cache.
    pub state: RwLock<RoomEventCacheState>,

    /// See comment of [`super::EventCacheInner::all_events`].
    ///
    /// This is shared between the [`super::EventCacheInner`] singleton and all
    /// [`RoomEventCacheInner`] instances.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    /// A paginator instance, that's configured to run back-pagination on our
    /// behalf.
    ///
    /// Note: forward-paginations are still run "out-of-band", that is,
    /// disconnected from the event cache, as we don't implement matching
    /// events received from those kinds of pagination with the cache. This
    /// paginator is only used for queries that interact with the actual event
    /// cache.
    pub paginator: Paginator<WeakRoom>,
}

impl RoomEventCacheInner {
    /// Creates a new cache for a room, and subscribes to room updates, so as
    /// to handle new timeline events.
    fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        room_id: OwnedRoomId,
        room_version: RoomVersionId,
        all_events_cache: Arc<RwLock<AllEventsCache>>,
    ) -> Self {
        let sender = Sender::new(32);
        let weak_room = WeakRoom::new(client, room_id);
        Self {
            room_id: weak_room.room_id().to_owned(),
            room_version,
            state: RwLock::new(state),
            all_events: all_events_cache,
            sender,
            pagination_batch_token_notifier: Default::default(),
            paginator: Paginator::new(weak_room),
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    // Sometimes the sliding sync proxy sends many duplicates of the read marker
                    // event. Don't forward it multiple times to avoid cluttering
                    // the update channel.
                    //
                    // NOTE: SS proxy workaround.
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    // Propagate to observers. (We ignore the error if there aren't any.)
                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
                        event_id: ev.content.event_id,
                    });
                }

                Ok(_) => {
                    // We're not interested in other room account data updates,
                    // at this point.
                }

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    pub(super) async fn handle_joined_room_update(
        &self,
        has_storage: bool,
        updates: JoinedRoomUpdate,
    ) -> Result<()> {
        self.handle_timeline(
            has_storage,
            updates.timeline,
            updates.ephemeral.clone(),
            updates.ambiguity_changes,
        )
        .await?;

        self.handle_account_data(updates.account_data);

        Ok(())
    }

    async fn handle_timeline(
        &self,
        has_storage: bool,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if !has_storage && timeline.limited {
            // Ideally we'd try to reconcile existing events against those received in the
            // timeline, but we're not there yet. In the meantime, clear the
            // items from the room. TODO: implement Smart Matching™.
            trace!("limited timeline, clearing all previous events and pushing new events");

            self.replace_all_events_by(
                timeline.events,
                timeline.prev_batch,
                ephemeral_events,
                ambiguity_changes,
            )
            .await?;
        } else {
            // Add all the events to the backend.
            trace!("adding new events");

            // If we have storage, only keep the previous-batch token if we have a limited
            // timeline. Otherwise, we know about all the events, and we don't need to
            // back-paginate, so we wouldn't make use of the given previous-batch token.
            //
            // If we don't have storage, we may not have saved the previous events in
            // any cache even if the timeline isn't limited, so always keep the
            // previous-batch token to be able to retrieve those events later.
            let prev_batch =
                if has_storage && !timeline.limited { None } else { timeline.prev_batch };

            let mut state = self.state.write().await;
            self.append_events_locked(
                &mut state,
                timeline.events,
                prev_batch,
                ephemeral_events,
                ambiguity_changes,
            )
            .await?;
        }

        Ok(())
    }

    pub(super) async fn handle_left_room_update(
        &self,
        has_storage: bool,
        updates: LeftRoomUpdate,
    ) -> Result<()> {
        self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
            .await?;
        Ok(())
    }

    /// Remove existing events, and append a set of events to the room cache and
    /// storage, notifying observers.
    pub(super) async fn replace_all_events_by(
        &self,
        sync_timeline_events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        // Acquire the lock.
        let mut state = self.state.write().await;

        // Reset the room's state.
        state.reset().await?;

        // Propagate to observers.
        let _ = self.sender.send(RoomEventCacheUpdate::Clear);

        // Push the new events.
        self.append_events_locked(
            &mut state,
            sync_timeline_events,
            prev_batch.clone(),
            ephemeral_events,
            ambiguity_changes,
        )
        .await?;

        // Reset the paginator status to initial.
        self.paginator.set_idle_state(PaginatorState::Initial, prev_batch, None)?;

        Ok(())
    }

    /// Append a set of events to the room cache and storage, notifying
    /// observers.
    ///
    /// This is a private implementation. It must not be exposed publicly.
    async fn append_events_locked(
        &self,
        state: &mut RoomEventCacheState,
        sync_timeline_events: Vec<TimelineEvent>,
        prev_batch: Option<String>,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        if sync_timeline_events.is_empty()
            && prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        // Add the previous back-pagination token (if present), followed by the timeline
        // events themselves.
        let sync_timeline_events_diffs = {
            let (_, sync_timeline_events_diffs) = state
                .with_events_mut(|room_events| {
                    if let Some(prev_token) = &prev_batch {
                        room_events.push_gap(Gap { prev_token: prev_token.clone() });
                    }

                    let added_unique_events = room_events.push_events(sync_timeline_events.clone());

                    if !added_unique_events {
                        debug!(
                            "not storing previous batch token, because we deduplicated all new sync events"
                        );

                        if let Some(prev_token) = &prev_batch {
                            // Note: there can't be any race with another task touching the linked
                            // chunk at this point, because we're using `with_events_mut` which
                            // guards access to the data.
                            trace!("removing gap we just inserted");

                            // Find the gap that had the previous-batch token we inserted above.
                            let prev_gap_id = room_events
                                .rchunks()
                                .find_map(|c| {
                                    let gap = as_variant::as_variant!(c.content(), ChunkContent::Gap)?;
                                    (gap.prev_token == *prev_token).then_some(c.identifier())
                                })
                                .expect("we just inserted the gap beforehand");

                            room_events
                                .replace_gap_at([], prev_gap_id)
                                .expect("we obtained the valid position beforehand");
                        }
                    }

                    room_events.on_new_events(&self.room_version, sync_timeline_events.iter());
                })
                .await?;

            let mut all_events = self.all_events.write().await;

            for sync_timeline_event in sync_timeline_events {
                if let Some(event_id) = sync_timeline_event.event_id() {
                    all_events.append_related_event(&sync_timeline_event);
                    all_events
                        .events
                        .insert(event_id.to_owned(), (self.room_id.clone(), sync_timeline_event));
                }
            }

            sync_timeline_events_diffs
        };

        // Now that all events have been added, we can trigger the
        // `pagination_token_notifier`.
        if prev_batch.is_some() {
            self.pagination_batch_token_notifier.notify_one();
        }

        // The order of `RoomEventCacheUpdate`s is **really** important here.
        {
            if !sync_timeline_events_diffs.is_empty() {
                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                    diffs: sync_timeline_events_diffs,
                    origin: EventsOrigin::Sync,
                });
            }

            if !ephemeral_events.is_empty() {
                let _ = self
                    .sender
                    .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
            }

            if !ambiguity_changes.is_empty() {
                let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
            }
        }

        Ok(())
    }
}

/// Create a debug string for a [`ChunkContent`] for an event/gap pair.
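///
/// For example, a gap chunk renders as `gap['prev-token']`, and an items chunk
/// renders as `events[$event1, $event2]`, with event IDs truncated to 8
/// characters after the leading `$`.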
fn chunk_debug_string(content: &ChunkContent<TimelineEvent, Gap>) -> String {
    match content {
        ChunkContent::Gap(Gap { prev_token }) => {
            format!("gap['{prev_token}']")
        }
        ChunkContent::Items(vec) => {
            let items = vec
                .iter()
                .map(|event| {
                    // Limit event ids to 8 chars *after* the $.
                    event.event_id().map_or_else(
                        || "<no event id>".to_owned(),
                        |id| id.as_str().chars().take(1 + 8).collect(),
                    )
                })
                .collect::<Vec<_>>()
                .join(", ");
            format!("events[{items}]")
        }
    }
}

// Use a private module to hide `events` from this parent module.
mod private {
    use std::sync::Arc;

    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        deserialized_responses::{TimelineEvent, TimelineEventKind},
        event_cache::{
            store::{
                EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockGuard,
                DEFAULT_CHUNK_CAPACITY,
            },
            Event, Gap,
        },
        linked_chunk::{LinkedChunk, LinkedChunkBuilder, RawChunk, Update},
    };
    use once_cell::sync::OnceCell;
    use ruma::{serde::Raw, OwnedRoomId, RoomId};
    use tracing::{error, instrument, trace};

    use super::{chunk_debug_string, events::RoomEvents};
    use crate::event_cache::EventCacheError;

    /// State for a single room's event cache.
    ///
    /// This contains all the inner mutable states that ought to be updated at
    /// the same time.
    pub struct RoomEventCacheState {
        /// The room this state relates to.
        room: OwnedRoomId,

        /// Reference to the underlying backing store.
        ///
        /// Set to none if the room shouldn't read the linked chunk from
        /// storage, and shouldn't store updates to storage.
        store: Arc<OnceCell<EventCacheStoreLock>>,

        /// The events of the room.
        events: RoomEvents,

        /// Have we ever waited for a previous-batch-token to come from sync, in
        /// the context of pagination? We do this at most once per room,
        /// the first time we try to run backward pagination. We reset
        /// that upon clearing the timeline events.
        pub waited_for_initial_prev_token: bool,
    }

    impl RoomEventCacheState {
        async fn try_reload_linked_chunk(
            room: &RoomId,
            locked: &EventCacheStoreLockGuard<'_>,
        ) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, EventCacheError>
        {
            let raw_chunks = locked.reload_linked_chunk(room).await?;

            let mut builder = LinkedChunkBuilder::from_raw_parts(raw_chunks.clone());

            builder.with_update_history();

            Ok(builder.build().map_err(|err| {
                // Show a debug string representing the known chunks.
                if tracing::enabled!(tracing::Level::TRACE) {
                    trace!("couldn't build a linked chunk from the raw parts. Raw chunks:");
                    for line in raw_chunks_debug_string(raw_chunks) {
                        trace!("{line}");
                    }
                }

                EventCacheStoreError::InvalidData { details: err.to_string() }
            })?)
        }

        /// Create a new state, or reload it from storage if it's been enabled.
        pub async fn new(
            room: OwnedRoomId,
            store: Arc<OnceCell<EventCacheStoreLock>>,
        ) -> Result<Self, EventCacheError> {
            let events = if let Some(store) = store.get() {
                let locked = store.lock().await?;

                // Try to reload a linked chunk from storage. If it fails, log the error and
                // restart with a fresh, empty linked chunk.
                let linked_chunk = match Self::try_reload_linked_chunk(&room, &locked).await {
                    Ok(linked_chunk) => linked_chunk,
                    Err(err) => {
                        error!("error when reloading a linked chunk from storage: {err}");

                        // Clear storage for this room.
                        locked.handle_linked_chunk_updates(&room, vec![Update::Clear]).await?;

                        // Restart with an empty linked chunk.
                        None
                    }
                };

                RoomEvents::with_initial_chunks(linked_chunk)
            } else {
                RoomEvents::default()
            };

            Ok(Self { room, store, events, waited_for_initial_prev_token: false })
        }

        /// Removes the bundled relations from an event, if they were present.
        ///
        /// Only replaces the event if it contained bundled relations.
        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
            // present.
            // Use a closure that returns an option so we can quickly short-circuit.
            let mut closure = || -> Option<()> {
                let mut val: serde_json::Value = event.deserialize_as().ok()?;
                let unsigned = val.get_mut("unsigned")?;
                let unsigned_obj = unsigned.as_object_mut()?;
                if unsigned_obj.remove("m.relations").is_some() {
                    *event = Raw::new(&val).ok()?.cast();
                }
                None
            };
            let _ = closure();
        }

        fn strip_relations_from_event(ev: &mut TimelineEvent) {
            match &mut ev.kind {
                TimelineEventKind::Decrypted(decrypted) => {
                    // Remove all information about encryption info for
                    // the bundled events.
                    decrypted.unsigned_encryption_info = None;

                    // Remove the `unsigned`/`m.relations` field, if need be.
                    Self::strip_relations_if_present(&mut decrypted.event);
                }

                TimelineEventKind::UnableToDecrypt { event, .. }
                | TimelineEventKind::PlainText { event } => {
                    Self::strip_relations_if_present(event);
                }
            }
        }

        /// Strips the bundled relations from a collection of events.
        fn strip_relations_from_events(items: &mut [TimelineEvent]) {
            for ev in items.iter_mut() {
                Self::strip_relations_from_event(ev);
            }
        }

        /// Propagate changes to the underlying storage.
        #[instrument(skip_all)]
        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
            let mut updates = self.events.updates().take();

            if updates.is_empty() {
                return Ok(());
            }

            let Some(store) = self.store.get() else {
                return Ok(());
            };

            trace!("propagating {} updates", updates.len());

            // Strip relations from updates which insert or replace items.
            for up in updates.iter_mut() {
                match up {
                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
                    // Other update kinds don't involve adding new events.
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            // Spawn a task to make sure that all the changes are effectively forwarded to
            // the store, even if the call to this method gets aborted.
            //
            // The store cross-process locking involves an actual mutex, which ensures that
            // storing updates happens in the expected order.

            let store = store.clone();
            let room_id = self.room.clone();

            matrix_sdk_common::executor::spawn(async move {
                let locked = store.lock().await?;

                if let Err(err) = locked.handle_linked_chunk_updates(&room_id, updates).await {
                    error!("unable to handle linked chunk updates: {err}");
                }

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            trace!("done propagating store changes");

            Ok(())
        }

        /// Resets this data structure as if it were brand new.
        pub async fn reset(&mut self) -> Result<(), EventCacheError> {
            self.events.reset();
            self.propagate_changes().await?;
            self.waited_for_initial_prev_token = false;
            Ok(())
        }

        /// Returns a read-only reference to the underlying events.
        pub fn events(&self) -> &RoomEvents {
            &self.events
        }

        /// Gives a temporary mutable handle to the underlying in-memory events,
        /// and will propagate changes to the storage once done.
        ///
        /// Returns the output of the given callback, as well as updates to the
        /// linked chunk, as vector diffs, so the caller may propagate
        /// such updates, if need be.
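        ///
        /// A sketch of typical usage, mirroring `append_events_locked` above
        /// (the names here are illustrative):
        ///
        /// ```ignore
        /// let (_, diffs) = state
        ///     .with_events_mut(|room_events| {
        ///         room_events.push_gap(Gap { prev_token });
        ///         room_events.push_events(new_events);
        ///     })
        ///     .await?;
        /// // `diffs` can now be forwarded to observers, e.g. as
        /// // `RoomEventCacheUpdate::UpdateTimelineEvents`.
        /// ```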
        pub async fn with_events_mut<O, F: FnOnce(&mut RoomEvents) -> O>(
            &mut self,
            func: F,
        ) -> Result<(O, Vec<VectorDiff<TimelineEvent>>), EventCacheError> {
            let output = func(&mut self.events);
            self.propagate_changes().await?;
            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
            Ok((output, updates_as_vector_diffs))
        }
    }

    /// Create a debug string over multiple lines (one String per line),
    /// offering a debug representation of a [`RawChunk`] loaded from disk.
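    ///
    /// For example (matching the test below), a reloaded gap chunk and items
    /// chunk render as:
    ///
    /// ```text
    /// chunk #0 (prev=<none>, next=1): gap['prev-token']
    /// chunk #1 (prev=0, next=<none>): events[$12345678, $2]
    /// ```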
    fn raw_chunks_debug_string(mut raw_chunks: Vec<RawChunk<Event, Gap>>) -> Vec<String> {
        let mut result = Vec::new();

        // Sort the chunks by id, for the output to be deterministic.
        raw_chunks.sort_by_key(|c| c.identifier.index());

        for c in raw_chunks {
            let content = chunk_debug_string(&c.content);

            let prev =
                c.previous.map_or_else(|| "<none>".to_owned(), |prev| prev.index().to_string());
            let next = c.next.map_or_else(|| "<none>".to_owned(), |next| next.index().to_string());

            let line =
                format!("chunk #{} (prev={prev}, next={next}): {content}", c.identifier.index());

            result.push(line);
        }

        result
    }

    #[cfg(test)]
    mod tests {
        use matrix_sdk_base::{
            event_cache::Gap,
            linked_chunk::{ChunkContent, ChunkIdentifier as CId, RawChunk},
        };
        use matrix_sdk_test::{event_factory::EventFactory, ALICE, DEFAULT_TEST_ROOM_ID};
        use ruma::event_id;

        use super::raw_chunks_debug_string;

        #[test]
        fn test_raw_chunks_debug_string() {
            let mut raws = Vec::new();
            let f = EventFactory::new().room(&DEFAULT_TEST_ROOM_ID).sender(*ALICE);

            raws.push(RawChunk {
                content: ChunkContent::Items(vec![
                    f.text_msg("hey")
                        .event_id(event_id!("$123456789101112131415617181920"))
                        .into_event(),
                    f.text_msg("you").event_id(event_id!("$2")).into_event(),
                ]),
                identifier: CId::new(1),
                previous: Some(CId::new(0)),
                next: None,
            });

            raws.push(RawChunk {
                content: ChunkContent::Gap(Gap { prev_token: "prev-token".to_owned() }),
                identifier: CId::new(0),
                previous: None,
                next: Some(CId::new(1)),
            });

            let output = raw_chunks_debug_string(raws);
            assert_eq!(output.len(), 2);
            assert_eq!(&output[0], "chunk #0 (prev=<none>, next=1): gap['prev-token']");
            assert_eq!(&output[1], "chunk #1 (prev=0, next=<none>): events[$12345678, $2]");
        }
    }
}

pub(super) use private::RoomEventCacheState;

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use assert_matches2::assert_let;
    use matrix_sdk_base::{
        event_cache::{
            store::{EventCacheStore as _, MemoryStore},
            Gap,
        },
        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
        store::StoreConfig,
        sync::{JoinedRoomUpdate, Timeline},
    };
    use matrix_sdk_common::deserialized_responses::TimelineEvent;
    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
    use ruma::{
        event_id,
        events::{
            relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
            AnySyncMessageLikeEvent, AnySyncTimelineEvent,
        },
        room_id, user_id, RoomId,
    };

    use crate::test_utils::{client::MockClientBuilder, logged_in_client};

    #[async_test]
    async fn test_event_with_redaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.redaction(original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("An edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.redaction(related_id).event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_event(original_event).await;

        // Save the related event.
        room_event_cache.save_event(related_event).await;

        // Save the associated related event, which redacts the related event.
        room_event_cache.save_event(associated_related_event).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // Both the related id and the recursively-related id are present.
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);

        // Now we'll filter for threads instead; there should be no related events.
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        // No Thread related events found
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_event_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.redaction(related_id).event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_event(original_event).await;

        // Save the related event.
        room_event_cache.save_event(related_event).await;

        // Save the associated related event, which redacts the related event.
        room_event_cache.save_event(associated_related_event).await;

        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, None).await.unwrap();
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // Both the related id and the recursively-related id are present.
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_write_to_storage() {
        use matrix_sdk_base::linked_chunk::LinkedChunkBuilder;

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like^W enable storage!
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message and a prev-batch token.
        let timeline = Timeline {
            limited: true,
            prev_batch: Some("raclette".to_owned()),
            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
        };

        room_event_cache
            .inner
            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        let raws = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
        let linked_chunk =
            LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap().unwrap();

        assert_eq!(linked_chunk.chunks().count(), 3);

        let mut chunks = linked_chunk.chunks();

        // Invariant: there's always an empty items chunk at the beginning.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 0)
        });

        // Then we have the gap.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        // Then we have the stored event.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            let deserialized = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        use matrix_sdk_base::linked_chunk::LinkedChunkBuilder;
        use ruma::events::BundledMessageLikeRelations;

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like^W enable storage!
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message with bundled relations.
        let mut relations = BundledMessageLikeRelations::new();
        relations.replace =
            Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
        let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .inner
            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // The in-memory linked chunk keeps the bundled relation.
        {
            let (events, _) = room_event_cache.subscribe().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_some());
        }

        // The one in storage does not.
        let raws = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
        let linked_chunk =
            LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap().unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_none());
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_clear() {
        use matrix_sdk_base::linked_chunk::LinkedChunkBuilder;

        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like^W enable storage!
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // The room knows about the cached events.
1332        assert!(room_event_cache.event(event_id1).await.is_some());
1333
1334        // The reloaded room must contain the two events.
1335        assert_eq!(items.len(), 2);
1336        assert_eq!(items[0].event_id().unwrap(), event_id1);
1337        assert_eq!(items[1].event_id().unwrap(), event_id2);
1338
1339        assert!(stream.is_empty());
1340
1341        // After clearing,…
1342        room_event_cache.clear().await.unwrap();
1343
1344        //…we get an update that the content has been cleared.
1345        assert_let_timeout!(Ok(RoomEventCacheUpdate::Clear) = stream.recv());
1346
1347        // The room event cache has forgotten about the events.
1348        assert!(room_event_cache.event(event_id1).await.is_none());
1349
1350        let (items, _) = room_event_cache.subscribe().await.unwrap();
1351        assert!(items.is_empty());
1352
1353        // The event cache store too.
1354        let raws = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
1355        let linked_chunk = LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap();
1356
1357        // Note: while the event cache store could return `None` here, clearing it will
1358        // reset it to its initial form, maintaining the invariant that it
1359        // contains a single items chunk that's empty.
1360        let linked_chunk = linked_chunk.unwrap();
1361        assert_eq!(linked_chunk.num_items(), 0);
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();
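
        // For reference, the store now contains the chunk chain
        // [0: empty items] <-> [42: gap "cheddar"] <-> [1: ev1] <-> [2: ev2].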

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like^W enable storage!
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, _stream) = room_event_cache.subscribe().await.unwrap();

        // The reloaded room must contain the two events.
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);

        // A new update with one of these events leads to deduplication.
        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
        room_event_cache
            .inner
            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // The stream doesn't report these changes *yet*. Use the items vector given
        // when subscribing to check that the items are at their new positions:
        // the duplicated event (ev2) is removed from its old position and
        // re-added at the back of the list, so the final order happens to be
        // the same as before.
        let (items, _stream) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_load_from_storage_resilient_to_failure() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_cache_store = Arc::new(MemoryStore::new());

        // Prefill the store with invalid data: two chunks that form a cycle.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();
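
        // The resulting chain is cyclic: chunk 0 links forward to chunk 1,
        // whose `next` points back to chunk 0 (0 -> 1 -> 0), so it can never
        // be rebuilt into a well-formed linked chunk.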

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like^W enable storage!
        event_cache.subscribe().unwrap();
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, _stream) = room_event_cache.subscribe().await.unwrap();

        // Because the persisted content was invalid, the room store is reset: there are
        // no events in the cache.
        assert!(items.is_empty());

        // Storage doesn't contain anything. It would also be valid for it to
        // contain a single, initial, empty items chunk.
        let raw_chunks = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
        assert!(raw_chunks.is_empty());
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_no_useless_gaps() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let has_storage = true; // for testing purposes only
        event_cache.enable_storage().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        // Propagate an update including a limited timeline with one message and a
        // prev-batch token.
        room_event_cache
            .inner
            .handle_joined_room_update(
                has_storage,
                JoinedRoomUpdate {
                    timeline: Timeline {
                        limited: true,
                        prev_batch: Some("raclette".to_owned()),
                        events: vec![f.text_msg("hey yo").into_event()],
                    },
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The gap must have been stored.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }
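
        // Since the timeline was limited and came with a prev-batch token, a
        // gap chunk has been inserted before the event, so that a later
        // back-pagination can fill in the events we may have missed.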

        // Now, propagate an update for another message, but the timeline isn't limited
        // this time.
        room_event_cache
            .inner
            .handle_joined_room_update(
                has_storage,
                JoinedRoomUpdate {
                    timeline: Timeline {
                        limited: false,
                        prev_batch: Some("fondue".to_owned()),
                        events: vec![f.text_msg("sup").into_event()],
                    },
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        assert_eq!(gap.prev_token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            // There's only the previous gap, no new ones.
            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }
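
    // The two counting loops above could be factored out; a minimal sketch,
    // where `count_chunks` is a hypothetical helper (not part of the SDK)
    // tallying events and gaps from an iterator of chunk contents, e.g.
    // `count_chunks(state.events().chunks().map(|c| c.content()))`.
    #[allow(dead_code)]
    fn count_chunks<'a>(
        contents: impl Iterator<Item = &'a ChunkContent<TimelineEvent, Gap>>,
    ) -> (usize, usize) {
        let (mut num_events, mut num_gaps) = (0, 0);
        for content in contents {
            match content {
                // An items chunk contributes all its events to the tally.
                ChunkContent::Items(items) => num_events += items.len(),
                // A gap chunk counts as a single gap.
                ChunkContent::Gap(_) => num_gaps += 1,
            }
        }
        (num_events, num_gaps)
    }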

    async fn assert_relations(
        room_id: &RoomId,
        original_event: TimelineEvent,
        related_event: TimelineEvent,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_event(original_event).await;

        // Save an unrelated event to check it's not in the related events list.
        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
            .await;

        // Save the related event.
        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_event(related_event).await;

        let (event, related_events) =
            room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        // Only the actually related event is in the list of related events.
        assert_eq!(related_events.len(), 1);
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
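
    // A sketch of how this helper can be driven, using a reaction as the
    // related event (illustrative only; the exact `EventFactory::reaction`
    // signature is assumed from its use elsewhere in the test suite):
    //
    //     let f = EventFactory::new().room(room_id).sender(*ALICE);
    //     let original = f.text_msg("original").event_id(event_id!("$1")).into_event();
    //     let reaction = f.reaction(event_id!("$1"), "👍").event_id(event_id!("$3")).into_event();
    //     assert_relations(room_id, original, reaction, f).await;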
}