// matrix_sdk/event_cache/room/mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        Arc,
23        atomic::{AtomicUsize, Ordering},
24    },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::AmbiguityChange,
32    event_cache::Event,
33    linked_chunk::Position,
34    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37    EventId, OwnedEventId, OwnedRoomId,
38    api::Direction,
39    events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40    serde::Raw,
41};
42use tokio::sync::{
43    Notify,
44    broadcast::{Receiver, Sender},
45    mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50    AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54    client::WeakClient,
55    event_cache::EventCacheError,
56    room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
    /// Shared inner state; cloning the cache only bumps this `Arc`'s
    /// refcount, which is why cloning is shallow and cheap.
    pub(super) inner: Arc<RoomEventCacheInner>,
}
71
72impl fmt::Debug for RoomEventCache {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("RoomEventCache").finish_non_exhaustive()
75    }
76}
77
/// Thin wrapper for a room event cache subscriber, so as to trigger
/// side-effects when all subscribers are gone.
///
/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
/// more subscribers are active. This is an optimisation to reduce the number of
/// data held in memory by a [`RoomEventCache`]: when no more subscribers are
/// active, all data are reduced to the minimum.
///
/// The side-effect takes effect on `Drop`.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    /// Underlying receiver of the room event cache's updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// To which room are we listening?
    room_id: OwnedRoomId,

    /// Sender to the auto-shrink channel; notified on `Drop` when this was
    /// the last subscriber for the room.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared instance of the auto-shrinker.
    ///
    /// Incremented when subscribing (see [`RoomEventCache::subscribe`]),
    /// decremented in this type's `Drop` implementation.
    subscriber_count: Arc<AtomicUsize>,
}
101
102impl Drop for RoomEventCacheSubscriber {
103    fn drop(&mut self) {
104        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106        trace!(
107            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108        );
109
110        if previous_subscriber_count == 1 {
111            // We were the last instance of the subscriber; let the auto-shrinker know by
112            // notifying it of our room id.
113
114            let mut room_id = self.room_id.clone();
115
116            // Try to send without waiting for channel capacity, and restart in a spin-loop
117            // if it failed (until a maximum number of attempts is reached, or
118            // the send was successful). The channel shouldn't be super busy in
119            // general, so this should resolve quickly enough.
120
121            let mut num_attempts = 0;
122
123            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124                num_attempts += 1;
125
126                if num_attempts > 1024 {
127                    // If we've tried too many times, just give up with a warning; after all, this
128                    // is only an optimization.
129                    warn!(
130                        "couldn't send notification to the auto-shrink channel \
131                         after 1024 attempts; giving up"
132                    );
133                    return;
134                }
135
136                match err {
137                    mpsc::error::TrySendError::Full(stolen_room_id) => {
138                        room_id = stolen_room_id;
139                    }
140                    mpsc::error::TrySendError::Closed(_) => return,
141                }
142            }
143
144            trace!("sent notification to the parent channel that we were the last subscriber");
145        }
146    }
147}
148
// Expose the underlying broadcast receiver so callers can use its API (e.g.
// `recv`) directly on the subscriber.
impl Deref for RoomEventCacheSubscriber {
    type Target = Receiver<RoomEventCacheUpdate>;

    fn deref(&self) -> &Self::Target {
        &self.recv
    }
}
156
// Mutable counterpart of the `Deref` impl above: receiving from a broadcast
// `Receiver` requires `&mut self`.
impl DerefMut for RoomEventCacheSubscriber {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.recv
    }
}
162
impl RoomEventCache {
    /// Create a new [`RoomEventCache`] using the given room and store.
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheStateLock,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        update_sender: Sender<RoomEventCacheUpdate>,
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                pagination_status,
                room_id,
                auto_shrink_sender,
                update_sender,
                generic_update_sender,
            )),
        }
    }

    /// Read all current events.
    ///
    /// Use [`RoomEventCache::subscribe`] to get all current events, plus a
    /// subscriber.
    pub async fn events(&self) -> Result<Vec<Event>> {
        let state = self.inner.state.read().await?;

        // Only the loaded (in-memory) part of the linked chunk is returned;
        // positions are dropped, events cloned.
        Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
    }

    /// Subscribe to this room updates, after getting the initial list of
    /// events.
    ///
    /// Use [`RoomEventCache::events`] to get all current events without the
    /// subscriber. Creating, and especially dropping, a
    /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects.
    pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
        let state = self.inner.state.read().await?;
        let events =
            state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();

        // Bump the shared subscriber count; the matching decrement happens in
        // `RoomEventCacheSubscriber::drop`, which may trigger auto-shrinking.
        let subscriber_count = state.subscriber_count();
        let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);

        let recv = self.inner.update_sender.subscribe();
        let subscriber = RoomEventCacheSubscriber {
            recv,
            room_id: self.inner.room_id.clone(),
            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
            subscriber_count: subscriber_count.clone(),
        };

        Ok((events, subscriber))
    }

    /// Subscribe to thread for a given root event, and get a (maybe empty)
    /// initially known list of events for that thread.
    pub async fn subscribe_to_thread(
        &self,
        thread_root: OwnedEventId,
    ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
        let mut state = self.inner.state.write().await?;
        Ok(state.subscribe_to_thread(thread_root))
    }

    /// Paginate backwards in a thread, given its root event ID.
    ///
    /// Returns whether we've hit the start of the thread, in which case the
    /// root event will be prepended to the thread.
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn paginate_thread_backwards(
        &self,
        thread_root: OwnedEventId,
        num_events: u16,
    ) -> Result<bool> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

        // Take the lock only for a short time here.
        let mut outcome =
            self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());

        // Loop until the pagination either terminates (start of thread
        // reached, or a batch has been committed) or errors out.
        loop {
            match outcome {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    // Start a threaded pagination from this gap.
                    let options = RelationsOptions {
                        from: prev_token.clone(),
                        dir: Direction::Backward,
                        limit: Some(num_events.into()),
                        include_relations: IncludeRelations::AllRelations,
                        recurse: true,
                    };

                    let mut result = room
                        .relations(thread_root.clone(), options)
                        .await
                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                    // No next-batch token from the server means we've reached
                    // the start of the thread.
                    let reached_start = result.next_batch_token.is_none();
                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");

                    // Because the state lock is taken again in `load_or_fetch_event`, we need
                    // to do this *before* we take the state lock again.
                    let root_event =
                        if reached_start {
                            // Prepend the thread root event to the results.
                            Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
                                |err| EventCacheError::BackpaginationError(Box::new(err)),
                            )?)
                        } else {
                            None
                        };

                    let mut state = self.inner.state.write().await?;

                    // Save all the events (but the thread root) in the store.
                    state.save_events(result.chunk.iter().cloned()).await?;

                    // Note: the events are still in the reversed order at this point, so
                    // pushing will eventually make it so that the root event is the first.
                    result.chunk.extend(root_event);

                    // `None` means the pagination token changed under us and
                    // the batch was discarded; we then fall through and retry.
                    if let Some(outcome) = state.finish_thread_network_pagination(
                        thread_root.clone(),
                        prev_token,
                        result.next_batch_token,
                        result.chunk,
                    ) {
                        return Ok(outcome.reached_start);
                    }

                    // fallthrough: restart the pagination.
                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    // We're done!
                    return Ok(true);
                }

                LoadMoreEventsBackwardsOutcome::Events { .. } => {
                    // TODO: implement :)
                    unimplemented!("loading from disk for threads is not implemented yet");
                }
            }
        }
    }

    /// Return a [`RoomPagination`] API object useful for running
    /// back-pagination queries in the current room.
    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

    /// Try to find a single event in this room, starting from the most recent
    /// event.
    ///
    /// The `predicate` receives two arguments: the current event, and the
    /// _previous_ (older) event.
    ///
    /// **Warning**! It looks into the loaded events from the in-memory linked
    /// chunk **only**. It doesn't look inside the storage.
    pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
    where
        P: FnMut(&Event, Option<&Event>) -> Option<O>,
    {
        Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
    }

    /// Try to find an event by ID in this room.
    ///
    /// It starts by looking into loaded events before looking inside the
    /// storage.
    ///
    /// Lookup errors are swallowed (`.ok()`): a storage failure is reported
    /// as "not found" rather than bubbling up.
    pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event))
    }

    /// Try to find an event by ID in this room, along with its related events.
    ///
    /// You can filter which types of related events to retrieve using
    /// `filter`. `None` will retrieve related events of any type.
    ///
    /// The related events are sorted like this:
    ///
    /// - events saved out-of-band (with `RoomEventCache::save_events`) will be
    ///   located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the storage)
    ///   will be sorted according to their ordering in the linked chunk.
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>> {
        // Search in all loaded or stored events.
        Ok(self
            .inner
            .state
            .read()
            .await?
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten())
    }

    /// Clear all the storage for this [`RoomEventCache`].
    ///
    /// This will get rid of all the events from the linked chunk and persisted
    /// storage.
    pub async fn clear(&self) -> Result<()> {
        // Clear the linked chunk and persisted storage.
        let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;

        // Notify observers about the update.
        let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: EventsOrigin::Cache,
        });

        // Notify observers about the generic update.
        let _ = self
            .inner
            .generic_update_sender
            .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });

        Ok(())
    }

    /// Handle a single event from the `SendQueue`.
    ///
    /// The event is wrapped in a synthetic, non-limited [`Timeline`] so it
    /// goes through the regular sync-handling path.
    pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
        self.inner
            .handle_timeline(
                Timeline { limited: false, prev_batch: None, events: vec![event] },
                Vec::new(),
                BTreeMap::new(),
            )
            .await
    }

    /// Save some events in the event cache, for further retrieval with
    /// [`Self::find_event`].
    ///
    /// Failures (lock acquisition or storage) are logged and swallowed: this
    /// is a best-effort operation.
    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
        match self.inner.state.write().await {
            Ok(mut state_guard) => {
                if let Err(err) = state_guard.save_events(events).await {
                    warn!("couldn't save event in the event cache: {err}");
                }
            }

            Err(err) => {
                warn!("couldn't save event in the event cache: {err}");
            }
        }
    }

    /// Return a nice debug string (a vector of lines) for the linked chunk of
    /// events for this room.
    ///
    /// Returns an empty vector if the state lock couldn't be acquired.
    pub async fn debug_string(&self) -> Vec<String> {
        match self.inner.state.read().await {
            Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
            Err(err) => {
                warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");

                vec![]
            }
        }
    }
}
445
/// The (non-cloneable) details of the `RoomEventCache`.
pub(super) struct RoomEventCacheInner {
    /// The room id for this room.
    pub(super) room_id: OwnedRoomId,

    /// Weak reference to the room, so as not to keep the client alive; used
    /// to run network requests (e.g. `/relations`) for this room.
    pub weak_room: WeakRoom,

    /// State for this room's event cache.
    pub state: RoomEventCacheStateLock,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    /// Observable pagination status for this room, shared with
    /// [`RoomPagination`].
    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    /// Sender to the auto-shrink channel.
    ///
    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
    /// more details.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Sender part for update subscribers to this room.
    pub update_sender: Sender<RoomEventCacheUpdate>,

    /// A clone of [`EventCacheInner::generic_update_sender`].
    ///
    /// Whilst `EventCacheInner` handles the generic updates from the sync, or
    /// the storage, it doesn't handle the update from pagination. Having a
    /// clone here allows to access it from [`RoomPagination`].
    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}
477
478impl RoomEventCacheInner {
479    /// Creates a new cache for a room, and subscribes to room updates, so as
480    /// to handle new timeline events.
481    fn new(
482        client: WeakClient,
483        state: RoomEventCacheStateLock,
484        pagination_status: SharedObservable<RoomPaginationStatus>,
485        room_id: OwnedRoomId,
486        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
487        update_sender: Sender<RoomEventCacheUpdate>,
488        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
489    ) -> Self {
490        let weak_room = WeakRoom::new(client, room_id);
491
492        Self {
493            room_id: weak_room.room_id().to_owned(),
494            weak_room,
495            state,
496            update_sender,
497            pagination_batch_token_notifier: Default::default(),
498            auto_shrink_sender,
499            pagination_status,
500            generic_update_sender,
501        }
502    }
503
504    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
505        if account_data.is_empty() {
506            return;
507        }
508
509        let mut handled_read_marker = false;
510
511        trace!("Handling account data");
512
513        for raw_event in account_data {
514            match raw_event.deserialize() {
515                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
516                    // If duplicated, do not forward read marker multiple times
517                    // to avoid clutter the update channel.
518                    if handled_read_marker {
519                        continue;
520                    }
521
522                    handled_read_marker = true;
523
524                    // Propagate to observers. (We ignore the error if there aren't any.)
525                    let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
526                        event_id: ev.content.event_id,
527                    });
528                }
529
530                Ok(_) => {
531                    // We're not interested in other room account data updates,
532                    // at this point.
533                }
534
535                Err(e) => {
536                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
537                    warn!(event_type, "Failed to deserialize account data: {e}");
538                }
539            }
540        }
541    }
542
543    #[instrument(skip_all, fields(room_id = %self.room_id))]
544    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
545        self.handle_timeline(
546            updates.timeline,
547            updates.ephemeral.clone(),
548            updates.ambiguity_changes,
549        )
550        .await?;
551        self.handle_account_data(updates.account_data);
552
553        Ok(())
554    }
555
556    #[instrument(skip_all, fields(room_id = %self.room_id))]
557    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
558        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
559
560        Ok(())
561    }
562
563    /// Handle a [`Timeline`], i.e. new events received by a sync for this
564    /// room.
565    async fn handle_timeline(
566        &self,
567        timeline: Timeline,
568        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
569        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
570    ) -> Result<()> {
571        if timeline.events.is_empty()
572            && timeline.prev_batch.is_none()
573            && ephemeral_events.is_empty()
574            && ambiguity_changes.is_empty()
575        {
576            return Ok(());
577        }
578
579        // Add all the events to the backend.
580        trace!("adding new events");
581
582        let (stored_prev_batch_token, timeline_event_diffs) =
583            self.state.write().await?.handle_sync(timeline).await?;
584
585        // Now that all events have been added, we can trigger the
586        // `pagination_token_notifier`.
587        if stored_prev_batch_token {
588            self.pagination_batch_token_notifier.notify_one();
589        }
590
591        // The order matters here: first send the timeline event diffs, then only the
592        // related events (read receipts, etc.).
593        if !timeline_event_diffs.is_empty() {
594            let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
595                diffs: timeline_event_diffs,
596                origin: EventsOrigin::Sync,
597            });
598
599            let _ = self
600                .generic_update_sender
601                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
602        }
603
604        if !ephemeral_events.is_empty() {
605            let _ = self
606                .update_sender
607                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
608        }
609
610        if !ambiguity_changes.is_empty() {
611            let _ =
612                self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
613        }
614
615        Ok(())
616    }
617}
618
/// Internal type to represent the output of
/// [`RoomEventCacheState::load_more_events_backwards`].
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap has been inserted.
    Gap {
        /// The previous batch token to be used as the "end" parameter in the
        /// back-pagination request.
        prev_token: Option<String>,
    },

    /// The start of the timeline has been reached.
    StartOfTimeline,

    /// Events have been inserted.
    Events {
        /// The events that were loaded.
        events: Vec<Event>,
        /// The diffs to apply to the in-memory timeline as a result.
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        /// Whether the start of the timeline was reached while loading.
        reached_start: bool,
    },
}
636
637// Use a private module to hide `events` to this parent module.
638mod private {
639    use std::{
640        collections::{BTreeMap, HashMap, HashSet},
641        sync::{
642            Arc,
643            atomic::{AtomicBool, AtomicUsize, Ordering},
644        },
645    };
646
647    use eyeball::SharedObservable;
648    use eyeball_im::VectorDiff;
649    use itertools::Itertools;
650    use matrix_sdk_base::{
651        apply_redaction, check_validity_of_replacement_events,
652        deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
653        event_cache::{
654            Event, Gap,
655            store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
656        },
657        linked_chunk::{
658            ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
659            OwnedLinkedChunkId, Position, Update, lazy_loader,
660        },
661        serde_helpers::{extract_edit_target, extract_thread_root},
662        sync::Timeline,
663    };
664    use matrix_sdk_common::executor::spawn;
665    use ruma::{
666        EventId, OwnedEventId, OwnedRoomId, RoomId,
667        events::{
668            AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
669            relation::RelationType, room::redaction::SyncRoomRedactionEvent,
670        },
671        room_version_rules::RoomVersionRules,
672        serde::Raw,
673    };
674    use tokio::sync::{
675        Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
676        broadcast::{Receiver, Sender},
677    };
678    use tracing::{debug, error, instrument, trace, warn};
679
680    use super::{
681        super::{
682            BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
683            RoomPaginationStatus, ThreadEventCacheUpdate,
684            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
685            room::threads::ThreadEventCache,
686        },
687        EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
688        RoomEventCacheUpdate,
689        events::EventLinkedChunk,
690        sort_positions_descending,
691    };
692
    /// State for a single room's event cache.
    ///
    /// This contains all the inner mutable states that ought to be updated at
    /// the same time.
    pub struct RoomEventCacheStateLock {
        /// The per-thread lock around the real state.
        locked_state: RwLock<RoomEventCacheStateLockInner>,

        /// Please see inline comment of [`Self::read`] to understand why it
        /// exists.
        ///
        /// NOTE(review): serializes read-lock acquisition; see `Self::read`
        /// (not visible in this chunk) for the rationale — confirm there.
        read_lock_acquisition: Mutex<()>,
    }
705
    /// The actual state protected by [`RoomEventCacheStateLock`].
    struct RoomEventCacheStateLockInner {
        /// Whether thread support has been enabled for the event cache.
        enabled_thread_support: bool,

        /// The room this state relates to.
        room_id: OwnedRoomId,

        /// Reference to the underlying backing store.
        store: EventCacheStoreLock,

        /// The loaded events for the current room, that is, the in-memory
        /// linked chunk for this room.
        room_linked_chunk: EventLinkedChunk,

        /// Threads present in this room.
        ///
        /// Keyed by the thread root event ID.
        threads: HashMap<OwnedEventId, ThreadEventCache>,

        /// Observable pagination status for this room.
        pagination_status: SharedObservable<RoomPaginationStatus>,

        /// A clone of [`super::RoomEventCacheInner::update_sender`].
        ///
        /// This is used only by the [`RoomEventCacheStateLock::read`] and
        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
        update_sender: Sender<RoomEventCacheUpdate>,

        /// A clone of [`super::super::EventCacheInner::generic_update_sender`].
        ///
        /// This is used only by the [`RoomEventCacheStateLock::read`] and
        /// [`RoomEventCacheStateLock::write`] when the state must be reset.
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

        /// A clone of
        /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        /// The rules for the version of this room.
        room_version_rules: RoomVersionRules,

        /// Have we ever waited for a previous-batch-token to come from sync, in
        /// the context of pagination? We do this at most once per room,
        /// the first time we try to run backward pagination. We reset
        /// that upon clearing the timeline events.
        waited_for_initial_prev_token: Arc<AtomicBool>,

        /// An atomic count of the current number of subscriber of the
        /// [`super::RoomEventCache`].
        subscriber_count: Arc<AtomicUsize>,
    }
756
757    impl RoomEventCacheStateLock {
758        /// Create a new state, or reload it from storage if it's been enabled.
759        ///
760        /// Not all events are going to be loaded. Only a portion of them. The
761        /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
762        /// events. Only the last chunk will be loaded. It means the
763        /// events are loaded from the most recent to the oldest. To
764        /// load more events, see [`RoomPagination`].
765        ///
        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
        /// [`RoomPagination`]: super::RoomPagination
        #[allow(clippy::too_many_arguments)]
        pub async fn new(
            room_id: OwnedRoomId,
            room_version_rules: RoomVersionRules,
            enabled_thread_support: bool,
            update_sender: Sender<RoomEventCacheUpdate>,
            generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            // Take the cross-process lock over the store for the duration of the setup.
            let store_guard = match store.lock().await? {
                // Lock is clean: all good!
                EventCacheStoreLockState::Clean(guard) => guard,

                // Lock is dirty, not a problem, it's the first time we are creating this state, no
                // need to refresh.
                EventCacheStoreLockState::Dirty(guard) => {
                    EventCacheStoreLockGuard::clear_dirty(&guard);

                    guard
                }
            };

            let linked_chunk_id = LinkedChunkId::Room(&room_id);

            // Load the full linked chunk's metadata, so as to feed the order tracker.
            //
            // If loading the full linked chunk failed, we'll clear the event cache, as it
            // indicates that at some point, there's some malformed data.
            let full_linked_chunk_metadata =
                match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
                    Ok(metas) => metas,
                    Err(err) => {
                        error!(
                            "error when loading a linked chunk's metadata from the store: {err}"
                        );

                        // Try to clear storage for this room.
                        store_guard
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        // Restart with an empty linked chunk.
                        None
                    }
                };

            // Only the *last* chunk is loaded eagerly; previous chunks are lazy-loaded
            // later (see `load_more_events_backwards`). On failure, clear storage and
            // start over with an empty linked chunk, same as for the metadata above.
            let linked_chunk = match store_guard
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,
                Err(err) => {
                    error!(
                        "error when loading a linked chunk's latest chunk from the store: {err}"
                    );

                    // Try to clear storage for this room.
                    store_guard
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            // Nothing has waited for the initial previous-batch token yet.
            let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));

            Ok(Self {
                locked_state: RwLock::new(RoomEventCacheStateLockInner {
                    enabled_thread_support,
                    room_id,
                    store,
                    room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
                        linked_chunk,
                        full_linked_chunk_metadata,
                    ),
                    // The threads mapping is intentionally empty at start, since we're going to
                    // reload threads lazily, as soon as we need to (based on external
                    // subscribers) or when we get new information about those (from
                    // sync).
                    threads: HashMap::new(),
                    pagination_status,
                    update_sender,
                    generic_update_sender,
                    linked_chunk_update_sender,
                    room_version_rules,
                    waited_for_initial_prev_token,
                    subscriber_count: Default::default(),
                }),
                read_lock_acquisition: Mutex::new(()),
            })
        }
866
        /// Lock this [`RoomEventCacheStateLock`] with per-thread shared access.
        ///
        /// This method locks the per-thread lock over the state, and then locks
        /// the cross-process lock over the store. It returns an RAII guard
        /// which will drop the read access to the state and to the store when
        /// dropped.
        ///
        /// If the cross-process lock over the store is dirty (see
        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
        pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
            // Only one call at a time to `read` is allowed.
            //
            // Why? Because in case the cross-process lock over the store is dirty, we need
            // to upgrade the read lock over the state to a write lock.
            //
            // ## Upgradable read lock
            //
            // One may argue that this upgrades can be done with an _upgradable read lock_
            // [^1] [^2]. We don't want to use this solution: an upgradable read lock is
            // basically a mutex because we are losing the shared access property, i.e.
            // having multiple read locks at the same time. This is an important property to
            // hold for performance concerns.
            //
            // ## Downgradable write lock
            //
            // One may also argue we could first obtain a write lock over the state from the
            // beginning, thus removing the need to upgrade the read lock to a write lock.
            // The write lock is then downgraded to a read lock once the dirty is cleaned
            // up. It can potentially create a deadlock in the following situation:
            //
            // - `read` is called once, it takes a write lock, then downgrades it to a read
            //   lock: the guard is kept alive somewhere,
            // - `read` is called again, and waits to obtain the write lock, which is
            //   impossible as long as the guard from the previous call is not dropped.
            //
            // ## “Atomic” read and write
            //
            // One may finally argue to first obtain a read lock over the state, then drop
            // it if the cross-process lock over the store is dirty, and immediately obtain
            // a write lock (which can later be downgraded to a read lock). The problem is
            // that this write lock is async: anything can happen between the drop and the
            // new lock acquisition, and it's not possible to pause the runtime in the
            // meantime.
            //
            // ## Semaphore with 1 permit, aka a Mutex
            //
            // The chosen idea is to allow only one execution at a time of this method: it
            // becomes a critical section. That way we are free to “upgrade” the read lock
            // by dropping it and obtaining a new write lock. All callers to this method are
            // waiting, so nothing can happen in the meantime.
            //
            // Note that it doesn't conflict with the `write` method because this later
            // immediately obtains a write lock, which avoids any conflict with this method.
            //
            // [^1]: https://docs.rs/lock_api/0.4.14/lock_api/struct.RwLock.html#method.upgradable_read
            // [^2]: https://docs.rs/async-lock/3.4.1/async_lock/struct.RwLock.html#method.upgradable_read
            let _one_reader_guard = self.read_lock_acquisition.lock().await;

            // Obtain a read lock.
            let state_guard = self.locked_state.read().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    // Store lock is clean: nothing to refresh, hand out the guard directly.
                    Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    // Drop the read lock, and take a write lock to modify the state.
                    // This is safe because only one reader at a time (see
                    // `Self::read_lock_acquisition`) is allowed.
                    drop(state_guard);
                    let state_guard = self.locked_state.write().await;

                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    // Force to reload by shrinking to the last chunk.
                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    // All good now, mark the cross-process lock as non-dirty.
                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    // Downgrade the guard as soon as possible.
                    let guard = guard.downgrade();

                    // Now let the world know about the reload.
                    if !updates_as_vector_diffs.is_empty() {
                        // Notify observers about the update.
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        // Notify observers about the generic update.
                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }
974
        /// Lock this [`RoomEventCacheStateLock`] with exclusive per-thread
        /// write access.
        ///
        /// This method locks the per-thread lock over the state, and then locks
        /// the cross-process lock over the store. It returns an RAII guard
        /// which will drop the write access to the state and to the store when
        /// dropped.
        ///
        /// If the cross-process lock over the store is dirty (see
        /// [`EventCacheStoreLockState`]), the state is reset to the last chunk.
        pub async fn write(
            &self,
        ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
            // Contrary to `read`, no extra mutex is needed here: we immediately take the
            // write lock, so there is no read-to-write upgrade to orchestrate.
            let state_guard = self.locked_state.write().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    // Store lock is clean: nothing to refresh, hand out the guard directly.
                    Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    // NOTE(review): this dirty-recovery sequence mirrors the one in `read`;
                    // keep the two in sync if either changes.
                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    // Force to reload by shrinking to the last chunk.
                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    // All good now, mark the cross-process lock as non-dirty.
                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    // Now let the world know about the reload.
                    if !updates_as_vector_diffs.is_empty() {
                        // Notify observers about the update.
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        // Notify observers about the generic update.
                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }
1027    }
1028
    /// The read lock guard returned by [`RoomEventCacheStateLock::read`].
    ///
    /// Dropping it releases both the per-thread read access to the state and
    /// the cross-process lock over the store.
    pub struct RoomEventCacheStateLockReadGuard<'a> {
        /// The per-thread read lock guard over the
        /// [`RoomEventCacheStateLockInner`].
        state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>,

        /// The cross-process lock guard over the store.
        store: EventCacheStoreLockGuard,
    }
1038
    /// The write lock guard returned by [`RoomEventCacheStateLock::write`].
    ///
    /// Dropping it releases both the per-thread write access to the state and
    /// the cross-process lock over the store.
    pub struct RoomEventCacheStateLockWriteGuard<'a> {
        /// The per-thread write lock guard over the
        /// [`RoomEventCacheStateLockInner`].
        state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>,

        /// The cross-process lock guard over the store.
        store: EventCacheStoreLockGuard,
    }
1048
    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
        /// Synchronously downgrades a write lock into a read lock.
        ///
        /// The per-thread/state lock is downgraded atomically, without allowing
        /// any writers to take exclusive access of the lock in the meantime.
        /// The cross-process store guard is carried over unchanged.
        ///
        /// It returns an RAII guard which will drop the read access to the
        /// state and to the store when dropped.
        fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
            RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
        }
    }
1061
1062    impl<'a> RoomEventCacheStateLockReadGuard<'a> {
1063        /// Returns a read-only reference to the underlying room linked chunk.
1064        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1065            &self.state.room_linked_chunk
1066        }
1067
1068        pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
1069            &self.state.subscriber_count
1070        }
1071
1072        /// Find a single event in this room.
1073        ///
1074        /// It starts by looking into loaded events in `EventLinkedChunk` before
1075        /// looking inside the storage.
1076        pub async fn find_event(
1077            &self,
1078            event_id: &EventId,
1079        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1080            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1081                .await
1082        }
1083
1084        /// Find an event and all its relations in the persisted storage.
1085        ///
1086        /// This goes straight to the database, as a simplification; we don't
1087        /// expect to need to have to look up in memory events, or that
1088        /// all the related events are actually loaded.
1089        ///
1090        /// The related events are sorted like this:
1091        /// - events saved out-of-band with
1092        ///   [`super::RoomEventCache::save_events`] will be located at the
1093        ///   beginning of the array.
1094        /// - events present in the linked chunk (be it in memory or in the
1095        ///   database) will be sorted according to their ordering in the linked
1096        ///   chunk.
1097        pub async fn find_event_with_relations(
1098            &self,
1099            event_id: &EventId,
1100            filters: Option<Vec<RelationType>>,
1101        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1102            find_event_with_relations(
1103                event_id,
1104                &self.state.room_id,
1105                filters,
1106                &self.state.room_linked_chunk,
1107                &self.store,
1108            )
1109            .await
1110        }
1111
1112        //// Find a single event in this room, starting from the most recent event.
1113        ///
1114        /// The `predicate` receives two arguments: the current event, and the
1115        /// ID of the _previous_ (older) event.
1116        ///
1117        /// **Warning**! It looks into the loaded events from the in-memory
1118        /// linked chunk **only**. It doesn't look inside the storage,
1119        /// contrary to [`Self::find_event`].
1120        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
1121        where
1122            P: FnMut(&Event, Option<&Event>) -> Option<O>,
1123        {
1124            self.state
1125                .room_linked_chunk
1126                .revents()
1127                .peekable()
1128                .batching(|iter| {
1129                    iter.next().map(|(_position, event)| {
1130                        (event, iter.peek().map(|(_next_position, next_event)| *next_event))
1131                    })
1132                })
1133                .find_map(|(event, next_event_id)| predicate(event, next_event_id))
1134        }
1135
1136        #[cfg(test)]
1137        pub fn is_dirty(&self) -> bool {
1138            EventCacheStoreLockGuard::is_dirty(&self.store)
1139        }
1140    }
1141
1142    impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
        /// Returns an exclusive (mutable) reference to the underlying room
        /// linked chunk.
        ///
        /// Only available to end-to-end encryption code and tests.
        #[cfg(any(feature = "e2e-encryption", test))]
        pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
            &mut self.state.room_linked_chunk
        }
1148
        /// Get a reference to the `waited_for_initial_prev_token` atomic bool.
        ///
        /// NOTE(review): it is initialized to `false` in `new`; presumably it
        /// records whether we already waited for the initial previous-batch
        /// token before paginating — confirm against the pagination code.
        pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
            &self.state.waited_for_initial_prev_token
        }
1153
1154        /// Find a single event in this room.
1155        ///
1156        /// It starts by looking into loaded events in `EventLinkedChunk` before
1157        /// looking inside the storage.
1158        pub async fn find_event(
1159            &self,
1160            event_id: &EventId,
1161        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1162            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1163                .await
1164        }
1165
1166        /// Find an event and all its relations in the persisted storage.
1167        ///
1168        /// This goes straight to the database, as a simplification; we don't
1169        /// expect to need to have to look up in memory events, or that
1170        /// all the related events are actually loaded.
1171        ///
1172        /// The related events are sorted like this:
1173        /// - events saved out-of-band with
1174        ///   [`super::RoomEventCache::save_events`] will be located at the
1175        ///   beginning of the array.
1176        /// - events present in the linked chunk (be it in memory or in the
1177        ///   database) will be sorted according to their ordering in the linked
1178        ///   chunk.
1179        pub async fn find_event_with_relations(
1180            &self,
1181            event_id: &EventId,
1182            filters: Option<Vec<RelationType>>,
1183        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1184            find_event_with_relations(
1185                event_id,
1186                &self.state.room_id,
1187                filters,
1188                &self.state.room_linked_chunk,
1189                &self.store,
1190            )
1191            .await
1192        }
1193
        /// Load more events backwards if the last chunk is **not** a gap.
        ///
        /// Returns a [`LoadMoreEventsBackwardsOutcome`] describing either a
        /// gap to resolve (with an optional previous-batch token), the start
        /// of the timeline, or the events reloaded from the store.
        pub async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            // If any in-memory chunk is a gap, don't load more events, and let the caller
            // resolve the gap.
            if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
            {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let prev_first_chunk = self
                .state
                .room_linked_chunk
                .chunks()
                .next()
                .expect("a linked chunk is never empty");

            // The first chunk is not a gap, we can load its previous chunk.
            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            let new_first_chunk = match self
                .store
                .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
                .await
            {
                Ok(Some(new_first_chunk)) => {
                    // All good, let's continue with this chunk.
                    new_first_chunk
                }

                Ok(None) => {
                    // If we never received events for this room, this means we've never received a
                    // sync for that room, because every room must have *at least* a room creation
                    // event. Otherwise, we have reached the start of the timeline.

                    if self.state.room_linked_chunk.events().next().is_some() {
                        // If there's at least one event, this means we've reached the start of the
                        // timeline, since the chunk is fully loaded.
                        trace!("chunk is fully loaded and non-empty: reached_start=true");
                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
                    }

                    // Otherwise, start back-pagination from the end of the room.
                    return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    // Clear storage for this room.
                    self.store
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    // Return the error.
                    return Err(err.into());
                }
            };

            // Clone the content now: the chunk itself is moved into the linked chunk
            // below.
            let chunk_content = new_first_chunk.content.clone();

            // We've reached the start on disk, if and only if, there was no chunk prior to
            // the one we just loaded.
            //
            // This value is correct, if and only if, it is used for a chunk content of kind
            // `Items`.
            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) =
                self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
            {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                // Clear storage for this room.
                self.store
                    .handle_linked_chunk_updates(
                        LinkedChunkId::Room(&self.state.room_id),
                        vec![Update::Clear],
                    )
                    .await?;

                // Return the error.
                return Err(err.into());
            }

            // ⚠️ Let's not propagate the updates to the store! We already have these data
            // in the store! Let's drain them.
            let _ = self.state.room_linked_chunk.store_updates().take();

            // However, we want to get updates as `VectorDiff`s.
            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }
1302
        /// Unload all the chunks, then reload only the last one.
        ///
        /// The resulting updates are accumulated inside the in-memory linked
        /// chunk; callers that need them as `VectorDiff`s should use
        /// [`Self::force_shrink_to_last_chunk`] or call
        /// `updates_as_vector_diffs` themselves. (The store itself is not
        /// modified: the drained store updates are intentionally discarded,
        /// see below.)
        pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
            // Attempt to load the last chunk.
            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            let (last_chunk, chunk_identifier_generator) =
                match self.store.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        // If loading the last chunk failed, clear the entire linked chunk.
                        error!("error when reloading a linked chunk from memory: {err}");

                        // Clear storage for this room.
                        self.store
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        // Restart with an empty linked chunk.
                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            // Remove all the chunks from the linked chunks, except for the last one, and
            // updates the chunk identifier generator.
            if let Err(err) =
                self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
            {
                error!("error when replacing the linked chunk: {err}");
                return self.reset_internal().await;
            }

            // Let pagination observers know that we may have not reached the start of the
            // timeline.
            // TODO: likely need to cancel any ongoing pagination.
            self.state
                .pagination_status
                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            // Don't propagate those updates to the store; this is only for the in-memory
            // representation that we're doing this. Let's drain those store updates.
            let _ = self.state.room_linked_chunk.store_updates().take();

            Ok(())
        }
1356
1357        /// Automatically shrink the room if there are no more subscribers, as
1358        /// indicated by the atomic number of active subscribers.
1359        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1360        pub async fn auto_shrink_if_no_subscribers(
1361            &mut self,
1362        ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1363            let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1364
1365            trace!(subscriber_count, "received request to auto-shrink");
1366
1367            if subscriber_count == 0 {
1368                // If we are the last strong reference to the auto-shrinker, we can shrink the
1369                // events data structure to its last chunk.
1370                self.shrink_to_last_chunk().await?;
1371
1372                Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1373            } else {
1374                Ok(None)
1375            }
1376        }
1377
1378        /// Force to shrink the room, whenever there is subscribers or not.
1379        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1380        pub async fn force_shrink_to_last_chunk(
1381            &mut self,
1382        ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1383            self.shrink_to_last_chunk().await?;
1384
1385            Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
1386        }
1387
        /// Remove events by their position, in `EventLinkedChunk` and in
        /// `EventCacheStore`.
        ///
        /// This method is purposely isolated because it must ensure that
        /// positions are sorted appropriately or it can be disastrous.
        #[instrument(skip_all)]
        pub async fn remove_events(
            &mut self,
            in_memory_events: Vec<(OwnedEventId, Position)>,
            in_store_events: Vec<(OwnedEventId, Position)>,
        ) -> Result<(), EventCacheError> {
            // In-store events.
            if !in_store_events.is_empty() {
                let mut positions = in_store_events
                    .into_iter()
                    .map(|(_event_id, position)| position)
                    .collect::<Vec<_>>();

                // Sort descending, so removing an item never shifts the positions of the
                // items still left to remove.
                sort_positions_descending(&mut positions);

                let updates = positions
                    .into_iter()
                    .map(|pos| Update::RemoveItem { at: pos })
                    .collect::<Vec<_>>();

                self.apply_store_only_updates(updates).await?;
            }

            // In-memory events.
            if in_memory_events.is_empty() {
                // Nothing else to do, return early.
                return Ok(());
            }

            // `remove_events_by_position` is responsible of sorting positions.
            self.state
                .room_linked_chunk
                .remove_events_by_position(
                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
                )
                .expect("failed to remove an event");

            self.propagate_changes().await
        }
1432
1433        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1434            let updates = self.state.room_linked_chunk.store_updates().take();
1435            self.send_updates_to_store(updates).await
1436        }
1437
1438        /// Apply some updates that are effective only on the store itself.
1439        ///
1440        /// This method should be used only for updates that happen *outside*
1441        /// the in-memory linked chunk. Such updates must be applied
1442        /// onto the ordering tracker as well as to the persistent
1443        /// storage.
1444        async fn apply_store_only_updates(
1445            &mut self,
1446            updates: Vec<Update<Event, Gap>>,
1447        ) -> Result<(), EventCacheError> {
1448            self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1449            self.send_updates_to_store(updates).await
1450        }
1451
        /// Persist the given linked chunk updates to the store (after stripping
        /// relations from inserted/replaced events), then notify observers via
        /// the linked chunk update sender.
        ///
        /// No-op when `updates` is empty.
        async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            if updates.is_empty() {
                return Ok(());
            }

            // Strip relations from updates which insert or replace items.
            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
                    // Other update kinds don't involve adding new events.
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            // Spawn a task to make sure that all the changes are effectively forwarded to
            // the store, even if the call to this method gets aborted.
            //
            // The store cross-process locking involves an actual mutex, which ensures that
            // storing updates happens in the expected order.

            let store = self.store.clone();
            let room_id = self.state.room_id.clone();
            // Clone the updates: the task needs an owned copy, while the original vector
            // is forwarded to observers below.
            let cloned_updates = updates.clone();

            spawn(async move {
                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
                let linked_chunk_id = LinkedChunkId::Room(&room_id);
                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            // Forward that the store got updated to observers.
            let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
                linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
                updates,
            });

            Ok(())
        }
1506
1507        /// Reset this data structure as if it were brand new.
1508        ///
1509        /// Return a single diff update that is a clear of all events; as a
1510        /// result, the caller may override any pending diff updates
1511        /// with the result of this function.
1512        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1513            self.reset_internal().await?;
1514
1515            let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1516
1517            // Ensure the contract defined in the doc comment is true:
1518            debug_assert_eq!(diff_updates.len(), 1);
1519            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1520
1521            Ok(diff_updates)
1522        }
1523
1524        async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1525            self.state.room_linked_chunk.reset();
1526
1527            // No need to update the thread summaries: the room events are
1528            // gone because of the reset of `room_linked_chunk`.
1529            //
1530            // Clear the threads.
1531            for thread in self.state.threads.values_mut() {
1532                thread.clear();
1533            }
1534
1535            self.propagate_changes().await?;
1536
1537            // Reset the pagination state too: pretend we never waited for the initial
1538            // prev-batch token, and indicate that we're not at the start of the
1539            // timeline, since we don't know about that anymore.
1540            self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1541            // TODO: likely must cancel any ongoing back-paginations too
1542            self.state
1543                .pagination_status
1544                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1545
1546            Ok(())
1547        }
1548
        /// Handle the result of a sync.
        ///
        /// It may send room event cache updates to the given sender, if it
        /// generated any of those.
        ///
        /// Returns `true` for the first part of the tuple if a new gap
        /// (previous-batch token) has been inserted, `false` otherwise. The
        /// second part contains the `VectorDiff`s to forward to observers.
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_sync(
            &mut self,
            mut timeline: Timeline,
        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
            let mut prev_batch = timeline.prev_batch.take();

            // Split the sync events into genuinely new events and those
            // already known, either in memory or in the store.
            let DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                timeline.events,
            )
            .await?;

            // If the timeline isn't limited, and we already knew about some past events,
            // then this definitely knows what the timeline head is (either we know
            // about all the events persisted in storage, or we have a gap
            // somewhere). In this case, we can ditch the previous-batch
            // token, which is an optimization to avoid unnecessary future back-pagination
            // requests.
            //
            // We can also ditch it if we knew about all the events that came from sync,
            // namely, they were all deduplicated. In this case, using the
            // previous-batch token would only result in fetching other events we
            // knew about. This is slightly incorrect in the presence of
            // network splits, but this has shown to be Good Enough™.
            //
            // Note: `&&` binds tighter than `||`, so this condition reads as
            // `(not limited AND some known events) OR all duplicates`.
            if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
                || all_duplicates
            {
                prev_batch = None;
            }

            if prev_batch.is_some() {
                // Sad time: there's a gap, somewhere, in the timeline, and there's at least one
                // non-duplicated event. We don't know which threads might be gappy, so we
                // must invalidate them all :(
                // TODO(bnjbvr): figure out a better catchup mechanism for threads.
                let mut summaries_to_update = Vec::new();

                for (thread_root, thread) in self.state.threads.iter_mut() {
                    // Empty the thread's linked chunk.
                    thread.clear();

                    summaries_to_update.push(thread_root.clone());
                }

                // Now, update the summaries to indicate that we're not sure what the latest
                // thread event is. The thread count can remain as is, as it might still be
                // valid, and there's no good value to reset it to, anyways.
                for thread_root in summaries_to_update {
                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                    else {
                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                        continue;
                    };

                    // Only the `latest_reply` field is invalidated; see above.
                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                        prev_summary.latest_reply = None;

                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                        self.replace_event_at(location, target_event).await?;
                    }
                }
            }

            if all_duplicates {
                // No new events and no gap (per the previous check), thus no need to change the
                // room state. We're done!
                return Ok((false, Vec::new()));
            }

            let has_new_gap = prev_batch.is_some();

            // If we've never waited for an initial previous-batch token, and we've now
            // inserted a gap, no need to wait for a previous-batch token later.
            if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
                self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
            }

            // Remove the old duplicated events.
            //
            // We don't have to worry that the removals can change the position of the
            // existing events, because we are pushing all _new_ `events` at the back.
            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            self.state
                .room_linked_chunk
                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

            // Flushes to the store, then applies redactions/thread updates.
            self.post_process_new_events(events, true).await?;

            if timeline.limited && has_new_gap {
                // If there was a previous batch token for a limited timeline, unload the chunks
                // so it only contains the last one; otherwise, there might be a
                // valid gap in between, and observers may not render it (yet).
                //
                // We must do this *after* persisting these events to storage (in
                // `post_process_new_events`).
                self.shrink_to_last_chunk().await?;
            }

            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok((has_new_gap, timeline_event_diffs))
        }
1669
        /// Handle the result of a single back-pagination request.
        ///
        /// If the `prev_token` is set, then this function will check that the
        /// corresponding gap is present in the in-memory linked chunk.
        /// If it's not the case, `Ok(None)` will be returned, and the
        /// caller may decide to do something based on that (e.g. restart a
        /// pagination).
        ///
        /// Note: the `events` inside the returned [`BackPaginationOutcome`]
        /// keep the order `/messages` returned them in (newest first).
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_backpagination(
            &mut self,
            events: Vec<Event>,
            mut new_token: Option<String>,
            prev_token: Option<String>,
        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
        {
            // Check that the previous token still exists; otherwise it's a sign that the
            // room's timeline has been cleared.
            let prev_gap_id = if let Some(token) = prev_token {
                // Find the corresponding gap in the in-memory linked chunk.
                let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
                });

                if gap_chunk_id.is_none() {
                    // We got a previous-batch token from the linked chunk *before* running the
                    // request, but it is missing *after* completing the request.
                    //
                    // It may be a sign the linked chunk has been reset, but it's fine, per this
                    // function's contract.
                    return Ok(None);
                }

                gap_chunk_id
            } else {
                None
            };

            // Split the paginated events into genuinely new events and those
            // already known, either in memory or in the store.
            let DeduplicationOutcome {
                all_events: mut events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                events,
            )
            .await?;

            // If not all the events have been back-paginated, we need to remove the
            // previous ones, otherwise we can end up with misordered events.
            //
            // Consider the following scenario:
            // - sync returns [D, E, F]
            // - then sync returns [] with a previous batch token PB1, so the internal
            //   linked chunk state is [D, E, F, PB1].
            // - back-paginating with PB1 may return [A, B, C, D, E, F].
            //
            // Only inserting the new events when replacing PB1 would result in a timeline
            // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
            // all the events, in case this happens (see also #4746).

            if !all_duplicates {
                // Let's forget all the previous events.
                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                    .await?;
            } else {
                // All new events are duplicated, they can all be ignored.
                events.clear();
                // The gap can be ditched too, as it won't be useful to backpaginate any
                // further.
                new_token = None;
            }

            // `/messages` has been called with `dir=b` (backwards), so the events are in
            // the inverted order; reorder them.
            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

            let new_gap = new_token.map(|prev_token| Gap { prev_token });
            let reached_start = self.state.room_linked_chunk.finish_back_pagination(
                prev_gap_id,
                new_gap,
                &topo_ordered_events,
            );

            // Note: this flushes updates to the store.
            self.post_process_new_events(topo_ordered_events, false).await?;

            let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
        }
1763
1764        /// Subscribe to thread for a given root event, and get a (maybe empty)
1765        /// initially known list of events for that thread.
1766        pub fn subscribe_to_thread(
1767            &mut self,
1768            root: OwnedEventId,
1769        ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1770            self.get_or_reload_thread(root).subscribe()
1771        }
1772
1773        /// Back paginate in the given thread.
1774        ///
1775        /// Will always start from the end, unless we previously paginated.
1776        pub fn finish_thread_network_pagination(
1777            &mut self,
1778            root: OwnedEventId,
1779            prev_token: Option<String>,
1780            new_token: Option<String>,
1781            events: Vec<Event>,
1782        ) -> Option<BackPaginationOutcome> {
1783            self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1784        }
1785
1786        pub fn load_more_thread_events_backwards(
1787            &mut self,
1788            root: OwnedEventId,
1789        ) -> LoadMoreEventsBackwardsOutcome {
1790            self.get_or_reload_thread(root).load_more_events_backwards()
1791        }
1792
1793        // --------------------------------------------
1794        // utility methods
1795        // --------------------------------------------
1796
        /// Post-process new events, after they have been added to the in-memory
        /// linked chunk.
        ///
        /// Flushes updates to disk first, then applies redactions, routes
        /// events to their threads (sync only), and saves bundled latest
        /// thread events.
        ///
        /// `is_sync` must be `true` when the events come from a sync
        /// response, `false` for back-paginated events.
        pub(in super::super) async fn post_process_new_events(
            &mut self,
            events: Vec<Event>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            // Update the store before doing the post-processing.
            self.propagate_changes().await?;

            // Events to push into each thread's linked chunk, keyed by the
            // thread root event id.
            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for event in events {
                self.maybe_apply_new_redaction(&event).await?;

                if self.state.enabled_thread_support {
                    // Only add the event to a thread if:
                    // - thread support is enabled,
                    // - and if this is a sync (we can't know where to insert backpaginated events
                    //   in threads).
                    if is_sync {
                        if let Some(thread_root) = extract_thread_root(event.raw()) {
                            new_events_by_thread
                                .entry(thread_root)
                                .or_default()
                                .push(event.clone());
                        } else if let Some(event_id) = event.event_id() {
                            // If we spot the root of a thread, add it to its linked chunk.
                            if self.state.threads.contains_key(&event_id) {
                                new_events_by_thread
                                    .entry(event_id)
                                    .or_default()
                                    .push(event.clone());
                            }
                        }
                    }

                    // Look for edits that may apply to a thread; we'll process them later.
                    if let Some(edit_target) = extract_edit_target(event.raw()) {
                        // If the edited event is known, and part of a thread,
                        if let Some((_location, edit_target_event)) =
                            self.find_event(&edit_target).await?
                            && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
                        {
                            // Mark the thread for processing, unless it was already marked as
                            // such. (An empty entry means "recompute this thread's summary
                            // without adding any event".)
                            new_events_by_thread.entry(thread_root).or_default();
                        }
                    }
                }

                // Save a bundled thread event, if there was one.
                if let Some(bundled_thread) = event.bundled_latest_thread_event {
                    self.save_events([*bundled_thread]).await?;
                }
            }

            if self.state.enabled_thread_support {
                self.update_threads(new_events_by_thread).await?;
            }

            Ok(())
        }
1862
1863        fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1864            // TODO: when there's persistent storage, try to lazily reload from disk, if
1865            // missing from memory.
1866            let room_id = self.state.room_id.clone();
1867            let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1868
1869            self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1870                ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1871            })
1872        }
1873
        /// Push the given per-thread batches of new events into their thread
        /// caches, and refresh each affected thread's summary.
        ///
        /// For each thread, the summary's latest event is the thread cache's
        /// latest event, or its most recent valid edit if one exists.
        #[instrument(skip_all)]
        async fn update_threads(
            &mut self,
            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
        ) -> Result<(), EventCacheError> {
            for (thread_root, new_events) in new_events_by_thread {
                let thread_cache = self.get_or_reload_thread(thread_root.clone());

                thread_cache.add_live_events(new_events);

                let mut latest_event_id = thread_cache.latest_event_id();

                // If there's an edit to the latest event in the thread, use the latest edit
                // event id as the latest event id for the thread summary.
                if let Some(event_id) = latest_event_id.as_ref()
                    && let Some((original_event, edits)) = self
                        .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
                        .await?
                {
                    // `rfind` scans from the end, i.e. from the most recent
                    // edit backwards, keeping the newest valid one.
                    let latest_valid_edit = edits.into_iter().rfind(|edit| {
                        let original_json = original_event.raw();
                        let original_encryption_info = original_event.encryption_info();
                        let replacement_json = edit.raw();
                        let replacement_encryption_info = edit.encryption_info();

                        check_validity_of_replacement_events(
                            original_json,
                            original_encryption_info.map(|v| &**v),
                            replacement_json,
                            replacement_encryption_info.map(|v| &**v),
                        )
                        .is_ok()
                    });

                    if let Some(latest_valid_edit) = latest_valid_edit {
                        latest_event_id = latest_valid_edit.event_id();
                    }
                }

                self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
            }

            Ok(())
        }
1918
1919        /// Update a thread summary on the given thread root, if needs be.
1920        async fn maybe_update_thread_summary(
1921            &mut self,
1922            thread_root: OwnedEventId,
1923            latest_event_id: Option<OwnedEventId>,
1924        ) -> Result<(), EventCacheError> {
1925            // Add a thread summary to the (room) event which has the thread root, if we
1926            // knew about it.
1927
1928            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1929                trace!(%thread_root, "thread root event is missing from the room linked chunk");
1930                return Ok(());
1931            };
1932
1933            let prev_summary = target_event.thread_summary.summary();
1934
1935            // Recompute the thread summary, if needs be.
1936
1937            // Read the latest number of thread replies from the store.
1938            //
1939            // Implementation note: since this is based on the `m.relates_to` field, and
1940            // that field can only be present on room messages, we don't have to
1941            // worry about filtering out aggregation events (like
1942            // reactions/edits/etc.). Pretty neat, huh?
1943            let num_replies = {
1944                let thread_replies = self
1945                    .store
1946                    .find_event_relations(
1947                        &self.state.room_id,
1948                        &thread_root,
1949                        Some(&[RelationType::Thread]),
1950                    )
1951                    .await?;
1952                thread_replies.len().try_into().unwrap_or(u32::MAX)
1953            };
1954
1955            let new_summary = if num_replies > 0 {
1956                Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
1957            } else {
1958                None
1959            };
1960
1961            if prev_summary == new_summary.as_ref() {
1962                trace!(%thread_root, "thread summary is already up-to-date");
1963                return Ok(());
1964            }
1965
1966            // Trigger an update to observers.
1967            trace!(%thread_root, "updating thread summary: {new_summary:?}");
1968            target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
1969            self.replace_event_at(location, target_event).await
1970        }
1971
1972        /// Replaces a single event, be it saved in memory or in the store.
1973        ///
1974        /// If it was saved in memory, this will emit a notification to
1975        /// observers that a single item has been replaced. Otherwise,
1976        /// such a notification is not emitted, because observers are
1977        /// unlikely to observe the store updates directly.
1978        pub(crate) async fn replace_event_at(
1979            &mut self,
1980            location: EventLocation,
1981            event: Event,
1982        ) -> Result<(), EventCacheError> {
1983            match location {
1984                EventLocation::Memory(position) => {
1985                    self.state
1986                        .room_linked_chunk
1987                        .replace_event_at(position, event)
1988                        .expect("should have been a valid position of an item");
1989                    // We just changed the in-memory representation; synchronize this with
1990                    // the store.
1991                    self.propagate_changes().await?;
1992                }
1993                EventLocation::Store => {
1994                    self.save_events([event]).await?;
1995                }
1996            }
1997
1998            Ok(())
1999        }
2000
        /// If the given event is a redaction, try to retrieve the
        /// to-be-redacted event in the chunk, and replace it by the
        /// redacted form.
        ///
        /// If the redacted event belonged to a thread, the thread's linked
        /// chunk and the thread root's summary are updated as well.
        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            // Do not deserialise the entire event if we aren't certain it's a
            // `m.room.redaction`. It saves a non-negligible amount of computations.
            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            // It is a `m.room.redaction`! We can deserialize it entirely.

            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            ))) = raw_event.deserialize()
            else {
                return Ok(());
            };

            // The redacted event id may live in different fields depending on
            // the room version, hence the redaction rules parameter.
            let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            // Replace the redacted event by a redacted form, if we knew about it.
            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
                trace!("redacted event is missing from the linked chunk");
                return Ok(());
            };

            // Don't redact already redacted events.
            let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
                // TODO: replace with `deserialized.is_redacted()` when
                // https://github.com/ruma/ruma/pull/2254 has been merged.
                match deserialized {
                    AnySyncTimelineEvent::MessageLike(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                    AnySyncTimelineEvent::State(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                }

                // If the event is part of a thread, update the thread linked chunk and the
                // summary.
                extract_thread_root(target_event.raw())
            } else {
                warn!("failed to deserialize the event to redact");
                None
            };

            if let Some(redacted_event) = apply_redaction(
                target_event.raw(),
                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
                &self.state.room_version_rules.redaction,
            ) {
                // It's safe to cast `redacted_event` here:
                // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
                //   when calling .raw(), so it's still one under the hood.
                // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
                target_event.replace_raw(redacted_event.cast_unchecked());

                self.replace_event_at(location, target_event).await?;

                // If the redacted event was part of a thread, remove it in the thread linked
                // chunk too, and make sure to update the thread root's summary
                // as well.
                //
                // Note: there is an ordering issue here: the above `replace_event_at` must
                // happen BEFORE we recompute the summary, otherwise the set of
                // replies may include the to-be-redacted event.
                if let Some(thread_root) = thread_root
                    && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
                {
                    thread_cache.remove_if_present(event_id);

                    // The number of replies may have changed, so update the thread summary if
                    // needs be.
                    let latest_event_id = thread_cache.latest_event_id();
                    self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
                }
            }

            Ok(())
        }
2098
2099        /// Save events into the database, without notifying observers.
2100        pub async fn save_events(
2101            &mut self,
2102            events: impl IntoIterator<Item = Event>,
2103        ) -> Result<(), EventCacheError> {
2104            let store = self.store.clone();
2105            let room_id = self.state.room_id.clone();
2106            let events = events.into_iter().collect::<Vec<_>>();
2107
2108            // Spawn a task so the save is uninterrupted by task cancellation.
2109            spawn(async move {
2110                for event in events {
2111                    store.save_event(&room_id, event).await?;
2112                }
2113                super::Result::Ok(())
2114            })
2115            .await
2116            .expect("joining failed")?;
2117
2118            Ok(())
2119        }
2120
        /// Test-only helper: whether the store lock guard reports itself as
        /// dirty.
        // NOTE(review): fully-qualified syntax is used here, presumably to
        // disambiguate from a `Deref`-reachable `is_dirty` — confirm before
        // changing to method-call syntax.
        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
2125    }
2126
2127    /// Load a linked chunk's full metadata, making sure the chunks are
    /// according to their links.
2129    ///
2130    /// Returns `None` if there's no such linked chunk in the store, or an
2131    /// error if the linked chunk is malformed.
2132    async fn load_linked_chunk_metadata(
2133        store_guard: &EventCacheStoreLockGuard,
2134        linked_chunk_id: LinkedChunkId<'_>,
2135    ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
2136        let mut all_chunks = store_guard
2137            .load_all_chunks_metadata(linked_chunk_id)
2138            .await
2139            .map_err(EventCacheError::from)?;
2140
2141        if all_chunks.is_empty() {
2142            // There are no chunks, so there's nothing to do.
2143            return Ok(None);
2144        }
2145
2146        // Transform the vector into a hashmap, for quick lookup of the predecessors.
2147        let chunk_map: HashMap<_, _> =
2148            all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
2149
2150        // Find a last chunk.
2151        let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
2152        let Some(last) = iter.next() else {
2153            return Err(EventCacheError::InvalidLinkedChunkMetadata {
2154                details: "no last chunk found".to_owned(),
2155            });
2156        };
2157
2158        // There must at most one last chunk.
2159        if let Some(other_last) = iter.next() {
2160            return Err(EventCacheError::InvalidLinkedChunkMetadata {
2161                details: format!(
2162                    "chunks {} and {} both claim to be last chunks",
2163                    last.identifier.index(),
2164                    other_last.identifier.index()
2165                ),
2166            });
2167        }
2168
2169        // Rewind the chain back to the first chunk, and do some checks at the same
2170        // time.
2171        let mut seen = HashSet::new();
2172        let mut current = last;
2173        loop {
2174            // If we've already seen this chunk, there's a cycle somewhere.
2175            if !seen.insert(current.identifier) {
2176                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2177                    details: format!(
2178                        "cycle detected in linked chunk at {}",
2179                        current.identifier.index()
2180                    ),
2181                });
2182            }
2183
2184            let Some(prev_id) = current.previous else {
2185                // If there's no previous chunk, we're done.
2186                if seen.len() != all_chunks.len() {
2187                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
2188                        details: format!(
2189                            "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
2190                            seen.len(),
2191                            all_chunks.len()
2192                        ),
2193                    });
2194                }
2195                break;
2196            };
2197
2198            // If the previous chunk is not in the map, then it's unknown
2199            // and missing.
2200            let Some(pred_meta) = chunk_map.get(&prev_id) else {
2201                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2202                    details: format!(
2203                        "missing predecessor {} chunk for {}",
2204                        prev_id.index(),
2205                        current.identifier.index()
2206                    ),
2207                });
2208            };
2209
2210            // If the previous chunk isn't connected to the next, then the link is invalid.
2211            if pred_meta.next != Some(current.identifier) {
2212                return Err(EventCacheError::InvalidLinkedChunkMetadata {
2213                    details: format!(
2214                        "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
2215                        pred_meta.identifier.index(),
2216                        pred_meta.next.map(|chunk_id| chunk_id.index()),
2217                        current.identifier.index()
2218                    ),
2219                });
2220            }
2221
2222            current = *pred_meta;
2223        }
2224
2225        // At this point, `current` is the identifier of the first chunk.
2226        //
2227        // Reorder the resulting vector, by going through the chain of `next` links, and
2228        // swapping items into their final position.
2229        //
2230        // Invariant in this loop: all items in [0..i[ are in their final, correct
2231        // position.
2232        let mut current = current.identifier;
2233        for i in 0..all_chunks.len() {
2234            // Find the target metadata.
2235            let j = all_chunks
2236                .iter()
2237                .rev()
2238                .position(|meta| meta.identifier == current)
2239                .map(|j| all_chunks.len() - 1 - j)
2240                .expect("the target chunk must be present in the metadata");
2241            if i != j {
2242                all_chunks.swap(i, j);
2243            }
2244            if let Some(next) = all_chunks[i].next {
2245                current = next;
2246            }
2247        }
2248
2249        Ok(Some(all_chunks))
2250    }
2251
2252    /// Removes the bundled relations from an event, if they were present.
2253    ///
2254    /// Only replaces the present if it contained bundled relations.
2255    fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2256        // We're going to get rid of the `unsigned`/`m.relations` field, if it's
2257        // present.
2258        // Use a closure that returns an option so we can quickly short-circuit.
2259        let mut closure = || -> Option<()> {
2260            let mut val: serde_json::Value = event.deserialize_as().ok()?;
2261            let unsigned = val.get_mut("unsigned")?;
2262            let unsigned_obj = unsigned.as_object_mut()?;
2263            if unsigned_obj.remove("m.relations").is_some() {
2264                *event = Raw::new(&val).ok()?.cast_unchecked();
2265            }
2266            None
2267        };
2268        let _ = closure();
2269    }
2270
2271    fn strip_relations_from_event(ev: &mut Event) {
2272        match &mut ev.kind {
2273            TimelineEventKind::Decrypted(decrypted) => {
2274                // Remove all information about encryption info for
2275                // the bundled events.
2276                decrypted.unsigned_encryption_info = None;
2277
2278                // Remove the `unsigned`/`m.relations` field, if needs be.
2279                strip_relations_if_present(&mut decrypted.event);
2280            }
2281
2282            TimelineEventKind::UnableToDecrypt { event, .. }
2283            | TimelineEventKind::PlainText { event } => {
2284                strip_relations_if_present(event);
2285            }
2286        }
2287    }
2288
2289    /// Strips the bundled relations from a collection of events.
2290    fn strip_relations_from_events(items: &mut [Event]) {
2291        for ev in items.iter_mut() {
2292            strip_relations_from_event(ev);
2293        }
2294    }
2295
2296    /// Implementation of [`RoomEventCacheStateLockReadGuard::find_event`] and
2297    /// [`RoomEventCacheStateLockWriteGuard::find_event`].
2298    async fn find_event(
2299        event_id: &EventId,
2300        room_id: &RoomId,
2301        room_linked_chunk: &EventLinkedChunk,
2302        store: &EventCacheStoreLockGuard,
2303    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
2304        // There are supposedly fewer events loaded in memory than in the store. Let's
2305        // start by looking up in the `EventLinkedChunk`.
2306        for (position, event) in room_linked_chunk.revents() {
2307            if event.event_id().as_deref() == Some(event_id) {
2308                return Ok(Some((EventLocation::Memory(position), event.clone())));
2309            }
2310        }
2311
2312        Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
2313    }
2314
2315    /// Implementation of
2316    /// [`RoomEventCacheStateLockReadGuard::find_event_with_relations`] and
2317    /// [`RoomEventCacheStateLockWriteGuard::find_event_with_relations`].
2318    async fn find_event_with_relations(
2319        event_id: &EventId,
2320        room_id: &RoomId,
2321        filters: Option<Vec<RelationType>>,
2322        room_linked_chunk: &EventLinkedChunk,
2323        store: &EventCacheStoreLockGuard,
2324    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2325        // First, hit storage to get the target event and its related events.
2326        let found = store.find_event(room_id, event_id).await?;
2327
2328        let Some(target) = found else {
2329            // We haven't found the event: return early.
2330            return Ok(None);
2331        };
2332
2333        // Then, initialize the stack with all the related events, to find the
2334        // transitive closure of all the related events.
2335        let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
2336        let mut stack =
2337            related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
2338
2339        // Also keep track of already seen events, in case there's a loop in the
2340        // relation graph.
2341        let mut already_seen = HashSet::new();
2342        already_seen.insert(event_id.to_owned());
2343
2344        let mut num_iters = 1;
2345
2346        // Find the related event for each previously-related event.
2347        while let Some(event_id) = stack.pop() {
2348            if !already_seen.insert(event_id.clone()) {
2349                // Skip events we've already seen.
2350                continue;
2351            }
2352
2353            let other_related =
2354                store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;
2355
2356            stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
2357            related.extend(other_related);
2358
2359            num_iters += 1;
2360        }
2361
2362        trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
2363
2364        // Sort the results by their positions in the linked chunk, if available.
2365        //
2366        // If an event doesn't have a known position, it goes to the start of the array.
2367        related.sort_by(|(_, lhs), (_, rhs)| {
2368            use std::cmp::Ordering;
2369
2370            match (lhs, rhs) {
2371                (None, None) => Ordering::Equal,
2372                (None, Some(_)) => Ordering::Less,
2373                (Some(_), None) => Ordering::Greater,
2374                (Some(lhs), Some(rhs)) => {
2375                    let lhs = room_linked_chunk.event_order(*lhs);
2376                    let rhs = room_linked_chunk.event_order(*rhs);
2377
2378                    // The events should have a definite position, but in the case they don't,
2379                    // still consider that not having a position means you'll end at the start
2380                    // of the array.
2381                    match (lhs, rhs) {
2382                        (None, None) => Ordering::Equal,
2383                        (None, Some(_)) => Ordering::Less,
2384                        (Some(_), None) => Ordering::Greater,
2385                        (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
2386                    }
2387                }
2388            }
2389        });
2390
2391        // Keep only the events, not their positions.
2392        let related = related.into_iter().map(|(event, _pos)| event).collect();
2393
2394        Ok(Some((target, related)))
2395    }
2396}
2397
/// An enum representing where an event has been found.
pub(super) enum EventLocation {
    /// Event lives in memory (and likely in the store!).
    ///
    /// Carries the event's position in the in-memory linked chunk.
    Memory(Position),

    /// Event lives in the store only, it has not been loaded in memory yet.
    Store,
}
2406
2407pub(super) use private::RoomEventCacheStateLock;
2408
#[cfg(test)]
mod tests {
    use matrix_sdk_base::event_cache::Event;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        RoomId, event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };

    use crate::test_utils::logged_in_client;

    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There's only the edit event (an edit event can't have its own edit event).
        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        // Now we'll filter threads instead, there should be no related events
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");

        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        // No Thread related events found
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the associated related event, which reacts to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // There are both the related id and the associatively related id
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    /// Common helper: saves `original_event`, an unrelated event, and
    /// `related_event` into the room's event cache, then checks that
    /// `find_event_with_relations` on the original event returns exactly the
    /// related event (and nothing else, in particular not the unrelated one).
    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        // Save an unrelated event to check it's not in the related events list.
        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        // Save the related event.
        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(&original_event_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        // Fetched event is the right one.
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        // There is only the actually related event in the related ones; in
        // particular the unrelated event must not show up.
        assert_eq!(related_events.len(), 1);
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}
2679
2680#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support.
2681mod timed_tests {
2682    use std::{ops::Not, sync::Arc};
2683
2684    use assert_matches::assert_matches;
2685    use assert_matches2::assert_let;
2686    use eyeball_im::VectorDiff;
2687    use futures_util::FutureExt;
2688    use matrix_sdk_base::{
2689        event_cache::{
2690            Gap,
2691            store::{EventCacheStore as _, MemoryStore},
2692        },
2693        linked_chunk::{
2694            ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2695            lazy_loader::from_all_chunks,
2696        },
2697        store::StoreConfig,
2698        sync::{JoinedRoomUpdate, Timeline},
2699    };
2700    use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2701    use ruma::{
2702        EventId, OwnedUserId, event_id,
2703        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2704        room_id, user_id,
2705    };
2706    use tokio::task::yield_now;
2707
2708    use super::RoomEventCacheGenericUpdate;
2709    use crate::{
2710        assert_let_timeout,
2711        event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2712        test_utils::client::MockClientBuilder,
2713    };
2714
    /// A sync update handled by the event cache must be persisted to the
    /// backing store: a gap chunk for the prev-batch token, followed by an
    /// items chunk holding the event.
    #[async_test]
    async fn test_write_to_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Subscribe to the event cache before using it.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message and a prev-batch token.
        let timeline = Timeline {
            limited: true,
            prev_batch: Some("raclette".to_owned()),
            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
        };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        // Check the storage: reload every persisted chunk and rebuild the
        // linked chunk from them (`3` is presumably the per-chunk capacity —
        // TODO confirm against `from_all_chunks`).
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 2);

        let mut chunks = linked_chunk.chunks();

        // We start with the gap, holding the prev-batch token.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        // Then we have the stored event.
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            let deserialized = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }
2791
    /// Bundled relations (`unsigned`/`m.relations`) must be stripped from the
    /// copy of an event persisted to storage, while the in-memory copy keeps
    /// them.
    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Subscribe to the event cache before using it.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Propagate an update for a message with bundled relations (here, a
        // bundled edit).
        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        // The in-memory linked chunk keeps the bundled relation.
        {
            let events = room_event_cache.events().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_some());
        }

        // The one in storage does not: the bundled replace relation has been
        // stripped before persisting.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_none());
        });

        // That's all, folks!
        assert!(chunks.next().is_none());
    }
2883
    /// Clearing a room's event cache must empty both the in-memory linked
    /// chunk and the persisted one, emit a `VectorDiff::Clear` update plus a
    /// generic update, while individual events remain findable in the store.
    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data: empty chunk → gap → [ev1] → [ev2],
        // so that lazy-loading only brings in the last chunk at startup.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "comté".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // The room knows about all cached events.
        {
            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
        }

        // But only part of the events are loaded from the store.
        {
            // The room must contain only one event because only one chunk has been loaded.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        // Let's load more chunks to load all events.
        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                // Here you are `event_id1`!
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());
        }

        // After clearing,…
        room_event_cache.clear().await.unwrap();

        //… we get an update that the content has been cleared.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        // … same with a generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);

        // Events individually are not forgotten by the event cache, after clearing a
        // room.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

        // But their presence in a linked chunk is forgotten.
        let items = room_event_cache.events().await.unwrap();
        assert!(items.is_empty());

        // The event cache store too.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        // Note: while the event cache store could return `None` here, clearing it will
        // reset it to its initial form, maintaining the invariant that it
        // contains a single items chunk that's empty.
        assert_eq!(linked_chunk.num_items(), 0);
    }
3031
    /// Loading a room event cache from storage only brings in the last chunk
    /// initially; back-pagination then loads earlier chunks from the store,
    /// and duplicate events coming from sync are deduplicated in place.
    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store with some data: empty chunk → gap → [ev1] → [ev2].
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    // An empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    // A gap chunk.
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    // Another items chunk, non-empty this time.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // Don't forget to subscribe and like.
        event_cache.subscribe().unwrap();

        // Let's check whether the generic updates are received for the initialisation.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // The room event cache has been loaded. A generic update must have been
        // triggered.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // The initial items contain one event because only the last chunk is loaded by
        // default.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        // The event cache knows about all events though, even if they aren't loaded.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());

        // Let's paginate to load more events.
        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        // A generic update is triggered too.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        // A new update with one of these events leads to deduplication.
        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Just checking the generic update is correct. There is a duplicate event, so
        // no generic changes whatsoever!
        assert!(generic_stream.recv().now_or_never().is_none());

        // The stream doesn't report these changes *yet*. Use the items vector given
        // when subscribing, to check that the items correspond to their new
        // positions. The duplicated item is removed (so it's not the first
        // element anymore), and it's added to the back of the list.
        let items = room_event_cache.events().await.unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }
3176
3177    #[async_test]
3178    async fn test_load_from_storage_resilient_to_failure() {
3179        let room_id = room_id!("!fondue:patate.ch");
3180        let event_cache_store = Arc::new(MemoryStore::new());
3181
3182        let event = EventFactory::new()
3183            .room(room_id)
3184            .sender(user_id!("@ben:saucisse.bzh"))
3185            .text_msg("foo")
3186            .event_id(event_id!("$42"))
3187            .into_event();
3188
3189        // Prefill the store with invalid data: two chunks that form a cycle.
3190        event_cache_store
3191            .handle_linked_chunk_updates(
3192                LinkedChunkId::Room(room_id),
3193                vec![
3194                    Update::NewItemsChunk {
3195                        previous: None,
3196                        new: ChunkIdentifier::new(0),
3197                        next: None,
3198                    },
3199                    Update::PushItems {
3200                        at: Position::new(ChunkIdentifier::new(0), 0),
3201                        items: vec![event],
3202                    },
3203                    Update::NewItemsChunk {
3204                        previous: Some(ChunkIdentifier::new(0)),
3205                        new: ChunkIdentifier::new(1),
3206                        next: Some(ChunkIdentifier::new(0)),
3207                    },
3208                ],
3209            )
3210            .await
3211            .unwrap();
3212
3213        let client = MockClientBuilder::new(None)
3214            .on_builder(|builder| {
3215                builder.store_config(
3216                    StoreConfig::new("holder".to_owned())
3217                        .event_cache_store(event_cache_store.clone()),
3218                )
3219            })
3220            .build()
3221            .await;
3222
3223        let event_cache = client.event_cache();
3224
3225        // Don't forget to subscribe and like.
3226        event_cache.subscribe().unwrap();
3227
3228        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3229        let room = client.get_room(room_id).unwrap();
3230
3231        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3232
3233        let items = room_event_cache.events().await.unwrap();
3234
3235        // Because the persisted content was invalid, the room store is reset: there are
3236        // no events in the cache.
3237        assert!(items.is_empty());
3238
3239        // Storage doesn't contain anything. It would also be valid that it contains a
3240        // single initial empty items chunk.
3241        let raw_chunks =
3242            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3243        assert!(raw_chunks.is_empty());
3244    }
3245
3246    #[async_test]
3247    async fn test_no_useless_gaps() {
3248        let room_id = room_id!("!galette:saucisse.bzh");
3249
3250        let client = MockClientBuilder::new(None).build().await;
3251
3252        let event_cache = client.event_cache();
3253        event_cache.subscribe().unwrap();
3254
3255        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3256        let room = client.get_room(room_id).unwrap();
3257        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3258        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3259
3260        let f = EventFactory::new().room(room_id).sender(*ALICE);
3261
3262        // Propagate an update including a limited timeline with one message and a
3263        // prev-batch token.
3264        room_event_cache
3265            .inner
3266            .handle_joined_room_update(JoinedRoomUpdate {
3267                timeline: Timeline {
3268                    limited: true,
3269                    prev_batch: Some("raclette".to_owned()),
3270                    events: vec![f.text_msg("hey yo").into_event()],
3271                },
3272                ..Default::default()
3273            })
3274            .await
3275            .unwrap();
3276
3277        // Just checking the generic update is correct.
3278        assert_matches!(
3279            generic_stream.recv().await,
3280            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3281                assert_eq!(expected_room_id, room_id);
3282            }
3283        );
3284
3285        {
3286            let mut state = room_event_cache.inner.state.write().await.unwrap();
3287
3288            let mut num_gaps = 0;
3289            let mut num_events = 0;
3290
3291            for c in state.room_linked_chunk().chunks() {
3292                match c.content() {
3293                    ChunkContent::Items(items) => num_events += items.len(),
3294                    ChunkContent::Gap(_) => num_gaps += 1,
3295                }
3296            }
3297
3298            // The limited sync unloads the chunk, so it will appear as if there are only
3299            // the events.
3300            assert_eq!(num_gaps, 0);
3301            assert_eq!(num_events, 1);
3302
3303            // But if I manually reload more of the chunk, the gap will be present.
3304            assert_matches!(
3305                state.load_more_events_backwards().await.unwrap(),
3306                LoadMoreEventsBackwardsOutcome::Gap { .. }
3307            );
3308
3309            num_gaps = 0;
3310            num_events = 0;
3311            for c in state.room_linked_chunk().chunks() {
3312                match c.content() {
3313                    ChunkContent::Items(items) => num_events += items.len(),
3314                    ChunkContent::Gap(_) => num_gaps += 1,
3315                }
3316            }
3317
3318            // The gap must have been stored.
3319            assert_eq!(num_gaps, 1);
3320            assert_eq!(num_events, 1);
3321        }
3322
3323        // Now, propagate an update for another message, but the timeline isn't limited
3324        // this time.
3325        room_event_cache
3326            .inner
3327            .handle_joined_room_update(JoinedRoomUpdate {
3328                timeline: Timeline {
3329                    limited: false,
3330                    prev_batch: Some("fondue".to_owned()),
3331                    events: vec![f.text_msg("sup").into_event()],
3332                },
3333                ..Default::default()
3334            })
3335            .await
3336            .unwrap();
3337
3338        // Just checking the generic update is correct.
3339        assert_matches!(
3340            generic_stream.recv().await,
3341            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3342                assert_eq!(expected_room_id, room_id);
3343            }
3344        );
3345
3346        {
3347            let state = room_event_cache.inner.state.read().await.unwrap();
3348
3349            let mut num_gaps = 0;
3350            let mut num_events = 0;
3351
3352            for c in state.room_linked_chunk().chunks() {
3353                match c.content() {
3354                    ChunkContent::Items(items) => num_events += items.len(),
3355                    ChunkContent::Gap(gap) => {
3356                        assert_eq!(gap.prev_token, "raclette");
3357                        num_gaps += 1;
3358                    }
3359                }
3360            }
3361
3362            // There's only the previous gap, no new ones.
3363            assert_eq!(num_gaps, 1);
3364            assert_eq!(num_events, 2);
3365        }
3366    }
3367
    /// Forcing a shrink to the last chunk unloads earlier chunks from memory
    /// (emitting Clear + Append diffs, but no generic update), while keeping
    /// them reachable again through storage-backed back-pagination.
    #[async_test]
    async fn test_shrink_to_last_chunk() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        // Fill the event cache store with an initial linked chunk with 2 events chunks.
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Sanity check: lazily loaded, so only includes one item at start.
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
        assert!(stream.is_empty());

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Force loading the full linked chunk by back-paginating.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        // We also get an update about the loading from the store.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream.is_empty());

        // Same for the generic update.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);

        // Shrink the linked chunk to the last chunk.
        let diffs = room_event_cache
            .inner
            .state
            .write()
            .await
            .unwrap()
            .force_shrink_to_last_chunk()
            .await
            .expect("shrinking should succeed");

        // We receive updates about the changes to the linked chunk: everything
        // is dropped, then only the last chunk's content is re-appended.
        assert_eq!(diffs.len(), 2);
        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
        });

        assert!(stream.is_empty());

        // No generic update is sent in this case.
        assert!(generic_stream.is_empty());

        // When reading the events, we do get only the last one.
        let events = room_event_cache.events().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));

        // But if we back-paginate, we don't need access to network to find out about
        // the previous event.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);
    }
3492
    /// `event_order` must return a stable, total ordering for events across
    /// the whole linked chunk — including for chunks that aren't currently
    /// loaded in memory — and must stop resolving a position once the event
    /// there has been deduplicated away.
    #[async_test]
    async fn test_room_ordering() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");
        let evid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
        let ev3 = f.text_msg("yo").event_id(evid3).into_event();

        // Fill the event cache store with an initial linked chunk with 2 events chunks:
        // [ev1, ev2] then [ev3].
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1, ev2],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev3.clone()],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Initially, the linked chunk only contains the last chunk, so only ev3 is
        // loaded.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            // But we can get the order of ev1, even though its chunk isn't loaded.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );

            // And that of ev2 as well.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // ev3, which is loaded, also has a known ordering.
            let mut events = room_linked_chunk.events();
            let (pos, ev) = events.next().unwrap();
            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            // No other loaded events.
            assert!(events.next().is_none());
        }

        // Force loading the full linked chunk by back-paginating.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert!(outcome.reached_start);

        // All events are now loaded, so their order is precisely their enumerated index
        // in a linear iteration.
        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
            }
        }

        // Handle a gappy sync with two events (including one duplicate, so
        // deduplication kicks in), so that the linked chunk is shrunk to the
        // last chunk, and that the linked chunk only contains the last two
        // events.
        let evid4 = event_id!("$4");
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            // After the shrink, only evid3 and evid4 are loaded.
            let mut events = room_linked_chunk.events();

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid4));
            assert_eq!(room_linked_chunk.event_order(pos), Some(3));

            // No other loaded events.
            assert!(events.next().is_none());

            // But we can still get the order of previous events.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // ev3 doesn't have an order with its previous position, since it's been
            // deduplicated (it now lives at the position checked above).
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
                None
            );
        }
    }
3650
    /// After the last subscriber to a room's event cache is dropped, the
    /// in-memory linked chunk is automatically shrunk back to its last chunk,
    /// and the shared subscriber count goes back to 0.
    #[async_test]
    async fn test_auto_shrink_after_all_subscribers_are_gone() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        // Fill the event cache store with an initial linked chunk with 2 events chunks.
        // Chunk 0 holds `ev1`, chunk 1 holds `ev2`; only the last chunk will be
        // lazily loaded at subscription time.
        {
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Sanity check: lazily loaded, so only includes one item at start.
        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
        assert!(stream1.is_empty());

        // Force loading the full linked chunk by back-paginating.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        // We also get an update about the loading from the store. Ignore it, for this
        // test's sake.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream1.is_empty());

        // Have another subscriber.
        // Since it's not the first one, and the previous one loaded some more events,
        // the second subscribers sees them all.
        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events2.len(), 2);
        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
        assert!(stream2.is_empty());

        // Drop the first stream, and wait a bit.
        // NOTE(review): the yield gives the auto-shrink listener task a chance to
        // run — this assumes it gets scheduled during the yield; confirm if flaky.
        drop(stream1);
        yield_now().await;

        // The second stream remains undisturbed.
        assert!(stream2.is_empty());

        // Now drop the second stream, and wait a bit.
        drop(stream2);
        yield_now().await;

        // The linked chunk must have auto-shrunk by now.

        {
            // Check the inner state: there's no more shared auto-shrinker.
            let state = room_event_cache.inner.state.read().await.unwrap();
            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
        }

        // Getting the events will only give us the latest chunk, i.e. only `ev2`:
        // the shrink unloaded chunk 0 from memory.
        let events3 = room_event_cache.events().await.unwrap();
        assert_eq!(events3.len(), 1);
        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
    }
3765
3766    #[async_test]
3767    async fn test_rfind_map_event_in_memory_by() {
3768        let user_id = user_id!("@mnt_io:matrix.org");
3769        let room_id = room_id!("!raclette:patate.ch");
3770        let client = MockClientBuilder::new(None).build().await;
3771
3772        let event_factory = EventFactory::new().room(room_id);
3773
3774        let event_id_0 = event_id!("$ev0");
3775        let event_id_1 = event_id!("$ev1");
3776        let event_id_2 = event_id!("$ev2");
3777        let event_id_3 = event_id!("$ev3");
3778
3779        let event_0 =
3780            event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3781        let event_1 =
3782            event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3783        let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3784        let event_3 =
3785            event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3786
3787        // Fill the event cache store with an initial linked chunk of 2 chunks, and 4
3788        // events.
3789        {
3790            client
3791                .event_cache_store()
3792                .lock()
3793                .await
3794                .expect("Could not acquire the event cache lock")
3795                .as_clean()
3796                .expect("Could not acquire a clean event cache lock")
3797                .handle_linked_chunk_updates(
3798                    LinkedChunkId::Room(room_id),
3799                    vec![
3800                        Update::NewItemsChunk {
3801                            previous: None,
3802                            new: ChunkIdentifier::new(0),
3803                            next: None,
3804                        },
3805                        Update::PushItems {
3806                            at: Position::new(ChunkIdentifier::new(0), 0),
3807                            items: vec![event_3],
3808                        },
3809                        Update::NewItemsChunk {
3810                            previous: Some(ChunkIdentifier::new(0)),
3811                            new: ChunkIdentifier::new(1),
3812                            next: None,
3813                        },
3814                        Update::PushItems {
3815                            at: Position::new(ChunkIdentifier::new(1), 0),
3816                            items: vec![event_0, event_1, event_2],
3817                        },
3818                    ],
3819                )
3820                .await
3821                .unwrap();
3822        }
3823
3824        let event_cache = client.event_cache();
3825        event_cache.subscribe().unwrap();
3826
3827        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3828        let room = client.get_room(room_id).unwrap();
3829        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3830
3831        // Look for an event from `BOB`: it must be `event_0`.
3832        assert_matches!(
3833            room_event_cache
3834                .rfind_map_event_in_memory_by(|event, previous_event| {
3835                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| (event.event_id(), previous_event.and_then(|e| e.event_id())))
3836                })
3837                .await,
3838            Ok(Some((event_id, previous_event_id))) => {
3839                assert_eq!(event_id.as_deref(), Some(event_id_0));
3840                assert!(previous_event_id.is_none());
3841            }
3842        );
3843
3844        // Look for an event from `ALICE`: it must be `event_2`, right before `event_1`
3845        // because events are looked for in reverse order.
3846        assert_matches!(
3847            room_event_cache
3848                .rfind_map_event_in_memory_by(|event, previous_event| {
3849                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| (event.event_id(), previous_event.and_then(|e| e.event_id())))
3850                })
3851                .await,
3852            Ok(Some((event_id, previous_event_id))) => {
3853                assert_eq!(event_id.as_deref(), Some(event_id_2));
3854                assert_eq!(previous_event_id.as_deref(), Some(event_id_1));
3855            }
3856        );
3857
3858        // Look for an event that is inside the storage, but not loaded.
3859        assert!(
3860            room_event_cache
3861                .rfind_map_event_in_memory_by(|event, _| {
3862                    (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3863                        == Some(user_id))
3864                    .then(|| event.event_id())
3865                })
3866                .await
3867                .unwrap()
3868                .is_none()
3869        );
3870
3871        // Look for an event that doesn't exist.
3872        assert!(
3873            room_event_cache
3874                .rfind_map_event_in_memory_by(|_, _| None::<()>)
3875                .await
3876                .unwrap()
3877                .is_none()
3878        );
3879    }
3880
3881    #[async_test]
3882    async fn test_reload_when_dirty() {
3883        let user_id = user_id!("@mnt_io:matrix.org");
3884        let room_id = room_id!("!raclette:patate.ch");
3885
3886        // The storage shared by the two clients.
3887        let event_cache_store = MemoryStore::new();
3888
3889        // Client for the process 0.
3890        let client_p0 = MockClientBuilder::new(None)
3891            .on_builder(|builder| {
3892                builder.store_config(
3893                    StoreConfig::new("process #0".to_owned())
3894                        .event_cache_store(event_cache_store.clone()),
3895                )
3896            })
3897            .build()
3898            .await;
3899
3900        // Client for the process 1.
3901        let client_p1 = MockClientBuilder::new(None)
3902            .on_builder(|builder| {
3903                builder.store_config(
3904                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
3905                )
3906            })
3907            .build()
3908            .await;
3909
3910        let event_factory = EventFactory::new().room(room_id).sender(user_id);
3911
3912        let ev_id_0 = event_id!("$ev_0");
3913        let ev_id_1 = event_id!("$ev_1");
3914
3915        let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
3916        let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
3917
3918        // Add events to the storage (shared by the two clients!).
3919        client_p0
3920            .event_cache_store()
3921            .lock()
3922            .await
3923            .expect("[p0] Could not acquire the event cache lock")
3924            .as_clean()
3925            .expect("[p0] Could not acquire a clean event cache lock")
3926            .handle_linked_chunk_updates(
3927                LinkedChunkId::Room(room_id),
3928                vec![
3929                    Update::NewItemsChunk {
3930                        previous: None,
3931                        new: ChunkIdentifier::new(0),
3932                        next: None,
3933                    },
3934                    Update::PushItems {
3935                        at: Position::new(ChunkIdentifier::new(0), 0),
3936                        items: vec![ev_0],
3937                    },
3938                    Update::NewItemsChunk {
3939                        previous: Some(ChunkIdentifier::new(0)),
3940                        new: ChunkIdentifier::new(1),
3941                        next: None,
3942                    },
3943                    Update::PushItems {
3944                        at: Position::new(ChunkIdentifier::new(1), 0),
3945                        items: vec![ev_1],
3946                    },
3947                ],
3948            )
3949            .await
3950            .unwrap();
3951
3952        // Subscribe the event caches, and create the room.
3953        let (room_event_cache_p0, room_event_cache_p1) = {
3954            let event_cache_p0 = client_p0.event_cache();
3955            event_cache_p0.subscribe().unwrap();
3956
3957            let event_cache_p1 = client_p1.event_cache();
3958            event_cache_p1.subscribe().unwrap();
3959
3960            client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3961            client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3962
3963            let (room_event_cache_p0, _drop_handles) =
3964                client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
3965            let (room_event_cache_p1, _drop_handles) =
3966                client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
3967
3968            (room_event_cache_p0, room_event_cache_p1)
3969        };
3970
3971        // Okay. We are ready for the test!
3972        //
3973        // First off, let's check `room_event_cache_p0` has access to the first event
3974        // loaded in-memory, then do a pagination, and see more events.
3975        let mut updates_stream_p0 = {
3976            let room_event_cache = &room_event_cache_p0;
3977
3978            let (initial_updates, mut updates_stream) =
3979                room_event_cache_p0.subscribe().await.unwrap();
3980
3981            // Initial updates contain `ev_id_1` only.
3982            assert_eq!(initial_updates.len(), 1);
3983            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
3984            assert!(updates_stream.is_empty());
3985
3986            // `ev_id_1` must be loaded in memory.
3987            assert!(event_loaded(room_event_cache, ev_id_1).await);
3988
3989            // `ev_id_0` must NOT be loaded in memory.
3990            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
3991
3992            // Load one more event with a backpagination.
3993            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
3994
3995            // A new update for `ev_id_0` must be present.
3996            assert_matches!(
3997                updates_stream.recv().await.unwrap(),
3998                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
3999                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
4000                    assert_matches!(
4001                        &diffs[0],
4002                        VectorDiff::Insert { index: 0, value: event } => {
4003                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4004                        }
4005                    );
4006                }
4007            );
4008
4009            // `ev_id_0` must now be loaded in memory.
4010            assert!(event_loaded(room_event_cache, ev_id_0).await);
4011
4012            updates_stream
4013        };
4014
4015        // Second, let's check `room_event_cache_p1` has the same accesses.
4016        let mut updates_stream_p1 = {
4017            let room_event_cache = &room_event_cache_p1;
4018            let (initial_updates, mut updates_stream) =
4019                room_event_cache_p1.subscribe().await.unwrap();
4020
4021            // Initial updates contain `ev_id_1` only.
4022            assert_eq!(initial_updates.len(), 1);
4023            assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4024            assert!(updates_stream.is_empty());
4025
4026            // `ev_id_1` must be loaded in memory.
4027            assert!(event_loaded(room_event_cache, ev_id_1).await);
4028
4029            // `ev_id_0` must NOT be loaded in memory.
4030            assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4031
4032            // Load one more event with a backpagination.
4033            room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4034
4035            // A new update for `ev_id_0` must be present.
4036            assert_matches!(
4037                updates_stream.recv().await.unwrap(),
4038                RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4039                    assert_eq!(diffs.len(), 1, "{diffs:#?}");
4040                    assert_matches!(
4041                        &diffs[0],
4042                        VectorDiff::Insert { index: 0, value: event } => {
4043                            assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4044                        }
4045                    );
4046                }
4047            );
4048
4049            // `ev_id_0` must now be loaded in memory.
4050            assert!(event_loaded(room_event_cache, ev_id_0).await);
4051
4052            updates_stream
4053        };
4054
4055        // Do this a couple times, for the fun.
4056        for _ in 0..3 {
4057            // Third, because `room_event_cache_p1` has locked the store, the lock
4058            // is dirty for `room_event_cache_p0`, so it will shrink to its last
4059            // chunk!
4060            {
4061                let room_event_cache = &room_event_cache_p0;
4062                let updates_stream = &mut updates_stream_p0;
4063
4064                // `ev_id_1` must be loaded in memory, just like before.
4065                assert!(event_loaded(room_event_cache, ev_id_1).await);
4066
4067                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4068                // state has been reloaded to its last chunk.
4069                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4070
4071                // The reload can be observed via the updates too.
4072                assert_matches!(
4073                    updates_stream.recv().await.unwrap(),
4074                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4075                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4076                        assert_matches!(&diffs[0], VectorDiff::Clear);
4077                        assert_matches!(
4078                            &diffs[1],
4079                            VectorDiff::Append { values: events } => {
4080                                assert_eq!(events.len(), 1);
4081                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4082                            }
4083                        );
4084                    }
4085                );
4086
4087                // Load one more event with a backpagination.
4088                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4089
4090                // `ev_id_0` must now be loaded in memory.
4091                assert!(event_loaded(room_event_cache, ev_id_0).await);
4092
4093                // The pagination can be observed via the updates too.
4094                assert_matches!(
4095                    updates_stream.recv().await.unwrap(),
4096                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4097                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4098                        assert_matches!(
4099                            &diffs[0],
4100                            VectorDiff::Insert { index: 0, value: event } => {
4101                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4102                            }
4103                        );
4104                    }
4105                );
4106            }
4107
4108            // Fourth, because `room_event_cache_p0` has locked the store again, the lock
4109            // is dirty for `room_event_cache_p1` too!, so it will shrink to its last
4110            // chunk!
4111            {
4112                let room_event_cache = &room_event_cache_p1;
4113                let updates_stream = &mut updates_stream_p1;
4114
4115                // `ev_id_1` must be loaded in memory, just like before.
4116                assert!(event_loaded(room_event_cache, ev_id_1).await);
4117
4118                // However, `ev_id_0` must NOT be loaded in memory. It WAS loaded, but the
4119                // state has shrunk to its last chunk.
4120                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4121
4122                // The reload can be observed via the updates too.
4123                assert_matches!(
4124                    updates_stream.recv().await.unwrap(),
4125                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4126                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4127                        assert_matches!(&diffs[0], VectorDiff::Clear);
4128                        assert_matches!(
4129                            &diffs[1],
4130                            VectorDiff::Append { values: events } => {
4131                                assert_eq!(events.len(), 1);
4132                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4133                            }
4134                        );
4135                    }
4136                );
4137
4138                // Load one more event with a backpagination.
4139                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4140
4141                // `ev_id_0` must now be loaded in memory.
4142                assert!(event_loaded(room_event_cache, ev_id_0).await);
4143
4144                // The pagination can be observed via the updates too.
4145                assert_matches!(
4146                    updates_stream.recv().await.unwrap(),
4147                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4148                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4149                        assert_matches!(
4150                            &diffs[0],
4151                            VectorDiff::Insert { index: 0, value: event } => {
4152                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4153                            }
4154                        );
4155                    }
4156                );
4157            }
4158        }
4159
4160        // Repeat that with an explicit read lock (so that we don't rely on
4161        // `event_loaded` to trigger the dirty detection).
4162        for _ in 0..3 {
4163            {
4164                let room_event_cache = &room_event_cache_p0;
4165                let updates_stream = &mut updates_stream_p0;
4166
4167                let guard = room_event_cache.inner.state.read().await.unwrap();
4168
4169                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4170                // shared access.
4171                // See `RoomEventCacheStateLock::read` to learn more.
4172
4173                // The lock is no longer marked as dirty, it's been cleaned.
4174                assert!(guard.is_dirty().not());
4175
4176                // The reload can be observed via the updates too.
4177                assert_matches!(
4178                    updates_stream.recv().await.unwrap(),
4179                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4180                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4181                        assert_matches!(&diffs[0], VectorDiff::Clear);
4182                        assert_matches!(
4183                            &diffs[1],
4184                            VectorDiff::Append { values: events } => {
4185                                assert_eq!(events.len(), 1);
4186                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4187                            }
4188                        );
4189                    }
4190                );
4191
4192                assert!(event_loaded(room_event_cache, ev_id_1).await);
4193                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4194
4195                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4196                // want to make this super explicit).
4197                //
                // We need to drop it before the pagination because the
                // pagination needs to obtain a write lock.
4200                drop(guard);
4201
4202                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4203                assert!(event_loaded(room_event_cache, ev_id_0).await);
4204
4205                // The pagination can be observed via the updates too.
4206                assert_matches!(
4207                    updates_stream.recv().await.unwrap(),
4208                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4209                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4210                        assert_matches!(
4211                            &diffs[0],
4212                            VectorDiff::Insert { index: 0, value: event } => {
4213                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4214                            }
4215                        );
4216                    }
4217                );
4218            }
4219
4220            {
4221                let room_event_cache = &room_event_cache_p1;
4222                let updates_stream = &mut updates_stream_p1;
4223
4224                let guard = room_event_cache.inner.state.read().await.unwrap();
4225
4226                // Guard is kept alive, to ensure we can have multiple read guards alive with a
4227                // shared access.
4228
4229                // The lock is no longer marked as dirty, it's been cleaned.
4230                assert!(guard.is_dirty().not());
4231
4232                // The reload can be observed via the updates too.
4233                assert_matches!(
4234                    updates_stream.recv().await.unwrap(),
4235                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4236                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4237                        assert_matches!(&diffs[0], VectorDiff::Clear);
4238                        assert_matches!(
4239                            &diffs[1],
4240                            VectorDiff::Append { values: events } => {
4241                                assert_eq!(events.len(), 1);
4242                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4243                            }
4244                        );
4245                    }
4246                );
4247
4248                assert!(event_loaded(room_event_cache, ev_id_1).await);
4249                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4250
4251                // Ensure `guard` is alive up to this point (in case this test is refactored, I
4252                // want to make this super explicit).
4253                //
                // We need to drop it before the pagination because the
                // pagination needs to obtain a write lock.
4256                drop(guard);
4257
4258                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4259                assert!(event_loaded(room_event_cache, ev_id_0).await);
4260
4261                // The pagination can be observed via the updates too.
4262                assert_matches!(
4263                    updates_stream.recv().await.unwrap(),
4264                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4265                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4266                        assert_matches!(
4267                            &diffs[0],
4268                            VectorDiff::Insert { index: 0, value: event } => {
4269                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4270                            }
4271                        );
4272                    }
4273                );
4274            }
4275        }
4276
4277        // Repeat that with an explicit write lock.
4278        for _ in 0..3 {
4279            {
4280                let room_event_cache = &room_event_cache_p0;
4281                let updates_stream = &mut updates_stream_p0;
4282
4283                let guard = room_event_cache.inner.state.write().await.unwrap();
4284
4285                // The lock is no longer marked as dirty, it's been cleaned.
4286                assert!(guard.is_dirty().not());
4287
4288                // The reload can be observed via the updates too.
4289                assert_matches!(
4290                    updates_stream.recv().await.unwrap(),
4291                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4292                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4293                        assert_matches!(&diffs[0], VectorDiff::Clear);
4294                        assert_matches!(
4295                            &diffs[1],
4296                            VectorDiff::Append { values: events } => {
4297                                assert_eq!(events.len(), 1);
4298                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4299                            }
4300                        );
4301                    }
4302                );
4303
4304                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4305                // needs to obtain a read lock.
4306                drop(guard);
4307
4308                assert!(event_loaded(room_event_cache, ev_id_1).await);
4309                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4310
4311                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4312                assert!(event_loaded(room_event_cache, ev_id_0).await);
4313
4314                // The pagination can be observed via the updates too.
4315                assert_matches!(
4316                    updates_stream.recv().await.unwrap(),
4317                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4318                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4319                        assert_matches!(
4320                            &diffs[0],
4321                            VectorDiff::Insert { index: 0, value: event } => {
4322                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4323                            }
4324                        );
4325                    }
4326                );
4327            }
4328
4329            {
4330                let room_event_cache = &room_event_cache_p1;
4331                let updates_stream = &mut updates_stream_p1;
4332
4333                let guard = room_event_cache.inner.state.write().await.unwrap();
4334
4335                // The lock is no longer marked as dirty, it's been cleaned.
4336                assert!(guard.is_dirty().not());
4337
4338                // The reload can be observed via the updates too.
4339                assert_matches!(
4340                    updates_stream.recv().await.unwrap(),
4341                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4342                        assert_eq!(diffs.len(), 2, "{diffs:#?}");
4343                        assert_matches!(&diffs[0], VectorDiff::Clear);
4344                        assert_matches!(
4345                            &diffs[1],
4346                            VectorDiff::Append { values: events } => {
4347                                assert_eq!(events.len(), 1);
4348                                assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4349                            }
4350                        );
4351                    }
4352                );
4353
4354                // Guard isn't kept alive, otherwise `event_loaded` couldn't run because it
4355                // needs to obtain a read lock.
4356                drop(guard);
4357
4358                assert!(event_loaded(room_event_cache, ev_id_1).await);
4359                assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4360
4361                room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4362                assert!(event_loaded(room_event_cache, ev_id_0).await);
4363
4364                // The pagination can be observed via the updates too.
4365                assert_matches!(
4366                    updates_stream.recv().await.unwrap(),
4367                    RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4368                        assert_eq!(diffs.len(), 1, "{diffs:#?}");
4369                        assert_matches!(
4370                            &diffs[0],
4371                            VectorDiff::Insert { index: 0, value: event } => {
4372                                assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4373                            }
4374                        );
4375                    }
4376                );
4377            }
4378        }
4379    }
4380
4381    #[async_test]
4382    async fn test_load_when_dirty() {
4383        let room_id_0 = room_id!("!raclette:patate.ch");
4384        let room_id_1 = room_id!("!morbiflette:patate.ch");
4385
4386        // The storage shared by the two clients.
4387        let event_cache_store = MemoryStore::new();
4388
4389        // Client for the process 0.
4390        let client_p0 = MockClientBuilder::new(None)
4391            .on_builder(|builder| {
4392                builder.store_config(
4393                    StoreConfig::new("process #0".to_owned())
4394                        .event_cache_store(event_cache_store.clone()),
4395                )
4396            })
4397            .build()
4398            .await;
4399
4400        // Client for the process 1.
4401        let client_p1 = MockClientBuilder::new(None)
4402            .on_builder(|builder| {
4403                builder.store_config(
4404                    StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4405                )
4406            })
4407            .build()
4408            .await;
4409
4410        // Subscribe the event caches, and create the room.
4411        let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4412            let event_cache_p0 = client_p0.event_cache();
4413            event_cache_p0.subscribe().unwrap();
4414
4415            let event_cache_p1 = client_p1.event_cache();
4416            event_cache_p1.subscribe().unwrap();
4417
4418            client_p0
4419                .base_client()
4420                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4421            client_p0
4422                .base_client()
4423                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4424
4425            client_p1
4426                .base_client()
4427                .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4428            client_p1
4429                .base_client()
4430                .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4431
4432            let (room_event_cache_0_p0, _drop_handles) =
4433                client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4434            let (room_event_cache_0_p1, _drop_handles) =
4435                client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4436
4437            (room_event_cache_0_p0, room_event_cache_0_p1)
4438        };
4439
4440        // Let's make the cross-process lock over the store dirty.
4441        {
4442            drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4443            drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4444        }
4445
4446        // Create the `RoomEventCache` for `room_id_1`. During its creation, the
4447        // cross-process lock over the store MUST be dirty, which makes no difference as
4448        // a clean one: the state is just loaded, not reloaded.
4449        let (room_event_cache_1_p0, _) =
4450            client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4451
4452        // Check the lock isn't dirty because it's been cleared.
4453        {
4454            let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4455            assert!(guard.is_dirty().not());
4456        }
4457
4458        // The only way to test this behaviour is to see that the dirty block in
4459        // `RoomEventCacheStateLock` is covered by this test.
4460    }
4461
4462    async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4463        room_event_cache
4464            .rfind_map_event_in_memory_by(|event, _previous_event_id| {
4465                (event.event_id().as_deref() == Some(event_id)).then_some(())
4466            })
4467            .await
4468            .unwrap()
4469            .is_some()
4470    }
4471}