matrix_sdk/event_cache/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26
27use events::{sort_positions_descending, Gap};
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::{AmbiguityChange, TimelineEvent},
32    linked_chunk::Position,
33    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
34};
35use ruma::{
36    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
37    serde::Raw,
38    EventId, OwnedEventId, OwnedRoomId,
39};
40use tokio::sync::{
41    broadcast::{Receiver, Sender},
42    mpsc, Notify, RwLock,
43};
44use tracing::{instrument, trace, warn};
45
46use super::{
47    deduplicator::DeduplicationOutcome, AutoShrinkChannelPayload, EventsOrigin, Result,
48    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
49};
50use crate::{client::WeakClient, room::WeakRoom};
51
52pub(super) mod events;
53
54/// A subset of an event cache, for a room.
55///
56/// Cloning is shallow, and thus is cheap to do.
57#[derive(Clone)]
58pub struct RoomEventCache {
59    pub(super) inner: Arc<RoomEventCacheInner>,
60}
61
62impl fmt::Debug for RoomEventCache {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("RoomEventCache").finish_non_exhaustive()
65    }
66}
67
68/// Thin wrapper for a room event cache listener, so as to trigger side-effects
69/// when all listeners are gone.
70#[allow(missing_debug_implementations)]
71pub struct RoomEventCacheListener {
72    /// Underlying receiver of the room event cache's updates.
73    recv: Receiver<RoomEventCacheUpdate>,
74
75    /// To which room are we listening?
76    room_id: OwnedRoomId,
77
78    /// Sender to the auto-shrink channel.
79    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
80
81    /// Shared instance of the auto-shrinker.
82    listener_count: Arc<AtomicUsize>,
83}
84
85impl Drop for RoomEventCacheListener {
86    fn drop(&mut self) {
87        let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);
88
89        trace!("dropping a room event cache listener; previous count: {previous_listener_count}");
90
91        if previous_listener_count == 1 {
92            // We were the last instance of the listener; let the auto-shrinker know by
93            // notifying it of our room id.
94
95            let mut room_id = self.room_id.clone();
96
97            // Try to send without waiting for channel capacity, and restart in a spin-loop
98            // if it failed (until a maximum number of attempts is reached, or
99            // the send was successful). The channel shouldn't be super busy in
100            // general, so this should resolve quickly enough.
101
102            let mut num_attempts = 0;
103
104            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
105                num_attempts += 1;
106
107                if num_attempts > 1024 {
108                    // If we've tried too many times, just give up with a warning; after all, this
109                    // is only an optimization.
110                    warn!("couldn't send notification to the auto-shrink channel after 1024 attempts; giving up");
111                    return;
112                }
113
114                match err {
115                    mpsc::error::TrySendError::Full(stolen_room_id) => {
116                        room_id = stolen_room_id;
117                    }
118                    mpsc::error::TrySendError::Closed(_) => return,
119                }
120            }
121
122            trace!("sent notification to the parent channel that we were the last listener");
123        }
124    }
125}
126
127impl Deref for RoomEventCacheListener {
128    type Target = Receiver<RoomEventCacheUpdate>;
129
130    fn deref(&self) -> &Self::Target {
131        &self.recv
132    }
133}
134
135impl DerefMut for RoomEventCacheListener {
136    fn deref_mut(&mut self) -> &mut Self::Target {
137        &mut self.recv
138    }
139}
140
141impl RoomEventCache {
142    /// Create a new [`RoomEventCache`] using the given room and store.
143    pub(super) fn new(
144        client: WeakClient,
145        state: RoomEventCacheState,
146        pagination_status: SharedObservable<RoomPaginationStatus>,
147        room_id: OwnedRoomId,
148        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
149    ) -> Self {
150        Self {
151            inner: Arc::new(RoomEventCacheInner::new(
152                client,
153                state,
154                pagination_status,
155                room_id,
156                auto_shrink_sender,
157            )),
158        }
159    }
160
161    /// Read all current events.
162    ///
163    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
164    /// listener/subscriber.
165    pub async fn events(&self) -> Vec<TimelineEvent> {
166        let state = self.inner.state.read().await;
167
168        state.events().events().map(|(_position, item)| item.clone()).collect()
169    }
170
171    /// Subscribe to this room updates, after getting the initial list of
172    /// events.
173    ///
174    /// Use [`RoomEventCache::events`] to get all current events without the
175    /// listener/subscriber. Creating, and especially dropping, a
176    /// [`RoomEventCacheListener`] isn't free.
177    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, RoomEventCacheListener) {
178        let state = self.inner.state.read().await;
179        let events = state.events().events().map(|(_position, item)| item.clone()).collect();
180
181        let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
182        trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);
183
184        let recv = self.inner.sender.subscribe();
185        let listener = RoomEventCacheListener {
186            recv,
187            room_id: self.inner.room_id.clone(),
188            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
189            listener_count: state.listener_count.clone(),
190        };
191
192        (events, listener)
193    }
194
195    /// Return a [`RoomPagination`] API object useful for running
196    /// back-pagination queries in the current room.
197    pub fn pagination(&self) -> RoomPagination {
198        RoomPagination { inner: self.inner.clone() }
199    }
200
201    /// Try to find an event by id in this room.
202    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
203        self.inner
204            .state
205            .read()
206            .await
207            .find_event(event_id)
208            .await
209            .ok()
210            .flatten()
211            .map(|(_loc, event)| event)
212    }
213
214    /// Try to find an event by id in this room, along with its related events.
215    ///
216    /// You can filter which types of related events to retrieve using
217    /// `filter`. `None` will retrieve related events of any type.
218    pub async fn event_with_relations(
219        &self,
220        event_id: &EventId,
221        filter: Option<Vec<RelationType>>,
222    ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
223        // Search in all loaded or stored events.
224        self.inner
225            .state
226            .read()
227            .await
228            .find_event_with_relations(event_id, filter.clone())
229            .await
230            .ok()
231            .flatten()
232    }
233
234    /// Clear all the storage for this [`RoomEventCache`].
235    ///
236    /// This will get rid of all the events from the linked chunk and persisted
237    /// storage.
238    pub async fn clear(&self) -> Result<()> {
239        // Clear the linked chunk and persisted storage.
240        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
241
242        // Notify observers about the update.
243        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
244            diffs: updates_as_vector_diffs,
245            origin: EventsOrigin::Cache,
246        });
247
248        Ok(())
249    }
250
251    /// Save some events in the event cache, for further retrieval with
252    /// [`Self::event`].
253    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
254        if let Err(err) = self.inner.state.write().await.save_event(events).await {
255            warn!("couldn't save event in the event cache: {err}");
256        }
257    }
258
259    /// Return a nice debug string (a vector of lines) for the linked chunk of
260    /// events for this room.
261    pub async fn debug_string(&self) -> Vec<String> {
262        self.inner.state.read().await.events().debug_string()
263    }
264}
265
266/// The (non-cloneable) details of the `RoomEventCache`.
267pub(super) struct RoomEventCacheInner {
268    /// The room id for this room.
269    room_id: OwnedRoomId,
270
271    pub weak_room: WeakRoom,
272
273    /// Sender part for subscribers to this room.
274    pub sender: Sender<RoomEventCacheUpdate>,
275
276    /// State for this room's event cache.
277    pub state: RwLock<RoomEventCacheState>,
278
279    /// A notifier that we received a new pagination token.
280    pub pagination_batch_token_notifier: Notify,
281
282    pub pagination_status: SharedObservable<RoomPaginationStatus>,
283
284    /// Sender to the auto-shrink channel.
285    ///
286    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
287    /// more details.
288    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
289}
290
291impl RoomEventCacheInner {
292    /// Creates a new cache for a room, and subscribes to room updates, so as
293    /// to handle new timeline events.
294    fn new(
295        client: WeakClient,
296        state: RoomEventCacheState,
297        pagination_status: SharedObservable<RoomPaginationStatus>,
298        room_id: OwnedRoomId,
299        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
300    ) -> Self {
301        let sender = Sender::new(32);
302        let weak_room = WeakRoom::new(client, room_id);
303        Self {
304            room_id: weak_room.room_id().to_owned(),
305            weak_room,
306            state: RwLock::new(state),
307            sender,
308            pagination_batch_token_notifier: Default::default(),
309            auto_shrink_sender,
310            pagination_status,
311        }
312    }
313
314    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
315        if account_data.is_empty() {
316            return;
317        }
318
319        let mut handled_read_marker = false;
320
321        trace!("Handling account data");
322
323        for raw_event in account_data {
324            match raw_event.deserialize() {
325                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
326                    // If duplicated, do not forward read marker multiple times
327                    // to avoid clutter the update channel.
328                    if handled_read_marker {
329                        continue;
330                    }
331
332                    handled_read_marker = true;
333
334                    // Propagate to observers. (We ignore the error if there aren't any.)
335                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
336                        event_id: ev.content.event_id,
337                    });
338                }
339
340                Ok(_) => {
341                    // We're not interested in other room account data updates,
342                    // at this point.
343                }
344
345                Err(e) => {
346                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
347                    warn!(event_type, "Failed to deserialize account data: {e}");
348                }
349            }
350        }
351    }
352
353    #[instrument(skip_all, fields(room_id = %self.room_id))]
354    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
355        self.handle_timeline(
356            updates.timeline,
357            updates.ephemeral.clone(),
358            updates.ambiguity_changes,
359        )
360        .await?;
361
362        self.handle_account_data(updates.account_data);
363
364        Ok(())
365    }
366
367    #[instrument(skip_all, fields(room_id = %self.room_id))]
368    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
369        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
370        Ok(())
371    }
372
373    async fn handle_timeline(
374        &self,
375        timeline: Timeline,
376        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
377        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
378    ) -> Result<()> {
379        let mut prev_batch = timeline.prev_batch;
380        if timeline.events.is_empty()
381            && prev_batch.is_none()
382            && ephemeral_events.is_empty()
383            && ambiguity_changes.is_empty()
384        {
385            return Ok(());
386        }
387
388        // Add all the events to the backend.
389        trace!("adding new events");
390
391        let mut state = self.state.write().await;
392
393        // Ditch the previous-batch token if the sync isn't limited and we've seen at
394        // least one event in the past.
395        //
396        // In this case (and only this one), we should definitely know what the head of
397        // the timeline is (either we know about all the events, or we have a
398        // gap somewhere), since storage is enabled by default.
399        if !timeline.limited && state.events().events().next().is_some() {
400            prev_batch = None;
401        }
402
403        let (
404            DeduplicationOutcome {
405                all_events: events,
406                in_memory_duplicated_event_ids,
407                in_store_duplicated_event_ids,
408            },
409            all_duplicates,
410        ) = state.collect_valid_and_duplicated_events(timeline.events).await?;
411
412        // During a sync, when a duplicated event is found, the old event is removed and
413        // the new event is added.
414        //
415        // Let's remove the old events that are duplicated.
416        let timeline_event_diffs = if all_duplicates {
417            // No new events, thus no need to change the room events.
418            vec![]
419        } else {
420            // Remove the old duplicated events.
421            //
422            // We don't have to worry the removals can change the position of the
423            // existing events, because we are pushing all _new_
424            // `events` at the back.
425            let mut timeline_event_diffs = state
426                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
427                .await?;
428
429            // Add the previous back-pagination token (if present), followed by the timeline
430            // events themselves.
431            let new_timeline_event_diffs = state
432                .with_events_mut(true, |room_events| {
433                    // If we only received duplicated events, we don't need to store the gap: if
434                    // there was a gap, we'd have received an unknown event at the tail of
435                    // the room's timeline (unless the server reordered sync events since the last
436                    // time we sync'd).
437                    if !all_duplicates {
438                        if let Some(prev_token) = &prev_batch {
439                            // As a tiny optimization: remove the last chunk if it's an empty event
440                            // one, as it's not useful to keep it before a gap.
441                            let prev_chunk_to_remove =
442                                room_events.rchunks().next().and_then(|chunk| {
443                                    (chunk.is_items() && chunk.num_items() == 0)
444                                        .then_some(chunk.identifier())
445                                });
446
447                            room_events.push_gap(Gap { prev_token: prev_token.clone() });
448
449                            if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
450                                room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
451                                    "we just checked the chunk is there, and it's an empty item chunk",
452                                );
453                            }
454                        }
455                    }
456
457                    room_events.push_events(events.clone());
458
459                    events.clone()
460                })
461                .await?;
462
463            timeline_event_diffs.extend(new_timeline_event_diffs);
464
465            if timeline.limited && prev_batch.is_some() && !all_duplicates {
466                // If there was a previous batch token for a limited timeline, and there's at
467                // least one non-duplicated new event, unload the chunks so it
468                // only contains the last one; otherwise, there might be a valid
469                // gap in between, and observers may not render it (yet).
470                //
471                // We must do this *after* the above call to `.with_events_mut`, so the new
472                // events and gaps are properly persisted to storage.
473                if let Some(diffs) = state.shrink_to_last_chunk().await? {
474                    // Override the diffs with the new ones, as per `shrink_to_last_chunk`'s API
475                    // contract.
476                    timeline_event_diffs = diffs;
477                }
478            }
479
480            timeline_event_diffs
481        };
482
483        // Now that all events have been added, we can trigger the
484        // `pagination_token_notifier`.
485        if prev_batch.is_some() {
486            self.pagination_batch_token_notifier.notify_one();
487        }
488
489        // The order of `RoomEventCacheUpdate`s is **really** important here.
490        {
491            if !timeline_event_diffs.is_empty() {
492                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
493                    diffs: timeline_event_diffs,
494                    origin: EventsOrigin::Sync,
495                });
496            }
497
498            if !ephemeral_events.is_empty() {
499                let _ = self
500                    .sender
501                    .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
502            }
503
504            if !ambiguity_changes.is_empty() {
505                let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
506            }
507        }
508
509        Ok(())
510    }
511}
512
513/// Internal type to represent the output of
514/// [`RoomEventCacheState::load_more_events_backwards`].
515#[derive(Debug)]
516pub(super) enum LoadMoreEventsBackwardsOutcome {
517    /// A gap has been inserted.
518    Gap {
519        /// The previous batch token to be used as the "end" parameter in the
520        /// back-pagination request.
521        prev_token: Option<String>,
522    },
523
524    /// The start of the timeline has been reached.
525    StartOfTimeline,
526
527    /// Events have been inserted.
528    Events {
529        events: Vec<TimelineEvent>,
530        timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
531        reached_start: bool,
532    },
533
534    /// The caller must wait for the initial previous-batch token, and retry.
535    WaitForInitialPrevToken,
536}
537
538// Use a private module to hide `events` to this parent module.
539mod private {
540    use std::{
541        collections::HashSet,
542        sync::{atomic::AtomicUsize, Arc},
543    };
544
545    use eyeball::SharedObservable;
546    use eyeball_im::VectorDiff;
547    use matrix_sdk_base::{
548        apply_redaction,
549        deserialized_responses::{
550            ThreadSummary, ThreadSummaryStatus, TimelineEvent, TimelineEventKind,
551        },
552        event_cache::{store::EventCacheStoreLock, Event, Gap},
553        linked_chunk::{
554            lazy_loader, ChunkContent, ChunkIdentifierGenerator, LinkedChunkId, Position, Update,
555        },
556        serde_helpers::extract_thread_root,
557    };
558    use matrix_sdk_common::executor::spawn;
559    use ruma::{
560        events::{
561            relation::RelationType, room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent,
562            MessageLikeEventType,
563        },
564        serde::Raw,
565        EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
566    };
567    use tracing::{debug, error, instrument, trace, warn};
568
569    use super::{
570        super::{
571            deduplicator::{DeduplicationOutcome, Deduplicator},
572            EventCacheError,
573        },
574        events::RoomEvents,
575        sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
576    };
577    use crate::event_cache::RoomPaginationStatus;
578
579    /// State for a single room's event cache.
580    ///
581    /// This contains all the inner mutable states that ought to be updated at
582    /// the same time.
583    pub struct RoomEventCacheState {
584        /// The room this state relates to.
585        room: OwnedRoomId,
586
587        /// The room version for this room.
588        room_version: RoomVersionId,
589
590        /// Reference to the underlying backing store.
591        store: EventCacheStoreLock,
592
593        /// The events of the room.
594        events: RoomEvents,
595
596        /// The events deduplicator instance to help finding duplicates.
597        deduplicator: Deduplicator,
598
599        /// Have we ever waited for a previous-batch-token to come from sync, in
600        /// the context of pagination? We do this at most once per room,
601        /// the first time we try to run backward pagination. We reset
602        /// that upon clearing the timeline events.
603        pub waited_for_initial_prev_token: bool,
604
605        pagination_status: SharedObservable<RoomPaginationStatus>,
606
607        /// An atomic count of the current number of listeners of the
608        /// [`super::RoomEventCache`].
609        pub(super) listener_count: Arc<AtomicUsize>,
610    }
611
612    impl RoomEventCacheState {
613        /// Create a new state, or reload it from storage if it's been enabled.
614        ///
615        /// Not all events are going to be loaded. Only a portion of them. The
616        /// [`RoomEvents`] relies on a [`LinkedChunk`] to store all events. Only
617        /// the last chunk will be loaded. It means the events are loaded from
618        /// the most recent to the oldest. To load more events, see
619        /// [`Self::load_more_events_backwards`].
620        ///
621        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
622        pub async fn new(
623            room_id: OwnedRoomId,
624            room_version: RoomVersionId,
625            store: EventCacheStoreLock,
626            pagination_status: SharedObservable<RoomPaginationStatus>,
627        ) -> Result<Self, EventCacheError> {
628            let store_lock = store.lock().await?;
629
630            let linked_chunk_id = LinkedChunkId::Room(&room_id);
631            let linked_chunk = match store_lock
632                .load_last_chunk(linked_chunk_id)
633                .await
634                .map_err(EventCacheError::from)
635                .and_then(|(last_chunk, chunk_identifier_generator)| {
636                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
637                        .map_err(EventCacheError::from)
638                }) {
639                Ok(linked_chunk) => linked_chunk,
640
641                Err(err) => {
642                    error!("error when reloading a linked chunk from memory: {err}");
643
644                    // Clear storage for this room.
645                    store_lock
646                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
647                        .await?;
648
649                    // Restart with an empty linked chunk.
650                    None
651                }
652            };
653
654            let events = RoomEvents::with_initial_linked_chunk(linked_chunk);
655            let deduplicator = Deduplicator::new(room_id.clone(), store.clone());
656
657            Ok(Self {
658                room: room_id,
659                room_version,
660                store,
661                events,
662                deduplicator,
663                waited_for_initial_prev_token: false,
664                listener_count: Default::default(),
665                pagination_status,
666            })
667        }
668
669        /// Deduplicate `events` considering all events in `Self::events`.
670        ///
671        /// The returned tuple contains:
672        /// - all events (duplicated or not) with an ID
673        /// - all the duplicated event IDs with their position,
674        /// - a boolean indicating all events (at least one) are duplicates.
675        ///
676        /// This last boolean is useful to know whether we need to store a
677        /// previous-batch token (gap) we received from a server-side
678        /// request (sync or back-pagination), or if we should
679        /// *not* store it.
680        ///
681        /// Since there can be empty back-paginations with a previous-batch
682        /// token (that is, they don't contain any events), we need to
683        /// make sure that there is *at least* one new event that has
684        /// been added. Otherwise, we might conclude something wrong
685        /// because a subsequent back-pagination might
686        /// return non-duplicated events.
687        ///
688        /// If we had already seen all the duplicated events that we're trying
689        /// to add, then it would be wasteful to store a previous-batch
690        /// token, or even touch the linked chunk: we would repeat
691        /// back-paginations for events that we have already seen, and
692        /// possibly misplace them. And we should not be missing
693        /// events either: the already-known events would have their own
694        /// previous-batch token (it might already be consumed).
695        pub async fn collect_valid_and_duplicated_events(
696            &mut self,
697            events: Vec<Event>,
698        ) -> Result<(DeduplicationOutcome, bool), EventCacheError> {
699            let deduplication_outcome =
700                self.deduplicator.filter_duplicate_events(events, &self.events).await?;
701
702            let number_of_events = deduplication_outcome.all_events.len();
703            let number_of_deduplicated_events =
704                deduplication_outcome.in_memory_duplicated_event_ids.len()
705                    + deduplication_outcome.in_store_duplicated_event_ids.len();
706
707            let all_duplicates =
708                number_of_events > 0 && number_of_events == number_of_deduplicated_events;
709
710            Ok((deduplication_outcome, all_duplicates))
711        }
712
713        /// Given a fully-loaded linked chunk with no gaps, return the
714        /// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
715        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
716            // If we never received events for this room, this means we've never
717            // received a sync for that room, because every room must have at least a
718            // room creation event. Otherwise, we have reached the start of the
719            // timeline.
720            if self.events.events().next().is_some() {
721                // If there's at least one event, this means we've reached the start of the
722                // timeline, since the chunk is fully loaded.
723                trace!("chunk is fully loaded and non-empty: reached_start=true");
724                LoadMoreEventsBackwardsOutcome::StartOfTimeline
725            } else if !self.waited_for_initial_prev_token {
726                // There's no events. Since we haven't yet, wait for an initial previous-token.
727                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
728            } else {
729                // Otherwise, we've already waited, *and* received no previous-batch token from
730                // the sync, *and* there are still no events in the fully-loaded
731                // chunk: start back-pagination from the end of the room.
732                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
733            }
734        }
735
736        /// Load more events backwards if the last chunk is **not** a gap.
737        pub(in super::super) async fn load_more_events_backwards(
738            &mut self,
739        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
740            // If any in-memory chunk is a gap, don't load more events, and let the caller
741            // resolve the gap.
742            if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
743                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
744            }
745
746            // Because `first_chunk` is `not `Send`, get this information before the
747            // `.await` point, so that this `Future` can implement `Send`.
748            let first_chunk_identifier =
749                self.events.chunks().next().expect("a linked chunk is never empty").identifier();
750
751            let store = self.store.lock().await?;
752
753            // The first chunk is not a gap, we can load its previous chunk.
754            let linked_chunk_id = LinkedChunkId::Room(&self.room);
755            let new_first_chunk = match store
756                .load_previous_chunk(linked_chunk_id, first_chunk_identifier)
757                .await
758            {
759                Ok(Some(new_first_chunk)) => {
760                    // All good, let's continue with this chunk.
761                    new_first_chunk
762                }
763
764                Ok(None) => {
765                    // There's no previous chunk. The chunk is now fully-loaded. Conclude.
766                    return Ok(self.conclude_load_more_for_fully_loaded_chunk());
767                }
768
769                Err(err) => {
770                    error!("error when loading the previous chunk of a linked chunk: {err}");
771
772                    // Clear storage for this room.
773                    store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
774
775                    // Return the error.
776                    return Err(err.into());
777                }
778            };
779
780            let chunk_content = new_first_chunk.content.clone();
781
782            // We've reached the start on disk, if and only if, there was no chunk prior to
783            // the one we just loaded.
784            //
785            // This value is correct, if and only if, it is used for a chunk content of kind
786            // `Items`.
787            let reached_start = new_first_chunk.previous.is_none();
788
789            if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
790                error!("error when inserting the previous chunk into its linked chunk: {err}");
791
792                // Clear storage for this room.
793                store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;
794
795                // Return the error.
796                return Err(err.into());
797            };
798
799            // ⚠️ Let's not propagate the updates to the store! We already have these data
800            // in the store! Let's drain them.
801            let _ = self.events.store_updates().take();
802
803            // However, we want to get updates as `VectorDiff`s.
804            let timeline_event_diffs = self.events.updates_as_vector_diffs();
805
806            Ok(match chunk_content {
807                ChunkContent::Gap(gap) => {
808                    trace!("reloaded chunk from disk (gap)");
809                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
810                }
811
812                ChunkContent::Items(events) => {
813                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
814                    LoadMoreEventsBackwardsOutcome::Events {
815                        events,
816                        timeline_event_diffs,
817                        reached_start,
818                    }
819                }
820            })
821        }
822
823        /// If storage is enabled, unload all the chunks, then reloads only the
824        /// last one.
825        ///
826        /// If storage's enabled, return a diff update that starts with a clear
827        /// of all events; as a result, the caller may override any
828        /// pending diff updates with the result of this function.
829        ///
830        /// Otherwise, returns `None`.
831        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
832        pub(super) async fn shrink_to_last_chunk(
833            &mut self,
834        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
835            let store_lock = self.store.lock().await?;
836
837            // Attempt to load the last chunk.
838            let linked_chunk_id = LinkedChunkId::Room(&self.room);
839            let (last_chunk, chunk_identifier_generator) =
840                match store_lock.load_last_chunk(linked_chunk_id).await {
841                    Ok(pair) => pair,
842
843                    Err(err) => {
844                        // If loading the last chunk failed, clear the entire linked chunk.
845                        error!("error when reloading a linked chunk from memory: {err}");
846
847                        // Clear storage for this room.
848                        store_lock
849                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
850                            .await?;
851
852                        // Restart with an empty linked chunk.
853                        (None, ChunkIdentifierGenerator::new_from_scratch())
854                    }
855                };
856
857            debug!("unloading the linked chunk, and resetting it to its last chunk");
858
859            // Remove all the chunks from the linked chunks, except for the last one, and
860            // updates the chunk identifier generator.
861            if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
862                error!("error when replacing the linked chunk: {err}");
863                return self.reset().await.map(Some);
864            }
865
866            // Let pagination observers know that we may have not reached the start of the
867            // timeline.
868            // TODO: likely need to cancel any ongoing pagination.
869            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
870
871            // Don't propagate those updates to the store; this is only for the in-memory
872            // representation that we're doing this. Let's drain those store updates.
873            let _ = self.events.store_updates().take();
874
875            // However, we want to get updates as `VectorDiff`s, for the external listeners.
876            // Check we're respecting the contract defined in the doc comment.
877            let diffs = self.events.updates_as_vector_diffs();
878            assert!(matches!(diffs[0], VectorDiff::Clear));
879
880            Ok(Some(diffs))
881        }
882
883        /// Automatically shrink the room if there are no listeners, as
884        /// indicated by the atomic number of active listeners.
885        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
886        pub(crate) async fn auto_shrink_if_no_listeners(
887            &mut self,
888        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
889            let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);
890
891            trace!(listener_count, "received request to auto-shrink");
892
893            if listener_count == 0 {
894                // If we are the last strong reference to the auto-shrinker, we can shrink the
895                // events data structure to its last chunk.
896                self.shrink_to_last_chunk().await
897            } else {
898                Ok(None)
899            }
900        }
901
902        /// Removes the bundled relations from an event, if they were present.
903        ///
904        /// Only replaces the present if it contained bundled relations.
905        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
906            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
907            // present.
908            // Use a closure that returns an option so we can quickly short-circuit.
909            let mut closure = || -> Option<()> {
910                let mut val: serde_json::Value = event.deserialize_as().ok()?;
911                let unsigned = val.get_mut("unsigned")?;
912                let unsigned_obj = unsigned.as_object_mut()?;
913                if unsigned_obj.remove("m.relations").is_some() {
914                    *event = Raw::new(&val).ok()?.cast();
915                }
916                None
917            };
918            let _ = closure();
919        }
920
921        fn strip_relations_from_event(ev: &mut TimelineEvent) {
922            match &mut ev.kind {
923                TimelineEventKind::Decrypted(decrypted) => {
924                    // Remove all information about encryption info for
925                    // the bundled events.
926                    decrypted.unsigned_encryption_info = None;
927
928                    // Remove the `unsigned`/`m.relations` field, if needs be.
929                    Self::strip_relations_if_present(&mut decrypted.event);
930                }
931
932                TimelineEventKind::UnableToDecrypt { event, .. }
933                | TimelineEventKind::PlainText { event } => {
934                    Self::strip_relations_if_present(event);
935                }
936            }
937        }
938
939        /// Strips the bundled relations from a collection of events.
940        fn strip_relations_from_events(items: &mut [TimelineEvent]) {
941            for ev in items.iter_mut() {
942                Self::strip_relations_from_event(ev);
943            }
944        }
945
946        /// Remove events by their position, in `RoomEvents` and in
947        /// `EventCacheStore`.
948        ///
949        /// This method is purposely isolated because it must ensure that
950        /// positions are sorted appropriately or it can be disastrous.
951        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
952        #[instrument(skip_all)]
953        pub(crate) async fn remove_events(
954            &mut self,
955            in_memory_events: Vec<(OwnedEventId, Position)>,
956            in_store_events: Vec<(OwnedEventId, Position)>,
957        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
958            // In-store events.
959            if !in_store_events.is_empty() {
960                let mut positions = in_store_events
961                    .into_iter()
962                    .map(|(_event_id, position)| position)
963                    .collect::<Vec<_>>();
964
965                sort_positions_descending(&mut positions);
966
967                self.send_updates_to_store(
968                    positions
969                        .into_iter()
970                        .map(|position| Update::RemoveItem { at: position })
971                        .collect(),
972                )
973                .await?;
974            }
975
976            // In-memory events.
977            if in_memory_events.is_empty() {
978                // Nothing else to do, return early.
979                return Ok(Vec::new());
980            }
981
982            // `remove_events_by_position` is responsible of sorting positions.
983            self.events
984                .remove_events_by_position(
985                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
986                )
987                .expect("failed to remove an event");
988
989            self.propagate_changes().await?;
990
991            Ok(self.events.updates_as_vector_diffs())
992        }
993
994        /// Propagate changes to the underlying storage.
995        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
996            let updates = self.events.store_updates().take();
997            self.send_updates_to_store(updates).await
998        }
999
1000        pub async fn send_updates_to_store(
1001            &mut self,
1002            mut updates: Vec<Update<TimelineEvent, Gap>>,
1003        ) -> Result<(), EventCacheError> {
1004            if updates.is_empty() {
1005                return Ok(());
1006            }
1007
1008            // Strip relations from updates which insert or replace items.
1009            for update in updates.iter_mut() {
1010                match update {
1011                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
1012                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
1013                    // Other update kinds don't involve adding new events.
1014                    Update::NewItemsChunk { .. }
1015                    | Update::NewGapChunk { .. }
1016                    | Update::RemoveChunk(_)
1017                    | Update::RemoveItem { .. }
1018                    | Update::DetachLastItems { .. }
1019                    | Update::StartReattachItems
1020                    | Update::EndReattachItems
1021                    | Update::Clear => {}
1022                }
1023            }
1024
1025            // Spawn a task to make sure that all the changes are effectively forwarded to
1026            // the store, even if the call to this method gets aborted.
1027            //
1028            // The store cross-process locking involves an actual mutex, which ensures that
1029            // storing updates happens in the expected order.
1030
1031            let store = self.store.clone();
1032            let room_id = self.room.clone();
1033
1034            spawn(async move {
1035                let store = store.lock().await?;
1036
1037                trace!(?updates, "sending linked chunk updates to the store");
1038                let linked_chunk_id = LinkedChunkId::Room(&room_id);
1039                store.handle_linked_chunk_updates(linked_chunk_id, updates).await?;
1040                trace!("linked chunk updates applied");
1041
1042                super::Result::Ok(())
1043            })
1044            .await
1045            .expect("joining failed")?;
1046
1047            Ok(())
1048        }
1049
1050        /// Reset this data structure as if it were brand new.
1051        ///
1052        /// Return a single diff update that is a clear of all events; as a
1053        /// result, the caller may override any pending diff updates
1054        /// with the result of this function.
1055        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1056        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
1057            self.events.reset();
1058
1059            self.propagate_changes().await?;
1060
1061            // Reset the pagination state too: pretend we never waited for the initial
1062            // prev-batch token, and indicate that we're not at the start of the
1063            // timeline, since we don't know about that anymore.
1064            self.waited_for_initial_prev_token = false;
1065            // TODO: likely must cancel any ongoing back-paginations too
1066            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1067
1068            let diff_updates = self.events.updates_as_vector_diffs();
1069
1070            // Ensure the contract defined in the doc comment is true:
1071            debug_assert_eq!(diff_updates.len(), 1);
1072            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1073
1074            Ok(diff_updates)
1075        }
1076
1077        /// Returns a read-only reference to the underlying events.
1078        pub fn events(&self) -> &RoomEvents {
1079            &self.events
1080        }
1081
1082        /// Find a single event in this room.
1083        ///
1084        /// It starts by looking into loaded events in `RoomEvents` before
1085        /// looking inside the storage if it is enabled.
1086        pub async fn find_event(
1087            &self,
1088            event_id: &EventId,
1089        ) -> Result<Option<(EventLocation, TimelineEvent)>, EventCacheError> {
1090            // There are supposedly fewer events loaded in memory than in the store. Let's
1091            // start by looking up in the `RoomEvents`.
1092            for (position, event) in self.events().revents() {
1093                if event.event_id().as_deref() == Some(event_id) {
1094                    return Ok(Some((EventLocation::Memory(position), event.clone())));
1095                }
1096            }
1097
1098            let store = self.store.lock().await?;
1099
1100            Ok(store
1101                .find_event(&self.room, event_id)
1102                .await?
1103                .map(|event| (EventLocation::Store, event)))
1104        }
1105
1106        /// Find an event and all its relations in the persisted storage.
1107        ///
1108        /// This goes straight to the database, as a simplification; we don't
1109        /// expect to need to have to look up in memory events, or that
1110        /// all the related events are actually loaded.
1111        pub async fn find_event_with_relations(
1112            &self,
1113            event_id: &EventId,
1114            filters: Option<Vec<RelationType>>,
1115        ) -> Result<Option<(TimelineEvent, Vec<TimelineEvent>)>, EventCacheError> {
1116            let store = self.store.lock().await?;
1117
1118            // First, hit storage to get the target event and its related events.
1119            let found = store.find_event(&self.room, event_id).await?;
1120
1121            let Some(target) = found else {
1122                // We haven't found the event: return early.
1123                return Ok(None);
1124            };
1125
1126            // Then, initialize the stack with all the related events, to find the
1127            // transitive closure of all the related events.
1128            let mut related =
1129                store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
1130            let mut stack = related.iter().filter_map(|event| event.event_id()).collect::<Vec<_>>();
1131
1132            // Also keep track of already seen events, in case there's a loop in the
1133            // relation graph.
1134            let mut already_seen = HashSet::new();
1135            already_seen.insert(event_id.to_owned());
1136
1137            let mut num_iters = 1;
1138
1139            // Find the related event for each previously-related event.
1140            while let Some(event_id) = stack.pop() {
1141                if !already_seen.insert(event_id.clone()) {
1142                    // Skip events we've already seen.
1143                    continue;
1144                }
1145
1146                let other_related =
1147                    store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;
1148
1149                stack.extend(other_related.iter().filter_map(|event| event.event_id()));
1150                related.extend(other_related);
1151
1152                num_iters += 1;
1153            }
1154
1155            trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
1156
1157            Ok(Some((target, related)))
1158        }
1159
1160        /// Gives a temporary mutable handle to the underlying in-memory events,
1161        /// and will propagate changes to the storage once done.
1162        ///
1163        /// Returns the updates to the linked chunk, as vector diffs, so the
1164        /// caller may propagate such updates, if needs be.
1165        ///
1166        /// The function `func` takes a mutable reference to `RoomEvents`. It
1167        /// returns a set of events that will be post-processed. At the time of
1168        /// writing, all these events are passed to
1169        /// `Self::maybe_apply_new_redaction`.
1170        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1171        #[instrument(skip_all, fields(room_id = %self.room))]
1172        pub async fn with_events_mut<F>(
1173            &mut self,
1174            is_live_sync: bool,
1175            func: F,
1176        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError>
1177        where
1178            F: FnOnce(&mut RoomEvents) -> Vec<TimelineEvent>,
1179        {
1180            let events_to_post_process = func(&mut self.events);
1181
1182            // Update the store before doing the post-processing.
1183            self.propagate_changes().await?;
1184
1185            for event in events_to_post_process {
1186                self.maybe_apply_new_redaction(&event).await?;
1187
1188                self.analyze_thread_root(&event, is_live_sync).await?;
1189
1190                // Save a bundled thread event, if there was one.
1191                if let Some(bundled_thread) = event.bundled_latest_thread_event {
1192                    self.save_event([*bundled_thread]).await?;
1193                }
1194            }
1195
1196            // If we've never waited for an initial previous-batch token, and we now have at
1197            // least one gap in the chunk, no need to wait for a previous-batch token later.
1198            if !self.waited_for_initial_prev_token
1199                && self.events.chunks().any(|chunk| chunk.is_gap())
1200            {
1201                self.waited_for_initial_prev_token = true;
1202            }
1203
1204            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
1205
1206            Ok(updates_as_vector_diffs)
1207        }
1208
1209        /// If the event is a threaded reply, ensure the related thread's root
1210        /// event (i.e. first thread event) has a thread summary.
1211        #[instrument(skip_all)]
1212        async fn analyze_thread_root(
1213            &mut self,
1214            event: &Event,
1215            is_live_sync: bool,
1216        ) -> Result<(), EventCacheError> {
1217            let Some(thread_root) = extract_thread_root(event.raw()) else {
1218                // No thread root, carry on.
1219                return Ok(());
1220            };
1221
1222            // Add a thread summary to the event which has the thread root, if we knew about
1223            // it.
1224            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1225                trace!("thread root event is missing from the linked chunk");
1226                return Ok(());
1227            };
1228
1229            // Read the latest number of thread replies from the store.
1230            //
1231            // Implementation note: since this is based on the `m.relates_to` field, and
1232            // that field can only be present on room messages, we don't have to
1233            // worry about filtering out aggregation events (like
1234            // reactions/edits/etc.). Pretty neat, huh?
1235            let num_replies = {
1236                let store_guard = &*self.store.lock().await?;
1237                let related_thread_events = store_guard
1238                    .find_event_relations(&self.room, &thread_root, Some(&[RelationType::Thread]))
1239                    .await?;
1240                related_thread_events.len()
1241            };
1242
1243            let prev_summary = target_event.thread_summary.summary();
1244            let mut latest_reply =
1245                prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone());
1246
1247            // If we're live-syncing, then the latest event is always the event we're
1248            // currently processing. We're processing the sync events from oldest to newest,
1249            // so a a single sync response containing multiple thread events
1250            // will correctly override the latest event to the most recent one.
1251            //
1252            // If we're back-paginating, then we shouldn't update the latest event
1253            // information if it's set. If it's not set, then we should update
1254            // it to the last event in the batch. TODO(bnjbvr): the code is
1255            // wrong here in this particular case, because a single pagination
1256            // batch may include multiple events in the same thread, and they're
1257            // processed from oldest to newest; so the first in-thread event seen in that
1258            // batch will be marked as the latest reply, which is incorrect.
1259            // This will be fixed Later™ by using a proper linked chunk per
1260            // thread.
1261
1262            if is_live_sync || latest_reply.is_none() {
1263                latest_reply = event.event_id();
1264            }
1265
1266            let new_summary = ThreadSummary { num_replies, latest_reply };
1267
1268            if prev_summary == Some(&new_summary) {
1269                trace!("thread summary is already up-to-date");
1270                return Ok(());
1271            }
1272
1273            // Cause an update to observers.
1274            target_event.thread_summary = ThreadSummaryStatus::Some(new_summary);
1275            self.replace_event_at(location, target_event).await?;
1276
1277            Ok(())
1278        }
1279
1280        /// Replaces a single event, be it saved in memory or in the store.
1281        ///
1282        /// If it was saved in memory, this will emit a notification to
1283        /// observers that a single item has been replaced. Otherwise,
1284        /// such a notification is not emitted, because observers are
1285        /// unlikely to observe the store updates directly.
1286        async fn replace_event_at(
1287            &mut self,
1288            location: EventLocation,
1289            event: TimelineEvent,
1290        ) -> Result<(), EventCacheError> {
1291            match location {
1292                EventLocation::Memory(position) => {
1293                    self.events
1294                        .replace_event_at(position, event)
1295                        .expect("should have been a valid position of an item");
1296                    // We just changed the in-memory representation; synchronize this with
1297                    // the store.
1298                    self.propagate_changes().await?;
1299                }
1300                EventLocation::Store => {
1301                    self.save_event([event]).await?;
1302                }
1303            }
1304
1305            Ok(())
1306        }
1307
1308        /// If the given event is a redaction, try to retrieve the
1309        /// to-be-redacted event in the chunk, and replace it by the
1310        /// redacted form.
1311        #[instrument(skip_all)]
1312        async fn maybe_apply_new_redaction(
1313            &mut self,
1314            event: &Event,
1315        ) -> Result<(), EventCacheError> {
1316            let raw_event = event.raw();
1317
1318            // Do not deserialize the entire event if we aren't certain it's an
1319            // `m.room.redaction`: it saves a non-negligible amount of computation.
1320            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1321                raw_event.get_field::<MessageLikeEventType>("type")
1322            else {
1323                return Ok(());
1324            };
1325
1326            // It is an `m.room.redaction`! We can deserialize it entirely.
1327
1328            let Ok(AnySyncTimelineEvent::MessageLike(
1329                ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction),
1330            )) = event.raw().deserialize()
1331            else {
1332                return Ok(());
1333            };
1334
1335            let Some(event_id) = redaction.redacts(&self.room_version) else {
1336                warn!("missing target event id from the redaction event");
1337                return Ok(());
1338            };
1339
1340            // Replace the redacted event by a redacted form, if we knew about it.
1341            if let Some((location, mut target_event)) = self.find_event(event_id).await? {
1342                // Don't redact already redacted events.
1343                if let Ok(deserialized) = target_event.raw().deserialize() {
1344                    match deserialized {
1345                        AnySyncTimelineEvent::MessageLike(ev) => {
1346                            if ev.is_redacted() {
1347                                return Ok(());
1348                            }
1349                        }
1350                        AnySyncTimelineEvent::State(ev) => {
1351                            if ev.is_redacted() {
1352                                return Ok(());
1353                            }
1354                        }
1355                    }
1356                }
1357
1358                if let Some(redacted_event) = apply_redaction(
1359                    target_event.raw(),
1360                    event.raw().cast_ref::<SyncRoomRedactionEvent>(),
1361                    &self.room_version,
1362                ) {
1363                    // It's safe to cast `redacted_event` here:
1364                    // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
1365                    //   when calling .raw(), so it's still one under the hood.
1366                    // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
1367                    target_event.replace_raw(redacted_event.cast());
1368
1369                    self.replace_event_at(location, target_event).await?;
1370                }
1371            } else {
1372                trace!("redacted event is missing from the linked chunk");
1373            }
1374
1375            Ok(())
1376        }
1377
1378        /// Save events into the database, without notifying observers.
1379        ///
1380        /// Note: if an event was already saved as part of a linked chunk and
1381        /// its event id may have changed, it's not safe to use this
1382        /// method, because it may break the link between the chunk and
1383        /// the event. Instead, an update to the linked chunk must be used.
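        ///
        /// Illustrative usage sketch (hypothetical `state` and
        /// `timeline_event` bindings, not a doctest):
        ///
        /// ```ignore
        /// // Persist an out-of-band event without touching any linked chunk.
        /// state.save_event([timeline_event]).await?;
        /// ```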
1384        pub async fn save_event(
1385            &self,
1386            events: impl IntoIterator<Item = TimelineEvent>,
1387        ) -> Result<(), EventCacheError> {
1388            let store = self.store.clone();
1389            let room_id = self.room.clone();
1390            let events = events.into_iter().collect::<Vec<_>>();
1391
1392            // Spawn a task so the save is uninterrupted by task cancellation.
1393            spawn(async move {
1394                let store = store.lock().await?;
1395                for event in events {
1396                    store.save_event(&room_id, event).await?;
1397                }
1398                super::Result::Ok(())
1399            })
1400            .await
1401            .expect("joining failed")?;
1402
1403            Ok(())
1404        }
1405    }
1406}
1407
1408/// An enum representing where an event has been found.
1409pub(super) enum EventLocation {
1410    /// Event lives in memory (and likely in the store!).
1411    Memory(Position),
1412
1413    /// Event lives in the store only; it has not been loaded into memory yet.
1414    Store,
1415}
1416
1417pub(super) use private::RoomEventCacheState;
1418
1419#[cfg(test)]
1420mod tests {
1421
1422    use matrix_sdk_common::deserialized_responses::TimelineEvent;
1423    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1424    use ruma::{
1425        event_id,
1426        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
1427        room_id, user_id, RoomId,
1428    };
1429
1430    use crate::test_utils::logged_in_client;
1431
1432    #[async_test]
1433    async fn test_event_with_edit_relation() {
1434        let original_id = event_id!("$original");
1435        let related_id = event_id!("$related");
1436        let room_id = room_id!("!galette:saucisse.bzh");
1437        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1438
1439        assert_relations(
1440            room_id,
1441            f.text_msg("Original event").event_id(original_id).into(),
1442            f.text_msg("* An edited event")
1443                .edit(
1444                    original_id,
1445                    RoomMessageEventContentWithoutRelation::text_plain("An edited event"),
1446                )
1447                .event_id(related_id)
1448                .into(),
1449            f,
1450        )
1451        .await;
1452    }
1453
1454    #[async_test]
1455    async fn test_event_with_thread_reply_relation() {
1456        let original_id = event_id!("$original");
1457        let related_id = event_id!("$related");
1458        let room_id = room_id!("!galette:saucisse.bzh");
1459        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1460
1461        assert_relations(
1462            room_id,
1463            f.text_msg("Original event").event_id(original_id).into(),
1464            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
1465            f,
1466        )
1467        .await;
1468    }
1469
1470    #[async_test]
1471    async fn test_event_with_reaction_relation() {
1472        let original_id = event_id!("$original");
1473        let related_id = event_id!("$related");
1474        let room_id = room_id!("!galette:saucisse.bzh");
1475        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1476
1477        assert_relations(
1478            room_id,
1479            f.text_msg("Original event").event_id(original_id).into(),
1480            f.reaction(original_id, ":D").event_id(related_id).into(),
1481            f,
1482        )
1483        .await;
1484    }
1485
1486    #[async_test]
1487    async fn test_event_with_poll_response_relation() {
1488        let original_id = event_id!("$original");
1489        let related_id = event_id!("$related");
1490        let room_id = room_id!("!galette:saucisse.bzh");
1491        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1492
1493        assert_relations(
1494            room_id,
1495            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1496                .event_id(original_id)
1497                .into(),
1498            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
1499            f,
1500        )
1501        .await;
1502    }
1503
1504    #[async_test]
1505    async fn test_event_with_poll_end_relation() {
1506        let original_id = event_id!("$original");
1507        let related_id = event_id!("$related");
1508        let room_id = room_id!("!galette:saucisse.bzh");
1509        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1510
1511        assert_relations(
1512            room_id,
1513            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1514                .event_id(original_id)
1515                .into(),
1516            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
1517            f,
1518        )
1519        .await;
1520    }
1521
1522    #[async_test]
1523    async fn test_event_with_filtered_relationships() {
1524        let original_id = event_id!("$original");
1525        let related_id = event_id!("$related");
1526        let associated_related_id = event_id!("$recursive_related");
1527        let room_id = room_id!("!galette:saucisse.bzh");
1528        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1529
1530        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1531        let related_event = event_factory
1532            .text_msg("* Edited event")
1533            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1534            .event_id(related_id)
1535            .into();
1536        let associated_related_event =
1537            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
1538
1539        let client = logged_in_client(None).await;
1540
1541        let event_cache = client.event_cache();
1542        event_cache.subscribe().unwrap();
1543
1544        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1545        let room = client.get_room(room_id).unwrap();
1546
1547        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1548
1549        // Save the original event.
1550        room_event_cache.save_events([original_event]).await;
1551
1552        // Save the related event.
1553        room_event_cache.save_events([related_event]).await;
1554
1555        // Save the associated related event, which reacts to the related event.
1556        room_event_cache.save_events([associated_related_event]).await;
1557
1558        let filter = Some(vec![RelationType::Replacement]);
1559        let (event, related_events) =
1560            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1561        // Fetched event is the right one.
1562        let cached_event_id = event.event_id().unwrap();
1563        assert_eq!(cached_event_id, original_id);
1564
1565        // There's only the edit event; the reaction to the edit isn't a replacement.
1566        assert_eq!(related_events.len(), 1);
1567
1568        let related_event_id = related_events[0].event_id().unwrap();
1569        assert_eq!(related_event_id, related_id);
1570
1571        // Now we'll filter on thread relations instead; there should be no related events.
1572        let filter = Some(vec![RelationType::Thread]);
1573        let (event, related_events) =
1574            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1575        // Fetched event is the right one.
1576        let cached_event_id = event.event_id().unwrap();
1577        assert_eq!(cached_event_id, original_id);
1578        // No thread-related events are found.
1579        assert!(related_events.is_empty());
1580    }
1581
1582    #[async_test]
1583    async fn test_event_with_recursive_relation() {
1584        let original_id = event_id!("$original");
1585        let related_id = event_id!("$related");
1586        let associated_related_id = event_id!("$recursive_related");
1587        let room_id = room_id!("!galette:saucisse.bzh");
1588        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1589
1590        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1591        let related_event = event_factory
1592            .text_msg("* Edited event")
1593            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1594            .event_id(related_id)
1595            .into();
1596        let associated_related_event =
1597            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
1598
1599        let client = logged_in_client(None).await;
1600
1601        let event_cache = client.event_cache();
1602        event_cache.subscribe().unwrap();
1603
1604        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1605        let room = client.get_room(room_id).unwrap();
1606
1607        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1608
1609        // Save the original event.
1610        room_event_cache.save_events([original_event]).await;
1611
1612        // Save the related event.
1613        room_event_cache.save_events([related_event]).await;
1614
1615        // Save the associated related event, which reacts to the related event.
1616        room_event_cache.save_events([associated_related_event]).await;
1617
1618        let (event, related_events) =
1619            room_event_cache.event_with_relations(original_id, None).await.unwrap();
1620        // Fetched event is the right one.
1621        let cached_event_id = event.event_id().unwrap();
1622        assert_eq!(cached_event_id, original_id);
1623
1624        // Both the related event and the recursively related event are present.
1625        assert_eq!(related_events.len(), 2);
1626
1627        let related_event_id = related_events[0].event_id().unwrap();
1628        assert_eq!(related_event_id, related_id);
1629        let related_event_id = related_events[1].event_id().unwrap();
1630        assert_eq!(related_event_id, associated_related_id);
1631    }
1632
1633    async fn assert_relations(
1634        room_id: &RoomId,
1635        original_event: TimelineEvent,
1636        related_event: TimelineEvent,
1637        event_factory: EventFactory,
1638    ) {
1639        let client = logged_in_client(None).await;
1640
1641        let event_cache = client.event_cache();
1642        event_cache.subscribe().unwrap();
1643
1644        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1645        let room = client.get_room(room_id).unwrap();
1646
1647        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1648
1649        // Save the original event.
1650        let original_event_id = original_event.event_id().unwrap();
1651        room_event_cache.save_events([original_event]).await;
1652
1653        // Save an unrelated event to check it's not in the related events list.
1654        let unrelated_id = event_id!("$2");
1655        room_event_cache
1656            .save_events([event_factory
1657                .text_msg("An unrelated event")
1658                .event_id(unrelated_id)
1659                .into()])
1660            .await;
1661
1662        // Save the related event.
1663        let related_id = related_event.event_id().unwrap();
1664        room_event_cache.save_events([related_event]).await;
1665
1666        let (event, related_events) =
1667            room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
1668        // Fetched event is the right one.
1669        let cached_event_id = event.event_id().unwrap();
1670        assert_eq!(cached_event_id, original_event_id);
1671
1672        // Only the actually related event is in the related events list.
1673        let related_event_id = related_events[0].event_id().unwrap();
1674        assert_eq!(related_event_id, related_id);
1675    }
1676}
1677
1678#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
1679mod timed_tests {
1680    use std::sync::Arc;
1681
1682    use assert_matches::assert_matches;
1683    use assert_matches2::assert_let;
1684    use eyeball_im::VectorDiff;
1685    use matrix_sdk_base::{
1686        event_cache::{
1687            store::{EventCacheStore as _, MemoryStore},
1688            Gap,
1689        },
1690        linked_chunk::{
1691            lazy_loader::from_all_chunks, ChunkContent, ChunkIdentifier, LinkedChunkId, Position,
1692            Update,
1693        },
1694        store::StoreConfig,
1695        sync::{JoinedRoomUpdate, Timeline},
1696    };
1697    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
1698    use ruma::{
1699        event_id,
1700        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
1701        room_id, user_id,
1702    };
1703    use tokio::task::yield_now;
1704
1705    use crate::{
1706        assert_let_timeout,
1707        event_cache::{room::LoadMoreEventsBackwardsOutcome, RoomEventCacheUpdate},
1708        test_utils::client::MockClientBuilder,
1709    };
1710
1711    #[async_test]
1712    async fn test_write_to_storage() {
1713        let room_id = room_id!("!galette:saucisse.bzh");
1714        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1715
1716        let event_cache_store = Arc::new(MemoryStore::new());
1717
1718        let client = MockClientBuilder::new("http://localhost".to_owned())
1719            .store_config(
1720                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1721            )
1722            .build()
1723            .await;
1724
1725        let event_cache = client.event_cache();
1726
1727        // Don't forget to subscribe and like^W enable storage!
1728        event_cache.subscribe().unwrap();
1729
1730        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1731        let room = client.get_room(room_id).unwrap();
1732
1733        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1734
1735        // Propagate an update for a message and a prev-batch token.
1736        let timeline = Timeline {
1737            limited: true,
1738            prev_batch: Some("raclette".to_owned()),
1739            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
1740        };
1741
1742        room_event_cache
1743            .inner
1744            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1745            .await
1746            .unwrap();
1747
1748        let linked_chunk = from_all_chunks::<3, _, _>(
1749            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1750        )
1751        .unwrap()
1752        .unwrap();
1753
1754        assert_eq!(linked_chunk.chunks().count(), 2);
1755
1756        let mut chunks = linked_chunk.chunks();
1757
1758        // We start with the gap.
1759        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
1760            assert_eq!(gap.prev_token, "raclette");
1761        });
1762
1763        // Then we have the stored event.
1764        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1765            assert_eq!(events.len(), 1);
1766            let deserialized = events[0].raw().deserialize().unwrap();
1767            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1768            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1769        });
1770
1771        // That's all, folks!
1772        assert!(chunks.next().is_none());
1773    }
1774
1775    #[async_test]
1776    async fn test_write_to_storage_strips_bundled_relations() {
1777        let room_id = room_id!("!galette:saucisse.bzh");
1778        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1779
1780        let event_cache_store = Arc::new(MemoryStore::new());
1781
1782        let client = MockClientBuilder::new("http://localhost".to_owned())
1783            .store_config(
1784                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1785            )
1786            .build()
1787            .await;
1788
1789        let event_cache = client.event_cache();
1790
1791        // Don't forget to subscribe and like^W enable storage!
1792        event_cache.subscribe().unwrap();
1793
1794        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1795        let room = client.get_room(room_id).unwrap();
1796
1797        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1798
1799        // Propagate an update for a message with bundled relations.
1800        let ev = f
1801            .text_msg("hey yo")
1802            .sender(*ALICE)
1803            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
1804            .into_event();
1805
1806        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1807
1808        room_event_cache
1809            .inner
1810            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
1811            .await
1812            .unwrap();
1813
1814        // The in-memory linked chunk keeps the bundled relation.
1815        {
1816            let events = room_event_cache.events().await;
1817
1818            assert_eq!(events.len(), 1);
1819
1820            let ev = events[0].raw().deserialize().unwrap();
1821            assert_let!(
1822                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1823            );
1824
1825            let original = msg.as_original().unwrap();
1826            assert_eq!(original.content.body(), "hey yo");
1827            assert!(original.unsigned.relations.replace.is_some());
1828        }
1829
1830        // The one in storage does not.
1831        let linked_chunk = from_all_chunks::<3, _, _>(
1832            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1833        )
1834        .unwrap()
1835        .unwrap();
1836
1837        assert_eq!(linked_chunk.chunks().count(), 1);
1838
1839        let mut chunks = linked_chunk.chunks();
1840        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1841            assert_eq!(events.len(), 1);
1842
1843            let ev = events[0].raw().deserialize().unwrap();
1844            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1845
1846            let original = msg.as_original().unwrap();
1847            assert_eq!(original.content.body(), "hey yo");
1848            assert!(original.unsigned.relations.replace.is_none());
1849        });
1850
1851        // That's all, folks!
1852        assert!(chunks.next().is_none());
1853    }
1854
1855    #[async_test]
1856    async fn test_clear() {
1857        let room_id = room_id!("!galette:saucisse.bzh");
1858        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1859
1860        let event_cache_store = Arc::new(MemoryStore::new());
1861
1862        let event_id1 = event_id!("$1");
1863        let event_id2 = event_id!("$2");
1864
1865        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1866        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1867
1868        // Prefill the store with some data.
1869        event_cache_store
1870            .handle_linked_chunk_updates(
1871                LinkedChunkId::Room(room_id),
1872                vec![
1873                    // An empty items chunk.
1874                    Update::NewItemsChunk {
1875                        previous: None,
1876                        new: ChunkIdentifier::new(0),
1877                        next: None,
1878                    },
1879                    // A gap chunk.
1880                    Update::NewGapChunk {
1881                        previous: Some(ChunkIdentifier::new(0)),
1882                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1883                        new: ChunkIdentifier::new(42),
1884                        next: None,
1885                        gap: Gap { prev_token: "comté".to_owned() },
1886                    },
1887                    // Another items chunk, non-empty this time.
1888                    Update::NewItemsChunk {
1889                        previous: Some(ChunkIdentifier::new(42)),
1890                        new: ChunkIdentifier::new(1),
1891                        next: None,
1892                    },
1893                    Update::PushItems {
1894                        at: Position::new(ChunkIdentifier::new(1), 0),
1895                        items: vec![ev1.clone()],
1896                    },
1897                    // And another items chunk, non-empty again.
1898                    Update::NewItemsChunk {
1899                        previous: Some(ChunkIdentifier::new(1)),
1900                        new: ChunkIdentifier::new(2),
1901                        next: None,
1902                    },
1903                    Update::PushItems {
1904                        at: Position::new(ChunkIdentifier::new(2), 0),
1905                        items: vec![ev2.clone()],
1906                    },
1907                ],
1908            )
1909            .await
1910            .unwrap();
1911
1912        let client = MockClientBuilder::new("http://localhost".to_owned())
1913            .store_config(
1914                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1915            )
1916            .build()
1917            .await;
1918
1919        let event_cache = client.event_cache();
1920
1921        // Don't forget to subscribe and like^W enable storage!
1922        event_cache.subscribe().unwrap();
1923
1924        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1925        let room = client.get_room(room_id).unwrap();
1926
1927        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1928
1929        let (items, mut stream) = room_event_cache.subscribe().await;
1930
1931        // The room knows about all cached events.
1932        {
1933            assert!(room_event_cache.event(event_id1).await.is_some());
1934            assert!(room_event_cache.event(event_id2).await.is_some());
1935        }
1936
1937        // But only some of the events are loaded from the store
1938        {
1939            // The room must contain only one event because only one chunk has been loaded.
1940            assert_eq!(items.len(), 1);
1941            assert_eq!(items[0].event_id().unwrap(), event_id2);
1942
1943            assert!(stream.is_empty());
1944        }
1945
1946        // Let's load more chunks to load all events.
1947        {
1948            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1949
1950            assert_let_timeout!(
1951                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1952            );
1953            assert_eq!(diffs.len(), 1);
1954            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1955                // Here you are `event_id1`!
1956                assert_eq!(event.event_id().unwrap(), event_id1);
1957            });
1958
1959            assert!(stream.is_empty());
1960        }
1961
1962        // After clearing,…
1963        room_event_cache.clear().await.unwrap();
1964
1965        //… we get an update that the content has been cleared.
1966        assert_let_timeout!(
1967            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1968        );
1969        assert_eq!(diffs.len(), 1);
1970        assert_let!(VectorDiff::Clear = &diffs[0]);
1971
1972        // Individual events are not forgotten by the event cache after clearing a
1973        // room.
1974        assert!(room_event_cache.event(event_id1).await.is_some());
1975
1976        // But their presence in a linked chunk is forgotten.
1977        let items = room_event_cache.events().await;
1978        assert!(items.is_empty());
1979
1980        // The event cache store too.
1981        let linked_chunk = from_all_chunks::<3, _, _>(
1982            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
1983        )
1984        .unwrap()
1985        .unwrap();
1986
1987        // Note: while the event cache store could return `None` here, clearing it will
1988        // reset it to its initial form, maintaining the invariant that it
1989        // contains a single items chunk that's empty.
1990        assert_eq!(linked_chunk.num_items(), 0);
1991    }
1992
1993    #[async_test]
1994    async fn test_load_from_storage() {
1995        let room_id = room_id!("!galette:saucisse.bzh");
1996        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1997
1998        let event_cache_store = Arc::new(MemoryStore::new());
1999
2000        let event_id1 = event_id!("$1");
2001        let event_id2 = event_id!("$2");
2002
2003        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2004        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2005
2006        // Prefill the store with some data.
2007        event_cache_store
2008            .handle_linked_chunk_updates(
2009                LinkedChunkId::Room(room_id),
2010                vec![
2011                    // An empty items chunk.
2012                    Update::NewItemsChunk {
2013                        previous: None,
2014                        new: ChunkIdentifier::new(0),
2015                        next: None,
2016                    },
2017                    // A gap chunk.
2018                    Update::NewGapChunk {
2019                        previous: Some(ChunkIdentifier::new(0)),
2020                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
2021                        new: ChunkIdentifier::new(42),
2022                        next: None,
2023                        gap: Gap { prev_token: "cheddar".to_owned() },
2024                    },
2025                    // Another items chunk, non-empty this time.
2026                    Update::NewItemsChunk {
2027                        previous: Some(ChunkIdentifier::new(42)),
2028                        new: ChunkIdentifier::new(1),
2029                        next: None,
2030                    },
2031                    Update::PushItems {
2032                        at: Position::new(ChunkIdentifier::new(1), 0),
2033                        items: vec![ev1.clone()],
2034                    },
2035                    // And another items chunk, non-empty again.
2036                    Update::NewItemsChunk {
2037                        previous: Some(ChunkIdentifier::new(1)),
2038                        new: ChunkIdentifier::new(2),
2039                        next: None,
2040                    },
2041                    Update::PushItems {
2042                        at: Position::new(ChunkIdentifier::new(2), 0),
2043                        items: vec![ev2.clone()],
2044                    },
2045                ],
2046            )
2047            .await
2048            .unwrap();
2049
2050        let client = MockClientBuilder::new("http://localhost".to_owned())
2051            .store_config(
2052                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
2053            )
2054            .build()
2055            .await;
2056
2057        let event_cache = client.event_cache();
2058
2059        // Don't forget to subscribe and like^W enable storage!
2060        event_cache.subscribe().unwrap();
2061
2062        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2063        let room = client.get_room(room_id).unwrap();
2064
2065        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2066
2067        let (items, mut stream) = room_event_cache.subscribe().await;
2068
2069        // The initial items contain one event because only the last chunk is loaded by
2070        // default.
2071        assert_eq!(items.len(), 1);
2072        assert_eq!(items[0].event_id().unwrap(), event_id2);
2073        assert!(stream.is_empty());
2074
2075        // The event cache knows about all events though, even if they aren't loaded.
2076        assert!(room_event_cache.event(event_id1).await.is_some());
2077        assert!(room_event_cache.event(event_id2).await.is_some());
2078
2079        // Let's paginate to load more events.
2080        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2081
2082        assert_let_timeout!(
2083            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2084        );
2085        assert_eq!(diffs.len(), 1);
2086        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2087            assert_eq!(event.event_id().unwrap(), event_id1);
2088        });
2089
2090        assert!(stream.is_empty());
2091
2092        // A new update with one of these events leads to deduplication.
2093        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2094        room_event_cache
2095            .inner
2096            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2097            .await
2098            .unwrap();
2099
2100        // The stream doesn't report these changes *yet*. Read the events again to
2101        // check that the items correspond to their new positions: the duplicated
2102        // item is removed (so it's no longer the first element, as it was when
2103        // subscribing), and it's re-added to the back of the list.
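        // Concretely, in this test: the initial subscription saw `[ev2]`,
        // back-pagination turned it into `[ev1, ev2]`, and handling `ev2`
        // again deduplicates it and re-appends it, so the order checked below
        // is still `[ev1, ev2]`.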
2104        let items = room_event_cache.events().await;
2105        assert_eq!(items.len(), 2);
2106        assert_eq!(items[0].event_id().unwrap(), event_id1);
2107        assert_eq!(items[1].event_id().unwrap(), event_id2);
2108    }
2109
2110    #[async_test]
2111    async fn test_load_from_storage_resilient_to_failure() {
2112        let room_id = room_id!("!fondue:patate.ch");
2113        let event_cache_store = Arc::new(MemoryStore::new());
2114
2115        let event = EventFactory::new()
2116            .room(room_id)
2117            .sender(user_id!("@ben:saucisse.bzh"))
2118            .text_msg("foo")
2119            .event_id(event_id!("$42"))
2120            .into_event();
2121
2122        // Prefill the store with invalid data: two chunks that form a cycle.
2123        event_cache_store
2124            .handle_linked_chunk_updates(
2125                LinkedChunkId::Room(room_id),
2126                vec![
2127                    Update::NewItemsChunk {
2128                        previous: None,
2129                        new: ChunkIdentifier::new(0),
2130                        next: None,
2131                    },
2132                    Update::PushItems {
2133                        at: Position::new(ChunkIdentifier::new(0), 0),
2134                        items: vec![event],
2135                    },
2136                    Update::NewItemsChunk {
2137                        previous: Some(ChunkIdentifier::new(0)),
2138                        new: ChunkIdentifier::new(1),
2139                        next: Some(ChunkIdentifier::new(0)),
2140                    },
2141                ],
2142            )
2143            .await
2144            .unwrap();
2145
2146        let client = MockClientBuilder::new("http://localhost".to_owned())
2147            .store_config(
2148                StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
2149            )
2150            .build()
2151            .await;
2152
2153        let event_cache = client.event_cache();
2154
2155        // Don't forget to subscribe and like^W enable storage!
2156        event_cache.subscribe().unwrap();
2157
2158        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2159        let room = client.get_room(room_id).unwrap();
2160
2161        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2162
2163        let items = room_event_cache.events().await;
2164
2165        // Because the persisted content was invalid, the room store is reset: there are
2166        // no events in the cache.
2167        assert!(items.is_empty());
2168
2169        // Storage doesn't contain anything. It would also be valid for it to contain
2170        // a single initial empty items chunk.
2171        let raw_chunks =
2172            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
2173        assert!(raw_chunks.is_empty());
2174    }
2175
2176    #[async_test]
2177    async fn test_no_useless_gaps() {
2178        let room_id = room_id!("!galette:saucisse.bzh");
2179
2180        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2181
2182        let event_cache = client.event_cache();
2183        event_cache.subscribe().unwrap();
2184
2185        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2186        let room = client.get_room(room_id).unwrap();
2187        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2188
2189        let f = EventFactory::new().room(room_id).sender(*ALICE);
2190
2191        // Propagate an update including a limited timeline with one message and a
2192        // prev-batch token.
2193        room_event_cache
2194            .inner
2195            .handle_joined_room_update(JoinedRoomUpdate {
2196                timeline: Timeline {
2197                    limited: true,
2198                    prev_batch: Some("raclette".to_owned()),
2199                    events: vec![f.text_msg("hey yo").into_event()],
2200                },
2201                ..Default::default()
2202            })
2203            .await
2204            .unwrap();
2205
2206        {
2207            let mut state = room_event_cache.inner.state.write().await;
2208
2209            let mut num_gaps = 0;
2210            let mut num_events = 0;
2211
2212            for c in state.events().chunks() {
2213                match c.content() {
2214                    ChunkContent::Items(items) => num_events += items.len(),
2215                    ChunkContent::Gap(_) => num_gaps += 1,
2216                }
2217            }
2218
2219            // The limited sync unloads the chunk, so it will appear as if there are only
2220            // the events.
2221            assert_eq!(num_gaps, 0);
2222            assert_eq!(num_events, 1);
2223
2224            // But if I manually reload more of the chunk, the gap will be present.
2225            assert_matches!(
2226                state.load_more_events_backwards().await.unwrap(),
2227                LoadMoreEventsBackwardsOutcome::Gap { .. }
2228            );
2229
2230            num_gaps = 0;
2231            num_events = 0;
2232            for c in state.events().chunks() {
2233                match c.content() {
2234                    ChunkContent::Items(items) => num_events += items.len(),
2235                    ChunkContent::Gap(_) => num_gaps += 1,
2236                }
2237            }
2238
2239            // The gap must have been stored.
2240            assert_eq!(num_gaps, 1);
2241            assert_eq!(num_events, 1);
2242        }
2243
2244        // Now, propagate an update for another message, but the timeline isn't limited
2245        // this time.
2246        room_event_cache
2247            .inner
2248            .handle_joined_room_update(JoinedRoomUpdate {
2249                timeline: Timeline {
2250                    limited: false,
2251                    prev_batch: Some("fondue".to_owned()),
2252                    events: vec![f.text_msg("sup").into_event()],
2253                },
2254                ..Default::default()
2255            })
2256            .await
2257            .unwrap();
2258
2259        {
2260            let state = room_event_cache.inner.state.read().await;
2261
2262            let mut num_gaps = 0;
2263            let mut num_events = 0;
2264
2265            for c in state.events().chunks() {
2266                match c.content() {
2267                    ChunkContent::Items(items) => num_events += items.len(),
2268                    ChunkContent::Gap(gap) => {
2269                        assert_eq!(gap.prev_token, "raclette");
2270                        num_gaps += 1;
2271                    }
2272                }
2273            }
2274
2275            // There's only the previous gap, no new ones.
2276            assert_eq!(num_gaps, 1);
2277            assert_eq!(num_events, 2);
2278        }
2279    }
2280
2281    #[async_test]
2282    async fn test_shrink_to_last_chunk() {
2283        let room_id = room_id!("!galette:saucisse.bzh");
2284
2285        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2286
2287        let f = EventFactory::new().room(room_id);
2288
2289        let evid1 = event_id!("$1");
2290        let evid2 = event_id!("$2");
2291
2292        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2293        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2294
2295        // Fill the event cache store with an initial linked chunk with 2 events chunks.
2296        {
2297            let store = client.event_cache_store();
2298            let store = store.lock().await.unwrap();
2299            store
2300                .handle_linked_chunk_updates(
2301                    LinkedChunkId::Room(room_id),
2302                    vec![
2303                        Update::NewItemsChunk {
2304                            previous: None,
2305                            new: ChunkIdentifier::new(0),
2306                            next: None,
2307                        },
2308                        Update::PushItems {
2309                            at: Position::new(ChunkIdentifier::new(0), 0),
2310                            items: vec![ev1],
2311                        },
2312                        Update::NewItemsChunk {
2313                            previous: Some(ChunkIdentifier::new(0)),
2314                            new: ChunkIdentifier::new(1),
2315                            next: None,
2316                        },
2317                        Update::PushItems {
2318                            at: Position::new(ChunkIdentifier::new(1), 0),
2319                            items: vec![ev2],
2320                        },
2321                    ],
2322                )
2323                .await
2324                .unwrap();
2325        }
2326
2327        let event_cache = client.event_cache();
2328        event_cache.subscribe().unwrap();
2329
2330        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2331        let room = client.get_room(room_id).unwrap();
2332        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2333
2334        // Sanity check: lazily loaded, so only includes one item at start.
2335        let (events, mut stream) = room_event_cache.subscribe().await;
2336        assert_eq!(events.len(), 1);
2337        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2338        assert!(stream.is_empty());
2339
2340        // Force loading the full linked chunk by back-paginating.
2341        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2342        assert_eq!(outcome.events.len(), 1);
2343        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2344        assert!(outcome.reached_start);
2345
2346        // We also get an update about the loading from the store.
2347        assert_let_timeout!(
2348            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2349        );
2350        assert_eq!(diffs.len(), 1);
2351        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2352            assert_eq!(value.event_id().as_deref(), Some(evid1));
2353        });
2354
2355        assert!(stream.is_empty());
2356
2357        // Shrink the linked chunk to the last chunk.
2358        let diffs = room_event_cache
2359            .inner
2360            .state
2361            .write()
2362            .await
2363            .shrink_to_last_chunk()
2364            .await
2365            .expect("shrinking should succeed")
2366            .unwrap();
2367
2368        // We receive updates about the changes to the linked chunk.
2369        assert_eq!(diffs.len(), 2);
2370        assert_matches!(&diffs[0], VectorDiff::Clear);
2371        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
2372            assert_eq!(values.len(), 1);
2373            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
2374        });
2375
2376        assert!(stream.is_empty());
2377
2378        // When reading the events, we do get only the last one.
2379        let events = room_event_cache.events().await;
2380        assert_eq!(events.len(), 1);
2381        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2382
2383        // But if we back-paginate, we don't need network access to find out about
2384        // the previous event.
2385        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2386        assert_eq!(outcome.events.len(), 1);
2387        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2388        assert!(outcome.reached_start);
2389    }
2390
2391    #[async_test]
2392    async fn test_auto_shrink_after_all_subscribers_are_gone() {
2393        let room_id = room_id!("!galette:saucisse.bzh");
2394
2395        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2396
2397        let f = EventFactory::new().room(room_id);
2398
2399        let evid1 = event_id!("$1");
2400        let evid2 = event_id!("$2");
2401
2402        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2403        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2404
2405        // Fill the event cache store with an initial linked chunk with 2 events chunks.
2406        {
2407            let store = client.event_cache_store();
2408            let store = store.lock().await.unwrap();
2409            store
2410                .handle_linked_chunk_updates(
2411                    LinkedChunkId::Room(room_id),
2412                    vec![
2413                        Update::NewItemsChunk {
2414                            previous: None,
2415                            new: ChunkIdentifier::new(0),
2416                            next: None,
2417                        },
2418                        Update::PushItems {
2419                            at: Position::new(ChunkIdentifier::new(0), 0),
2420                            items: vec![ev1],
2421                        },
2422                        Update::NewItemsChunk {
2423                            previous: Some(ChunkIdentifier::new(0)),
2424                            new: ChunkIdentifier::new(1),
2425                            next: None,
2426                        },
2427                        Update::PushItems {
2428                            at: Position::new(ChunkIdentifier::new(1), 0),
2429                            items: vec![ev2],
2430                        },
2431                    ],
2432                )
2433                .await
2434                .unwrap();
2435        }
2436
2437        let event_cache = client.event_cache();
2438        event_cache.subscribe().unwrap();
2439
2440        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2441        let room = client.get_room(room_id).unwrap();
2442        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2443
2444        // Sanity check: lazily loaded, so only includes one item at start.
2445        let (events1, mut stream1) = room_event_cache.subscribe().await;
2446        assert_eq!(events1.len(), 1);
2447        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
2448        assert!(stream1.is_empty());
2449
2450        // Force loading the full linked chunk by back-paginating.
2451        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2452        assert_eq!(outcome.events.len(), 1);
2453        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2454        assert!(outcome.reached_start);
2455
2456        // We also get an update about the loading from the store. Ignore it, for this
2457        // test's sake.
2458        assert_let_timeout!(
2459            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
2460        );
2461        assert_eq!(diffs.len(), 1);
2462        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2463            assert_eq!(value.event_id().as_deref(), Some(evid1));
2464        });
2465
2466        assert!(stream1.is_empty());
2467
2468        // Have another listener subscribe to the event cache.
2469        // Since it's not the first one, and the previous one loaded some more events,
2470        // the second listener sees them all.
2471        let (events2, stream2) = room_event_cache.subscribe().await;
2472        assert_eq!(events2.len(), 2);
2473        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
2474        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
2475        assert!(stream2.is_empty());
2476
2477        // Drop the first stream, and wait a bit.
2478        drop(stream1);
2479        yield_now().await;
2480
2481        // The second stream remains undisturbed.
2482        assert!(stream2.is_empty());
2483
2484        // Now drop the second stream, and wait a bit.
2485        drop(stream2);
2486        yield_now().await;
2487
2488        // The linked chunk must have auto-shrunk by now.
2489
2490        {
2491            // Check the inner state: there's no more shared auto-shrinker.
2492            let state = room_event_cache.inner.state.read().await;
2493            assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
2494        }
2495
2496        // Getting the events will only give us the latest chunk.
2497        let events3 = room_event_cache.events().await;
2498        assert_eq!(events3.len(), 1);
2499        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2500    }
2501}