// matrix_sdk/event_cache/caches/room/state.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, HashMap, HashSet},
17    sync::{
18        Arc, OnceLock,
19        atomic::{AtomicUsize, Ordering},
20    },
21};
22
23use eyeball::SharedObservable;
24use eyeball_im::VectorDiff;
25use matrix_sdk_base::{
26    RoomInfoNotableUpdateReasons, apply_redaction, check_validity_of_replacement_events,
27    deserialized_responses::{ThreadSummary, ThreadSummaryStatus},
28    event_cache::{
29        Event, Gap,
30        store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
31    },
32    linked_chunk::{
33        ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, OwnedLinkedChunkId, Position,
34        Update, lazy_loader,
35    },
36    serde_helpers::{extract_edit_target, extract_thread_root},
37    sync::Timeline,
38};
39use matrix_sdk_common::executor::spawn;
40use ruma::{
41    EventId, OwnedEventId, OwnedRoomId, OwnedUserId,
42    events::{
43        AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
44        MessageLikeEventType,
45        receipt::{ReceiptEventContent, SyncReceiptEvent},
46        relation::RelationType,
47        room::redaction::SyncRoomRedactionEvent,
48    },
49    room_version_rules::RoomVersionRules,
50    serde::Raw,
51};
52use tokio::sync::broadcast::{Receiver, Sender};
53use tracing::{debug, error, instrument, trace, warn};
54
55use super::{
56    super::{
57        super::{
58            EventCacheError,
59            deduplicator::{DeduplicationOutcome, filter_duplicate_events},
60            persistence::send_updates_to_store,
61        },
62        EventLocation, TimelineVectorDiffs,
63        event_focused::{EventFocusThreadMode, EventFocusedCache},
64        event_linked_chunk::EventLinkedChunk,
65        lock,
66        pinned_events::PinnedEventCache,
67        read_receipts::compute_unread_counts,
68        thread::ThreadEventCache,
69    },
70    EventsOrigin, PostProcessingOrigin, RoomEventCacheGenericUpdate,
71    RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, RoomEventCacheUpdateSender,
72    sort_positions_descending,
73};
74use crate::{
75    Room,
76    event_cache::{
77        automatic_pagination::AutomaticPagination, caches::pagination::SharedPaginationStatus,
78    },
79    room::WeakRoom,
80};
81
/// Key for the event-focused caches.
///
/// Two focused timelines are distinct if they target different events *or*
/// use different thread modes, hence both fields take part in `Hash`/`Eq`
/// (this type is used as a `HashMap` key in `RoomEventCacheState`).
#[derive(Hash, PartialEq, Eq)]
struct EventFocusedCacheKey {
    /// The event ID that the cache is focused on.
    focused: OwnedEventId,
    /// The thread mode for this cache.
    thread_mode: EventFocusThreadMode,
}
90
pub struct RoomEventCacheState {
    /// Whether thread support has been enabled for the event cache.
    enabled_thread_support: bool,

    /// The room this state relates to.
    pub room_id: OwnedRoomId,

    /// A weak reference to the actual room.
    weak_room: WeakRoom,

    /// The user's own user id.
    pub own_user_id: OwnedUserId,

    /// Reference to the underlying backing store.
    store: EventCacheStoreLock,

    /// The loaded events for the current room, that is, the in-memory
    /// linked chunk for this room.
    room_linked_chunk: EventLinkedChunk,

    /// Threads present in this room.
    ///
    /// Keyed by the thread root event ID.
    threads: HashMap<OwnedEventId, ThreadEventCache>,

    /// Event-focused caches for this room.
    ///
    /// Keyed by the focused event ID and thread mode. Each entry represents
    /// a timeline centered around a specific event (e.g. from a
    /// permalink).
    event_focused_caches: HashMap<EventFocusedCacheKey, EventFocusedCache>,

    /// Cache for pinned events in this room, initialized on-demand.
    pinned_event_cache: OnceLock<PinnedEventCache>,

    /// Observable pagination status shared with pagination observers.
    ///
    /// Reset to `Idle { hit_timeline_start: false }` whenever the linked
    /// chunk is shrunk or reset, which may cancel an ongoing pagination.
    pagination_status: SharedObservable<SharedPaginationStatus>,

    /// A clone of [`super::RoomEventCacheInner::update_sender`].
    ///
    /// This is used only by the [`RoomEventCacheStateLock::read`] and
    /// [`RoomEventCacheStateLock::write`] when the state must be reset.
    update_sender: RoomEventCacheUpdateSender,

    /// A clone of
    /// [`super::super::EventCacheInner::linked_chunk_update_sender`].
    pub(super) linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// The rules for the version of this room.
    room_version_rules: RoomVersionRules,

    /// Have we ever waited for a previous-batch-token to come from sync, in
    /// the context of pagination? We do this at most once per room,
    /// the first time we try to run backward pagination. We reset
    /// that upon clearing the timeline events.
    waited_for_initial_prev_token: bool,

    /// An atomic count of the current number of subscribers of the
    /// [`super::RoomEventCache`].
    subscriber_count: Arc<AtomicUsize>,

    /// A copy of the automatic pagination API object.
    automatic_pagination: Option<AutomaticPagination>,
}
154
155impl RoomEventCacheState {
156    /// Return a read-only reference to the underlying room linked chunk.
157    pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
158        &self.room_linked_chunk
159    }
160
161    /// Implementation of [`RoomEventCacheStateLockReadGuard::find_event`] and
162    /// [`RoomEventCacheStateLockWriteGuard::find_event`].
163    async fn find_event(
164        &self,
165        event_id: &EventId,
166        store: &EventCacheStoreLockGuard,
167    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
168        // There are supposedly fewer events loaded in memory than in the store. Let's
169        // start by looking up in the `EventLinkedChunk`.
170        for (position, event) in self.room_linked_chunk.revents() {
171            if event.event_id().as_deref() == Some(event_id) {
172                return Ok(Some((EventLocation::Memory(position), event.clone())));
173            }
174        }
175
176        Ok(store
177            .find_event(&self.room_id, event_id)
178            .await?
179            .map(|event| (EventLocation::Store, event)))
180    }
181
182    /// Implementation of
183    /// [`RoomEventCacheStateLockReadGuard::find_event_with_relations`] and
184    /// [`RoomEventCacheStateLockWriteGuard::find_event_with_relations`].
185    async fn find_event_with_relations(
186        &self,
187        event_id: &EventId,
188        filters: Option<Vec<RelationType>>,
189        store: &EventCacheStoreLockGuard,
190    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
191        // First, hit storage to get the target event and its related events.
192        let found = store.find_event(&self.room_id, event_id).await?;
193
194        let Some(target) = found else {
195            // We haven't found the event: return early.
196            return Ok(None);
197        };
198
199        // Then, find the transitive closure of all the related events.
200        let related = self.find_event_relations(event_id, filters, store).await?;
201
202        Ok(Some((target, related)))
203    }
204
205    /// Implementation of
206    /// [`RoomEventCacheStateLockReadGuard::find_event_relations`].
207    async fn find_event_relations(
208        &self,
209        event_id: &EventId,
210        filters: Option<Vec<RelationType>>,
211        store: &EventCacheStoreLockGuard,
212    ) -> Result<Vec<Event>, EventCacheError> {
213        // Initialize the stack with all the related events, to find the
214        // transitive closure of all the related events.
215        let mut related =
216            store.find_event_relations(&self.room_id, event_id, filters.as_deref()).await?;
217        let mut stack =
218            related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
219
220        // Also keep track of already seen events, in case there's a loop in the
221        // relation graph.
222        let mut already_seen = HashSet::new();
223        already_seen.insert(event_id.to_owned());
224
225        let mut num_iters = 1;
226
227        // Find the related event for each previously-related event.
228        while let Some(event_id) = stack.pop() {
229            if !already_seen.insert(event_id.clone()) {
230                // Skip events we've already seen.
231                continue;
232            }
233
234            let other_related =
235                store.find_event_relations(&self.room_id, &event_id, filters.as_deref()).await?;
236
237            stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
238            related.extend(other_related);
239
240            num_iters += 1;
241        }
242
243        trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
244
245        // Sort the results by their positions in the linked chunk, if available.
246        //
247        // If an event doesn't have a known position, it goes to the start of the array.
248        related.sort_by(|(_, lhs), (_, rhs)| {
249            use std::cmp::Ordering;
250
251            match (lhs, rhs) {
252                (None, None) => Ordering::Equal,
253                (None, Some(_)) => Ordering::Less,
254                (Some(_), None) => Ordering::Greater,
255                (Some(lhs), Some(rhs)) => {
256                    let lhs = self.room_linked_chunk.event_order(*lhs);
257                    let rhs = self.room_linked_chunk.event_order(*rhs);
258
259                    // The events should have a definite position, but in the case they don't,
260                    // still consider that not having a position means you'll end at the start
261                    // of the array.
262                    match (lhs, rhs) {
263                        (None, None) => Ordering::Equal,
264                        (None, Some(_)) => Ordering::Less,
265                        (Some(_), None) => Ordering::Greater,
266                        (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
267                    }
268                }
269            }
270        });
271
272        // Keep only the events, not their positions.
273        let related = related.into_iter().map(|(event, _pos)| event).collect();
274
275        Ok(related)
276    }
277}
278
impl lock::Store for RoomEventCacheState {
    /// Expose the underlying event cache store lock to the generic locking
    /// machinery.
    fn store(&self) -> &EventCacheStoreLock {
        &self.store
    }
}
284
/// State for a single room's event cache.
///
/// This contains all the inner mutable states that ought to be updated at
/// the same time.
///
/// This is the [`lock::StateLock`] wrapper around [`RoomEventCacheState`];
/// access goes through the read/write guards
/// ([`RoomEventCacheStateLockReadGuard`] and
/// [`RoomEventCacheStateLockWriteGuard`]).
pub type LockedRoomEventCacheState = lock::StateLock<RoomEventCacheState>;
290
impl LockedRoomEventCacheState {
    /// Create a new state, or reload it from storage if it's been enabled.
    ///
    /// Not all events are going to be loaded. Only a portion of them. The
    /// [`EventLinkedChunk`] relies on a [`LinkedChunk`] to store all
    /// events. Only the last chunk will be loaded. It means the
    /// events are loaded from the most recent to the oldest. To
    /// load more events, see [`RoomPagination`].
    ///
    /// If loading the linked chunk's metadata or its last chunk fails, the
    /// stored linked chunk for this room is cleared and we restart from an
    /// empty one.
    ///
    /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
    /// [`RoomPagination`]: super::RoomPagination
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        own_user_id: OwnedUserId,
        room_id: OwnedRoomId,
        weak_room: WeakRoom,
        room_version_rules: RoomVersionRules,
        enabled_thread_support: bool,
        update_sender: RoomEventCacheUpdateSender,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        store: EventCacheStoreLock,
        pagination_status: SharedObservable<SharedPaginationStatus>,
        automatic_pagination: Option<AutomaticPagination>,
    ) -> Result<Self, EventCacheError> {
        let store_guard = match store.lock().await? {
            // Lock is clean: all good!
            EventCacheStoreLockState::Clean(guard) => guard,

            // Lock is dirty, not a problem, it's the first time we are creating this state, no
            // need to refresh.
            EventCacheStoreLockState::Dirty(guard) => {
                EventCacheStoreLockGuard::clear_dirty(&guard);

                guard
            }
        };

        let linked_chunk_id = LinkedChunkId::Room(&room_id);

        // Load the full linked chunk's metadata, so as to feed the order tracker.
        //
        // If loading the full linked chunk failed, we'll clear the event cache, as it
        // indicates that at some point, there's some malformed data.
        let full_linked_chunk_metadata =
            match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
                Ok(metas) => metas,
                Err(err) => {
                    error!("error when loading a linked chunk's metadata from the store: {err}");

                    // Try to clear storage for this room.
                    store_guard
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    // Restart with an empty linked chunk.
                    None
                }
            };

        // Load only the *last* chunk of events from the store, and build the
        // in-memory linked chunk from it (lazy loading).
        let linked_chunk = match store_guard
            .load_last_chunk(linked_chunk_id)
            .await
            .map_err(EventCacheError::from)
            .and_then(|(last_chunk, chunk_identifier_generator)| {
                lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                    .map_err(EventCacheError::from)
            }) {
            Ok(linked_chunk) => linked_chunk,
            Err(err) => {
                error!("error when loading a linked chunk's latest chunk from the store: {err}");

                // Try to clear storage for this room.
                store_guard
                    .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                    .await?;

                None
            }
        };

        Ok(Self::new_inner(RoomEventCacheState {
            own_user_id,
            enabled_thread_support,
            room_id,
            weak_room,
            store,
            room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
                linked_chunk,
                full_linked_chunk_metadata,
            ),
            // The threads mapping is intentionally empty at start, since we're going to
            // reload threads lazily, as soon as we need to (based on external
            // subscribers) or when we get new information about those (from
            // sync).
            threads: HashMap::new(),
            // Event-focused caches are created on-demand when the user navigates to a
            // permalink.
            event_focused_caches: HashMap::new(),
            pagination_status,
            update_sender,
            linked_chunk_update_sender,
            room_version_rules,
            waited_for_initial_prev_token: false,
            subscriber_count: Default::default(),
            pinned_event_cache: OnceLock::new(),
            automatic_pagination,
        }))
    }
}
400
/// The read-lock guard around [`RoomEventCacheState`].
///
/// Grants shared (read-only) access to the state. See
/// [`RoomEventCacheStateLock::read`] to acquire it.
pub type RoomEventCacheStateLockReadGuard<'a> = lock::StateLockReadGuard<'a, RoomEventCacheState>;

/// The write-lock guard around [`RoomEventCacheState`].
///
/// Grants exclusive (mutable) access to the state. See
/// [`RoomEventCacheStateLock::write`] to acquire it.
pub type RoomEventCacheStateLockWriteGuard<'a> = lock::StateLockWriteGuard<'a, RoomEventCacheState>;
410
impl<'a> lock::Reload for RoomEventCacheStateLockWriteGuard<'a> {
    /// Force the room's linked chunk to shrink down to its last chunk,
    /// whether or not there are subscribers, then notify observers of the
    /// resulting timeline diffs.
    async fn reload(&mut self) -> Result<(), EventCacheError> {
        self.shrink_to_last_chunk().await?;

        let diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

        // Notify observers about the update.
        self.state.update_sender.send(
            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                diffs,
                origin: EventsOrigin::Cache,
            }),
            Some(RoomEventCacheGenericUpdate { room_id: self.state.room_id.clone() }),
        );

        Ok(())
    }
}
430
impl<'a> RoomEventCacheStateLockReadGuard<'a> {
    /// Return the subscriber count.
    pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
        &self.state.subscriber_count
    }

    /// Find a single event in this room.
    ///
    /// It starts by looking into loaded events in `EventLinkedChunk` before
    /// looking inside the storage.
    pub async fn find_event(
        &self,
        event_id: &EventId,
    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
        self.state.find_event(event_id, &self.store).await
    }

    /// Find an event and all its relations in the persisted storage.
    ///
    /// This goes straight to the database, as a simplification; we don't
    /// expect to need to have to look up in memory events, or that
    /// all the related events are actually loaded.
    ///
    /// The related events are sorted like this:
    /// - events saved out-of-band with [`super::RoomEventCache::save_events`]
    ///   will be located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the
    ///   database) will be sorted according to their ordering in the linked
    ///   chunk.
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
        self.state.find_event_with_relations(event_id, filters, &self.store).await
    }

    /// Find all relations for an event in the persisted storage.
    ///
    /// This goes straight to the database, as a simplification; we don't
    /// expect to need to have to look up in memory events, or that
    /// all the related events are actually loaded.
    ///
    /// The related events are sorted like this:
    /// - events saved out-of-band with [`super::RoomEventCache::save_events`]
    ///   will be located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the
    ///   database) will be sorted according to their ordering in the linked
    ///   chunk.
    pub async fn find_event_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
    ) -> Result<Vec<Event>, EventCacheError> {
        self.state.find_event_relations(event_id, filters, &self.store).await
    }

    /// Find a single event in this room, starting from the most recent event.
    ///
    /// The `predicate` receives the current event as its single argument.
    ///
    /// **Warning**! It looks into the loaded events from the in-memory
    /// linked chunk **only**. It doesn't look inside the storage,
    /// contrary to [`Self::find_event`].
    pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        self.state.room_linked_chunk.revents().find_map(|(_, event)| predicate(event))
    }

    /// Whether the underlying store lock is currently dirty (test only).
    #[cfg(test)]
    pub fn is_dirty(&self) -> bool {
        EventCacheStoreLockGuard::is_dirty(&self.store)
    }

    /// Subscribe to the lazily initialized pinned event cache for this
    /// room.
    ///
    /// This is a persisted view over the pinned events of a room. A network
    /// request to fetch the latest pinned events will be performed to
    /// update it as needed. The list of pinned events will also be kept
    /// up-to-date as new events are pinned, and new related events show up
    /// from sync or backpagination.
    ///
    /// This requires the room's event cache to be initialized.
    pub async fn subscribe_to_pinned_events(
        &self,
        room: Room,
    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>), EventCacheError> {
        // Lazily create the cache on first subscription; subsequent calls
        // reuse the same instance.
        let pinned_event_cache = self.state.pinned_event_cache.get_or_init(|| {
            PinnedEventCache::new(
                room,
                self.state.linked_chunk_update_sender.clone(),
                self.state.store.clone(),
            )
        });

        pinned_event_cache.subscribe().await
    }

    /// Get an event-focused cache for this event and thread mode, if it
    /// exists.
    ///
    /// Otherwise, returns `None`.
    pub fn get_event_focused_cache(
        &self,
        event_id: OwnedEventId,
        thread_mode: EventFocusThreadMode,
    ) -> Option<EventFocusedCache> {
        get_event_focused_cache(&self.state, event_id, thread_mode)
    }
}
545
546impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
    /// Return a mutable reference to the underlying in-memory room linked
    /// chunk (requires holding the write guard).
    pub fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk {
        &mut self.state.room_linked_chunk
    }
551
    /// Get a reference to the room's [`PinnedEventCache`], if it has been
    /// initialized (it is created lazily on first subscription).
    #[cfg(any(feature = "e2e-encryption", test))]
    pub fn pinned_event_cache(&self) -> Option<&PinnedEventCache> {
        self.state.pinned_event_cache.get()
    }
558
    /// Get an iterator over all the live [`EventFocusedCache`]s for this
    /// room.
    #[cfg(feature = "e2e-encryption")]
    pub fn event_focused_caches(&self) -> impl Iterator<Item = &EventFocusedCache> {
        self.state.event_focused_caches.values()
    }
564
    /// Get the `waited_for_initial_prev_token` value, i.e. whether we have
    /// already waited once for a previous-batch token to come from sync.
    pub fn waited_for_initial_prev_token(&self) -> bool {
        self.state.waited_for_initial_prev_token
    }
569
    /// Get a mutable reference to the `waited_for_initial_prev_token` value,
    /// so the caller can mark the one-time wait as done.
    pub fn waited_for_initial_prev_token_mut(&mut self) -> &mut bool {
        &mut self.state.waited_for_initial_prev_token
    }
574
    /// Find a single event in this room.
    ///
    /// It starts by looking into loaded events in `EventLinkedChunk` before
    /// looking inside the storage.
    ///
    /// Same as [`RoomEventCacheStateLockReadGuard::find_event`], but
    /// available on the write guard.
    pub async fn find_event(
        &self,
        event_id: &EventId,
    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
        self.state.find_event(event_id, &self.store).await
    }
585
    /// Find an event and all its relations in the persisted storage.
    ///
    /// This goes straight to the database, as a simplification; we don't
    /// expect to need to have to look up in memory events, or that
    /// all the related events are actually loaded.
    ///
    /// The related events are sorted like this:
    /// - events saved out-of-band with [`super::RoomEventCache::save_events`]
    ///   will be located at the beginning of the array.
    /// - events present in the linked chunk (be it in memory or in the
    ///   database) will be sorted according to their ordering in the linked
    ///   chunk.
    ///
    /// Same as
    /// [`RoomEventCacheStateLockReadGuard::find_event_with_relations`], but
    /// available on the write guard.
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
        self.state.find_event_with_relations(event_id, filters, &self.store).await
    }
605
    /// Unload all the chunks from the in-memory linked chunk, then reload
    /// only the last one from the store.
    ///
    /// If loading the last chunk from the store fails, the entire linked
    /// chunk is cleared, both in storage and in memory.
    ///
    /// Note: the store updates generated by the unloading are deliberately
    /// drained and *not* persisted, since only the in-memory representation
    /// changes; callers who need the resulting timeline diffs should read
    /// them from the linked chunk afterwards.
    pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
        // Attempt to load the last chunk.
        let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
        let (last_chunk, chunk_identifier_generator) =
            match self.store.load_last_chunk(linked_chunk_id).await {
                Ok(pair) => pair,

                Err(err) => {
                    // If loading the last chunk failed, clear the entire linked chunk.
                    error!("error when reloading a linked chunk from memory: {err}");

                    // Clear storage for this room.
                    self.store
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    // Restart with an empty linked chunk.
                    (None, ChunkIdentifierGenerator::new_from_scratch())
                }
            };

        debug!("unloading the linked chunk, and resetting it to its last chunk");

        // Remove all the chunks from the linked chunks, except for the last one, and
        // updates the chunk identifier generator.
        if let Err(err) =
            self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
        {
            error!("error when replacing the linked chunk: {err}");
            return self.reset_internal().await;
        }

        // Let pagination observers know that we may have not reached the start of the
        // timeline. This may cancel an ongoing pagination.
        self.state
            .pagination_status
            .set(SharedPaginationStatus::Idle { hit_timeline_start: false });

        // Don't propagate those updates to the store; this is only for the in-memory
        // representation that we're doing this. Let's drain those store updates.
        let _ = self.state.room_linked_chunk.store_updates().take();

        Ok(())
    }
658
    /// Automatically shrink the room if there are no more subscribers, as
    /// indicated by the atomic number of active subscribers.
    ///
    /// Returns `Some` with the resulting timeline diffs when a shrink
    /// happened, `None` when there are still active subscribers.
    #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
    pub async fn auto_shrink_if_no_subscribers(
        &mut self,
    ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
        let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);

        trace!(subscriber_count, "received request to auto-shrink");

        if subscriber_count == 0 {
            // Nobody is subscribed anymore: shrink the events data structure down to
            // its last chunk.
            self.shrink_to_last_chunk().await?;

            Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
        } else {
            // There are still active subscribers: don't touch the loaded events.
            Ok(None)
        }
    }
679
680    /// Remove events by their position, in `EventLinkedChunk` and in
681    /// `EventCacheStore`.
682    ///
683    /// This method is purposely isolated because it must ensure that
684    /// positions are sorted appropriately or it can be disastrous.
685    #[instrument(skip_all)]
686    pub async fn remove_events(
687        &mut self,
688        in_memory_events: Vec<(OwnedEventId, Position)>,
689        in_store_events: Vec<(OwnedEventId, Position)>,
690    ) -> Result<(), EventCacheError> {
691        // In-store events.
692        if !in_store_events.is_empty() {
693            let mut positions = in_store_events
694                .into_iter()
695                .map(|(_event_id, position)| position)
696                .collect::<Vec<_>>();
697
698            sort_positions_descending(&mut positions);
699
700            let updates =
701                positions.into_iter().map(|pos| Update::RemoveItem { at: pos }).collect::<Vec<_>>();
702
703            self.apply_store_only_updates(updates).await?;
704        }
705
706        // In-memory events.
707        if in_memory_events.is_empty() {
708            // Nothing else to do, return early.
709            return Ok(());
710        }
711
712        // `remove_events_by_position` is responsible of sorting positions.
713        self.state
714            .room_linked_chunk
715            .remove_events_by_position(
716                in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
717            )
718            .expect("failed to remove an event");
719
720        self.propagate_changes().await
721    }
722
    /// Drain the store updates accumulated by the in-memory linked chunk,
    /// and send them to the store.
    async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
        let updates = self.state.room_linked_chunk.store_updates().take();

        self.send_updates_to_store(updates).await
    }
728
    /// Apply some updates that are effective only on the store itself.
    ///
    /// This method should be used only for updates that happen *outside*
    /// the in-memory linked chunk. Such updates must be applied
    /// onto the ordering tracker as well as to the persistent
    /// storage.
    async fn apply_store_only_updates(
        &mut self,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), EventCacheError> {
        // Keep the order tracker in sync first, then persist.
        self.state.room_linked_chunk.order_tracker.map_updates(&updates);
        self.send_updates_to_store(updates).await
    }
742
    /// Persist the given updates for this room's linked chunk, going through
    /// the shared [`send_updates_to_store`] helper and passing it the
    /// linked-chunk update sender.
    async fn send_updates_to_store(
        &mut self,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), EventCacheError> {
        let linked_chunk_id = OwnedLinkedChunkId::Room(self.state.room_id.clone());

        send_updates_to_store(
            &self.store,
            linked_chunk_id,
            &self.state.linked_chunk_update_sender,
            updates,
        )
        .await
    }
757
    /// Reset this data structure as if it were brand new.
    ///
    /// Return a single diff update that is a clear of all events; as a
    /// result, the caller may override any pending diff updates
    /// with the result of this function.
    pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
        self.reset_internal().await?;

        let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();

        // Ensure the contract defined in the doc comment is true:
        debug_assert_eq!(diff_updates.len(), 1);
        debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

        Ok(diff_updates)
    }
774
    /// Reset the in-memory state: clear the room's linked chunk and every
    /// known thread, persist the resulting changes, and reset the
    /// pagination state.
    async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
        self.state.room_linked_chunk.reset();

        // No need to update the thread summaries: the room events are
        // gone because of the reset of `room_linked_chunk`.
        //
        // Clear the threads.
        for thread in self.state.threads.values_mut() {
            thread.clear().await?;
        }

        self.propagate_changes().await?;

        // Reset the pagination state too: pretend we never waited for the initial
        // prev-batch token, and indicate that we're not at the start of the
        // timeline, since we don't know about that anymore.
        self.state.waited_for_initial_prev_token = false;

        // Note: this may cancel an ongoing pagination.
        self.state
            .pagination_status
            .set(SharedPaginationStatus::Idle { hit_timeline_start: false });

        Ok(())
    }
800
    /// Handle the result of a sync.
    ///
    /// It may send room event cache updates to the given sender, if it
    /// generated any of those.
    ///
    /// Returns `true` for the first part of the tuple if a new gap
    /// (previous-batch token) has been inserted, `false` otherwise. The
    /// second part is the list of `VectorDiff`s to propagate to observers.
    #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
    pub async fn handle_sync(
        &mut self,
        mut timeline: Timeline,
        ephemeral_events: &[Raw<AnySyncEphemeralRoomEvent>],
    ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
        let mut prev_batch = timeline.prev_batch.take();

        // Partition the sync events into genuinely new events and events we
        // already knew about, in memory or in the store.
        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates: all_duplicates,
        } = filter_duplicate_events(
            &self.state.own_user_id,
            &self.store,
            LinkedChunkId::Room(&self.state.room_id),
            &self.state.room_linked_chunk,
            timeline.events,
        )
        .await?;

        // If the timeline isn't limited, and we already knew about some past events,
        // then this definitely knows what the timeline head is (either we know
        // about all the events persisted in storage, or we have a gap
        // somewhere). In this case, we can ditch the previous-batch
        // token, which is an optimization to avoid unnecessary future back-pagination
        // requests.
        //
        // We can also ditch it if we knew about all the events that came from sync,
        // namely, they were all deduplicated. In this case, using the
        // previous-batch token would only result in fetching other events we
        // knew about. This is slightly incorrect in the presence of
        // network splits, but this has shown to be Good Enough™.
        if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
            || all_duplicates
        {
            prev_batch = None;
        }

        let has_new_gap = prev_batch.is_some();

        if has_new_gap {
            // Sad time: there's a gap, somewhere, in the timeline, and there's at least one
            // non-duplicated event. We don't know which threads might have become gappy,
            // so we must invalidate them all :(
            // TODO: figure out a better catchup mechanism for threads.
            let mut summaries_to_update = Vec::new();

            for (thread_root, thread) in self.state.threads.iter_mut() {
                // Empty the thread's linked chunk.
                thread.clear().await?;

                summaries_to_update.push(thread_root.clone());
            }

            // Now, update the summaries to indicate that we're not sure what the latest
            // thread event is. The thread count can remain as is, as it might still be
            // valid, and there's no good value to reset it to, anyways.
            for thread_root in summaries_to_update {
                let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                else {
                    trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                    continue;
                };

                if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                    // Keep the reply count; only the latest reply is now uncertain.
                    prev_summary.latest_reply = None;

                    target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                    self.replace_event_at(location, target_event).await?;
                }
            }
        }

        if all_duplicates {
            // No new events and no gap (per the previous check), thus no need to change the
            // room state. We're done!

            // We might have a new read receipt, though! If that's the case, handle it for
            // unread counts tracking.
            if let Some(new_receipt) = extract_read_receipt(ephemeral_events) {
                self.update_read_receipts(Some(&new_receipt)).await?;
            }

            return Ok((false, Vec::new()));
        }

        // If we've never waited for an initial previous-batch token, and we've now
        // inserted a gap, no need to wait for a previous-batch token later.
        if !self.state.waited_for_initial_prev_token && has_new_gap {
            self.state.waited_for_initial_prev_token = true;
        }

        // Remove the old duplicated events.
        //
        // We don't have to worry the removals can change the position of the existing
        // events, because we are pushing all _new_ `events` at the back.
        self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?;

        // Append the new events (and the gap, if any) at the back of the room
        // linked chunk.
        self.state
            .room_linked_chunk
            .push_live_events(prev_batch.map(|prev_token| Gap { token: prev_token }), &events);

        // Extract a new read receipt, if available.
        let new_receipt = extract_read_receipt(ephemeral_events);
        self.post_process_new_events(events, PostProcessingOrigin::Sync, new_receipt).await?;

        if timeline.limited && has_new_gap {
            // If there was a previous batch token for a limited timeline, unload the chunks
            // so it only contains the last one; otherwise, there might be a
            // valid gap in between, and observers may not render it (yet).
            //
            // We must do this *after* persisting these events to storage (in
            // `post_process_new_events`).
            self.shrink_to_last_chunk().await?;
        }

        let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

        Ok((has_new_gap, timeline_event_diffs))
    }
931
932    /// Subscribe to thread for a given root event, and get a (maybe empty)
933    /// initially known list of events for that thread.
934    pub async fn subscribe_to_thread(
935        &mut self,
936        root: OwnedEventId,
937    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>), EventCacheError> {
938        self.get_or_reload_thread(root).subscribe().await
939    }
940
941    // --------------------------------------------
942    // utility methods
943    // --------------------------------------------
944
    /// Post-process new events, after they have been added to the in-memory
    /// linked chunk.
    ///
    /// Flushes updates to disk first, then, for each event: feeds the
    /// pinned-events cache, applies redactions, gathers per-thread work (when
    /// thread support is enabled), and saves bundled latest thread events.
    /// Finally updates the threads and the read receipts.
    pub async fn post_process_new_events(
        &mut self,
        events: Vec<Event>,
        post_processing_origin: PostProcessingOrigin,
        receipt_event: Option<ReceiptEventContent>,
    ) -> Result<(), EventCacheError> {
        // Update the store before doing the post-processing.
        self.propagate_changes().await?;

        // Need an explicit re-borrow to avoid a deref vs deref-mut borrowck conflict
        // below.
        let state = &mut *self.state;

        // Let the pinned-events cache pick up related events, if it's been
        // initialized.
        if let Some(pinned_event_cache) = state.pinned_event_cache.get_mut() {
            pinned_event_cache
                .maybe_add_live_related_events(&events, &state.room_version_rules.redaction)
                .await?;
        }

        // Maps a thread root event id to the new events for that thread. An entry
        // with an empty vector marks a thread whose summary needs a refresh
        // without adding any event.
        let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

        for event in events {
            self.maybe_apply_new_redaction(&event, post_processing_origin).await?;

            if self.state.enabled_thread_support {
                // Only add the event to a thread if:
                // - thread support is enabled,
                // - and if this is a sync (we can't know where to insert backpaginated events
                //   in threads).
                if matches!(post_processing_origin, PostProcessingOrigin::Sync) {
                    if let Some(thread_root) = extract_thread_root(event.raw()) {
                        new_events_by_thread.entry(thread_root).or_default().push(event.clone());
                    } else if let Some(event_id) = event.event_id() {
                        // If we spot the root of a thread, add it to its linked chunk.
                        if self.state.threads.contains_key(&event_id) {
                            new_events_by_thread.entry(event_id).or_default().push(event.clone());
                        }
                    }
                }

                // If the post-processing origin is the redecryption, and this is part of a
                // thread, mark the thread as needing an update, potentially for its latest
                // event, that might have been redecrypted now.
                #[cfg(feature = "e2e-encryption")]
                if matches!(post_processing_origin, PostProcessingOrigin::Redecryption)
                    && let Some(thread_root) = extract_thread_root(event.raw())
                {
                    new_events_by_thread.entry(thread_root).or_default();
                }

                // Look for edits that may apply to a thread; we'll process them later.
                if let Some(edit_target) = extract_edit_target(event.raw()) {
                    // If the edited event is known, and part of a thread,
                    if let Some((_location, edit_target_event)) =
                        self.find_event(&edit_target).await?
                        && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
                    {
                        // Mark the thread for processing, unless it was already marked as
                        // such.
                        new_events_by_thread.entry(thread_root).or_default();
                    }
                }
            }

            // Save a bundled thread event, if there was one.
            if let Some(bundled_thread) = event.bundled_latest_thread_event {
                self.save_events([*bundled_thread]).await?;
            }
        }

        if self.state.enabled_thread_support {
            self.update_threads(new_events_by_thread, post_processing_origin).await?;
        }

        self.update_read_receipts(receipt_event.as_ref()).await?;

        Ok(())
    }
1027
1028    /// Update read receipts for all events in the room, based on the current
1029    /// state of the in-memory linked chunk.
1030    pub async fn update_read_receipts(
1031        &mut self,
1032        receipt_event: Option<&ReceiptEventContent>,
1033    ) -> Result<(), EventCacheError> {
1034        let Some(room) = self.state.weak_room.get() else {
1035            debug!("can't update read receipts: client's closing");
1036            return Ok(());
1037        };
1038
1039        let user_id = &self.state.own_user_id;
1040        let room_id = &self.state.room_id;
1041
1042        let prev_read_receipts = room.read_receipts().clone();
1043        let mut read_receipts = prev_read_receipts.clone();
1044
1045        compute_unread_counts(
1046            user_id,
1047            room_id,
1048            receipt_event,
1049            &self.state.room_linked_chunk,
1050            &mut read_receipts,
1051            self.state.enabled_thread_support,
1052            self.state.automatic_pagination.as_ref(),
1053            room.client().state_store(),
1054        )
1055        .await;
1056
1057        if prev_read_receipts != read_receipts {
1058            // The read receipt has changed! Do a little dance to update the `RoomInfo` in
1059            // the state store, and then in the room itself, so that observers
1060            // can be notified of the change.
1061            let result = room
1062                .update_and_save_room_info(|mut room_info| {
1063                    room_info.set_read_receipts(read_receipts);
1064                    (room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT)
1065                })
1066                .await;
1067            if let Err(error) = result {
1068                error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
1069            }
1070        }
1071
1072        Ok(())
1073    }
1074
1075    pub(in super::super) fn get_or_reload_thread(
1076        &mut self,
1077        root_event_id: OwnedEventId,
1078    ) -> &mut ThreadEventCache {
1079        // TODO: when there's persistent storage, try to lazily reload from disk, if
1080        // missing from memory.
1081        let room_id = self.state.room_id.clone();
1082        let weak_room = self.state.weak_room.clone();
1083        let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1084        let store = self.state.store.clone();
1085
1086        self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1087            ThreadEventCache::new(
1088                room_id,
1089                root_event_id,
1090                weak_room,
1091                store,
1092                linked_chunk_update_sender,
1093            )
1094        })
1095    }
1096
    /// Dispatch new events to their respective thread caches, then refresh
    /// the summary of each affected thread.
    ///
    /// An entry in `new_events_by_thread` with an empty event list marks a
    /// thread whose summary must be recomputed without adding any event.
    #[instrument(skip_all)]
    async fn update_threads(
        &mut self,
        new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
        post_processing_origin: PostProcessingOrigin,
    ) -> Result<(), EventCacheError> {
        for (thread_root, new_events) in new_events_by_thread {
            let thread_cache = self.get_or_reload_thread(thread_root.clone());

            thread_cache.add_live_events(new_events).await?;

            let mut latest_event_id = thread_cache.latest_event_id().await?;

            // If there's an edit to the latest event in the thread, use the latest edit
            // event id as the latest event id for the thread summary.
            if let Some(event_id) = latest_event_id.as_ref()
                && let Some((original_event, edits)) = self
                    .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
                    .await?
            {
                // Search backwards for the most recent edit that is a valid
                // replacement of the original event.
                let latest_valid_edit = edits.into_iter().rfind(|edit| {
                    let original_json = original_event.raw();
                    let original_encryption_info = original_event.encryption_info();
                    let replacement_json = edit.raw();
                    let replacement_encryption_info = edit.encryption_info();

                    check_validity_of_replacement_events(
                        original_json,
                        original_encryption_info.map(|v| &**v),
                        replacement_json,
                        replacement_encryption_info.map(|v| &**v),
                    )
                    .is_ok()
                });

                if let Some(latest_valid_edit) = latest_valid_edit {
                    latest_event_id = latest_valid_edit.event_id();
                }
            }

            self.maybe_update_thread_summary(thread_root, latest_event_id, post_processing_origin)
                .await?;
        }

        Ok(())
    }
1143
    /// Update a thread summary on the given thread root, if needs be.
    ///
    /// `latest_event_id` is the event to advertise as the thread's latest
    /// reply, if a summary ends up being written.
    async fn maybe_update_thread_summary(
        &mut self,
        thread_root: OwnedEventId,
        latest_event_id: Option<OwnedEventId>,
        _post_processing_origin: PostProcessingOrigin,
    ) -> Result<(), EventCacheError> {
        // Add a thread summary to the (room) event which has the thread root, if we
        // knew about it.

        let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
            trace!(%thread_root, "thread root event is missing from the room linked chunk");
            return Ok(());
        };

        let prev_summary = target_event.thread_summary.summary();

        // Recompute the thread summary, if needs be.

        // Read the latest number of thread replies from the store.
        //
        // Implementation note: since this is based on the `m.relates_to` field, and
        // that field can only be present on room messages, we don't have to
        // worry about filtering out aggregation events (like
        // reactions/edits/etc.). Pretty neat, huh?
        let num_replies = {
            let thread_replies = self
                .store
                .find_event_relations(
                    &self.state.room_id,
                    &thread_root,
                    Some(&[RelationType::Thread]),
                )
                .await?;
            // Saturate rather than overflow for (unrealistically) huge threads.
            thread_replies.len().try_into().unwrap_or(u32::MAX)
        };

        let new_summary = if num_replies > 0 {
            Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
        } else {
            None
        };

        // Note: in the case of redecryption, we still trigger an update even if the
        // summary hasn't changed, so that observers can be notified that the
        // event in the summary may have been decrypted now.
        #[cfg(feature = "e2e-encryption")]
        let update_if_same_summaries =
            matches!(_post_processing_origin, PostProcessingOrigin::Redecryption);
        #[cfg(not(feature = "e2e-encryption"))]
        let update_if_same_summaries = false;

        if !update_if_same_summaries && prev_summary == new_summary.as_ref() {
            trace!(%thread_root, "thread summary is up-to-date, no need to update it");
            return Ok(());
        }

        // Trigger an update to observers.
        trace!(%thread_root, "updating thread summary: {new_summary:?}");
        target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
        self.replace_event_at(location, target_event).await
    }
1206
1207    /// Replaces a single event, be it saved in memory or in the store.
1208    ///
1209    /// If it was saved in memory, this will emit a notification to
1210    /// observers that a single item has been replaced. Otherwise,
1211    /// such a notification is not emitted, because observers are
1212    /// unlikely to observe the store updates directly.
1213    pub async fn replace_event_at(
1214        &mut self,
1215        location: EventLocation,
1216        event: Event,
1217    ) -> Result<(), EventCacheError> {
1218        match location {
1219            EventLocation::Memory(position) => {
1220                self.state
1221                    .room_linked_chunk
1222                    .replace_event_at(position, event)
1223                    .expect("should have been a valid position of an item");
1224                // We just changed the in-memory representation; synchronize this with
1225                // the store.
1226                self.propagate_changes().await?;
1227            }
1228            EventLocation::Store => {
1229                self.save_events([event]).await?;
1230            }
1231        }
1232
1233        Ok(())
1234    }
1235
    /// If the given event is a redaction, try to retrieve the
    /// to-be-redacted event in the chunk, and replace it by the
    /// redacted form.
    ///
    /// If the redacted event belonged to a thread, the thread's linked chunk
    /// and summary are updated as well.
    #[instrument(skip_all)]
    async fn maybe_apply_new_redaction(
        &mut self,
        event: &Event,
        post_processing_origin: PostProcessingOrigin,
    ) -> Result<(), EventCacheError> {
        let raw_event = event.raw();

        // Do not deserialise the entire event if we aren't certain it's a
        // `m.room.redaction`. It saves a non-negligible amount of computations.
        let Ok(Some(MessageLikeEventType::RoomRedaction)) =
            raw_event.get_field::<MessageLikeEventType>("type")
        else {
            return Ok(());
        };

        // It is a `m.room.redaction`! We can deserialize it entirely.

        let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
            redaction,
        ))) = raw_event.deserialize()
        else {
            return Ok(());
        };

        let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
            warn!("missing target event id from the redaction event");
            return Ok(());
        };

        // Replace the redacted event by a redacted form, if we knew about it.
        let Some((location, mut target_event)) = self.find_event(event_id).await? else {
            trace!("redacted event is missing from the linked chunk");
            return Ok(());
        };

        // Don't redact already redacted events.
        let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
            if deserialized.is_redacted() {
                return Ok(());
            }

            // If the event is part of a thread, update the thread linked chunk and the
            // summary.
            extract_thread_root(target_event.raw())
        } else {
            warn!("failed to deserialize the event to redact");
            None
        };

        if let Some(redacted_event) = apply_redaction(
            target_event.raw(),
            event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
            &self.state.room_version_rules.redaction,
        ) {
            // It's safe to cast `redacted_event` here:
            // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
            //   when calling .raw(), so it's still one under the hood.
            // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
            target_event.replace_raw(redacted_event.cast_unchecked());

            self.replace_event_at(location, target_event).await?;

            // If the redacted event was part of a thread, remove it in the thread linked
            // chunk too, and make sure to update the thread root's summary
            // as well.
            //
            // Note: there is an ordering issue here: the above `replace_event_at` must
            // happen BEFORE we recompute the summary, otherwise the set of
            // replies may include the to-be-redacted event.
            if let Some(thread_root) = thread_root
                && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
            {
                thread_cache.remove_if_present(event_id).await?;

                // The number of replies may have changed, so update the thread summary if
                // needs be.
                let latest_event_id = thread_cache.latest_event_id().await?;

                self.maybe_update_thread_summary(
                    thread_root,
                    latest_event_id,
                    post_processing_origin,
                )
                .await?;
            }
        }

        Ok(())
    }
1329
1330    /// Save events into the database, without notifying observers.
1331    pub async fn save_events(
1332        &mut self,
1333        events: impl IntoIterator<Item = Event>,
1334    ) -> Result<(), EventCacheError> {
1335        let store = self.store.clone();
1336        let room_id = self.state.room_id.clone();
1337        let events = events.into_iter().collect::<Vec<_>>();
1338
1339        // Spawn a task so the save is uninterrupted by task cancellation.
1340        spawn(async move {
1341            for event in events {
1342                store.save_event(&room_id, event).await?;
1343            }
1344            super::Result::Ok(())
1345        })
1346        .await
1347        .expect("joining failed")?;
1348
1349        Ok(())
1350    }
1351
    /// Test-only helper: whether the event cache store lock guard is
    /// considered dirty.
    ///
    /// Delegates to [`EventCacheStoreLockGuard::is_dirty`] (explicit UFCS to
    /// pin down which `is_dirty` is meant).
    #[cfg(test)]
    pub fn is_dirty(&self) -> bool {
        EventCacheStoreLockGuard::is_dirty(&self.store)
    }
1356
1357    /// Insert an initialized event-focused cache for the given event id.
1358    pub fn insert_event_focused_cache(
1359        &mut self,
1360        event_id: OwnedEventId,
1361        thread_mode: EventFocusThreadMode,
1362        cache: EventFocusedCache,
1363    ) {
1364        let key = EventFocusedCacheKey { focused: event_id, thread_mode };
1365        self.state.event_focused_caches.insert(key, cache);
1366    }
1367
1368    /// Get an event-focused cache for this event and thread mode, if it
1369    /// exists.
1370    ///
1371    /// Otherwise, returns `None`.
1372    pub fn get_event_focused_cache(
1373        &self,
1374        event_id: OwnedEventId,
1375        thread_mode: EventFocusThreadMode,
1376    ) -> Option<EventFocusedCache> {
1377        get_event_focused_cache(&self.state, event_id, thread_mode)
1378    }
1379}
1380
1381/// Extract a valid read receipt event from the ephemeral events, if
1382/// available.
1383fn extract_read_receipt(
1384    ephemeral_events: &[Raw<AnySyncEphemeralRoomEvent>],
1385) -> Option<ReceiptEventContent> {
1386    let mut receipt_event = None;
1387
1388    for raw_ephemeral in ephemeral_events {
1389        match raw_ephemeral.deserialize() {
1390            Ok(AnySyncEphemeralRoomEvent::Receipt(SyncReceiptEvent { content, .. })) => {
1391                receipt_event = Some(content);
1392                break;
1393            }
1394
1395            Ok(_) => {}
1396
1397            Err(err) => {
1398                error!("error when deserializing an ephemeral event from sync: {err}");
1399            }
1400        }
1401    }
1402
1403    receipt_event
1404}
1405
1406/// Get an event-focused cache for this event and thread mode, if it exists.
1407///
1408/// Otherwise, returns `None`.
1409///
1410/// Extracted as a separate function to avoid duplicating the implementation for
1411/// both the read and write guards.
1412fn get_event_focused_cache(
1413    state: &RoomEventCacheState,
1414    event_id: OwnedEventId,
1415    thread_mode: EventFocusThreadMode,
1416) -> Option<EventFocusedCache> {
1417    let key = EventFocusedCacheKey { focused: event_id, thread_mode };
1418    state.event_focused_caches.get(&key).cloned()
1419}
1420
/// Load a linked chunk's full metadata, making sure the chunks are
/// consistent according to their links.
///
/// Returns `None` if there's no such linked chunk in the store, or an
/// error if the linked chunk is malformed (no last chunk, multiple last
/// chunks, a missing or inconsistent predecessor, a cycle, or multiple
/// disconnected components). On success, the metadata is returned ordered
/// from first to last chunk.
async fn load_linked_chunk_metadata(
    store_guard: &EventCacheStoreLockGuard,
    linked_chunk_id: LinkedChunkId<'_>,
) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
    let mut all_chunks = store_guard
        .load_all_chunks_metadata(linked_chunk_id)
        .await
        .map_err(EventCacheError::from)?;

    if all_chunks.is_empty() {
        // There are no chunks, so there's nothing to do.
        return Ok(None);
    }

    // Transform the vector into a hashmap, for quick lookup of the predecessors.
    let chunk_map: HashMap<_, _> = all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

    // Find a last chunk, i.e. one with no successor.
    let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
    let Some(last) = iter.next() else {
        return Err(EventCacheError::InvalidLinkedChunkMetadata {
            details: "no last chunk found".to_owned(),
        });
    };

    // There must be at most one last chunk.
    if let Some(other_last) = iter.next() {
        return Err(EventCacheError::InvalidLinkedChunkMetadata {
            details: format!(
                "chunks {} and {} both claim to be last chunks",
                last.identifier.index(),
                other_last.identifier.index()
            ),
        });
    }

    // Rewind the chain back to the first chunk, and do some checks at the same
    // time.
    let mut seen = HashSet::new();
    let mut current = last;
    loop {
        // If we've already seen this chunk, there's a cycle somewhere.
        if !seen.insert(current.identifier) {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "cycle detected in linked chunk at {}",
                    current.identifier.index()
                ),
            });
        }

        let Some(prev_id) = current.previous else {
            // If there's no previous chunk, we're done — but every chunk must
            // have been reachable from the last one, else the chunks form
            // several disconnected components.
            if seen.len() != all_chunks.len() {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                        seen.len(),
                        all_chunks.len()
                    ),
                });
            }
            break;
        };

        // If the previous chunk is not in the map, then it's unknown
        // and missing.
        let Some(pred_meta) = chunk_map.get(&prev_id) else {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "missing predecessor {} chunk for {}",
                    prev_id.index(),
                    current.identifier.index()
                ),
            });
        };

        // If the previous chunk isn't connected to the next, then the link is invalid.
        if pred_meta.next != Some(current.identifier) {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                    pred_meta.identifier.index(),
                    pred_meta.next.map(|chunk_id| chunk_id.index()),
                    current.identifier.index()
                ),
            });
        }

        current = *pred_meta;
    }

    // At this point, `current` is the identifier of the first chunk.
    //
    // Reorder the resulting vector, by going through the chain of `next` links, and
    // swapping items into their final position.
    //
    // Invariant in this loop: all items in [0..i[ are in their final, correct
    // position.
    let mut current = current.identifier;
    for i in 0..all_chunks.len() {
        // Find the target metadata. Search from the back: positions [0..i[ are
        // already settled, so the target is in the unsettled tail.
        let j = all_chunks
            .iter()
            .rev()
            .position(|meta| meta.identifier == current)
            .map(|j| all_chunks.len() - 1 - j)
            .expect("the target chunk must be present in the metadata");
        if i != j {
            all_chunks.swap(i, j);
        }
        if let Some(next) = all_chunks[i].next {
            current = next;
        }
    }

    Ok(Some(all_chunks))
}
1543}