// matrix_sdk/event_cache/caches/thread/state.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16
17use eyeball_im::VectorDiff;
18use matrix_sdk_base::{
19    event_cache::{Event, Gap, store::EventCacheStoreLock},
20    linked_chunk::{OwnedLinkedChunkId, Position, Update},
21};
22use matrix_sdk_common::executor::spawn;
23use ruma::{OwnedEventId, OwnedRoomId};
24use tokio::sync::broadcast::Sender;
25use tracing::instrument;
26
27use super::super::{
28    super::{EventsOrigin, Result, deduplicator::DeduplicationOutcome},
29    TimelineVectorDiffs,
30    event_linked_chunk::EventLinkedChunk,
31    lock,
32    room::RoomEventCacheLinkedChunkUpdate,
33};
34
/// All the mutable state backing the event cache of a single thread.
///
/// Fields here are meant to be updated together, which is why they live
/// behind a single lock (see [`LockedThreadEventCacheState`]).
pub struct ThreadEventCacheState {
    /// The room owning this thread.
    #[allow(dead_code)] // for the persistent storage
    room_id: OwnedRoomId,

    /// The ID of the thread root event, which is the first event in the thread
    /// (and eventually the first in the linked chunk).
    thread_id: OwnedEventId,

    /// Reference to the underlying backing store.
    store: EventCacheStoreLock,

    /// The linked chunk for this thread.
    thread_linked_chunk: EventLinkedChunk,

    /// A sender for live events updates in this thread.
    pub sender: Sender<TimelineVectorDiffs>,

    /// A sender for the globally observable linked chunk updates that happened
    /// during a sync or a back-pagination.
    ///
    /// See also [`super::super::EventCacheInner::linked_chunk_update_sender`].
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// Have we ever waited for a previous-batch-token to come from sync, in
    /// the context of pagination? We do this at most once per room/thread (?),
    /// the first time we try to run backward pagination. We reset
    /// that upon clearing the timeline events.
    waited_for_initial_prev_token: bool,
}
65
// Expose the backing store to the generic `lock` machinery, so accesses to
// this state can be guarded by the store lock.
impl lock::Store for ThreadEventCacheState {
    fn store(&self) -> &EventCacheStoreLock {
        &self.store
    }
}
71
/// State for a single thread's event cache.
///
/// This contains all the inner mutable states that ought to be updated at
/// the same time. Acquire a guard via the `lock::StateLock` read/write
/// methods to access the inner [`ThreadEventCacheState`].
pub type LockedThreadEventCacheState = lock::StateLock<ThreadEventCacheState>;
77
impl LockedThreadEventCacheState {
    /// Create a new, empty state for a single thread.
    ///
    /// NOTE(review): an earlier version of this doc claimed events could be
    /// reloaded from storage; as written, the linked chunk always starts
    /// empty (`EventLinkedChunk::new()`), since persistent storage for
    /// threads is not implemented yet (see the TODOs elsewhere in this
    /// file). To load older events, see [`ThreadPagination`].
    ///
    /// [`ThreadPagination`]: super::pagination::ThreadPagination
    pub fn new(
        room_id: OwnedRoomId,
        thread_id: OwnedEventId,
        store: EventCacheStoreLock,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) -> Self {
        Self::new_inner(ThreadEventCacheState {
            room_id,
            thread_id,
            store,
            // Always starts empty; no reload from persistent storage yet.
            thread_linked_chunk: EventLinkedChunk::new(),
            sender: Sender::new(32),
            linked_chunk_update_sender,
            waited_for_initial_prev_token: false,
        })
    }
}
106
/// The read-lock guard around [`ThreadEventCacheState`].
///
/// Acquired by reading from a [`LockedThreadEventCacheState`].
pub type ThreadEventCacheStateLockReadGuard<'a> =
    lock::StateLockReadGuard<'a, ThreadEventCacheState>;

/// The write-lock guard around [`ThreadEventCacheState`].
///
/// Acquired by writing to a [`LockedThreadEventCacheState`].
pub type ThreadEventCacheStateLockWriteGuard<'a> =
    lock::StateLockWriteGuard<'a, ThreadEventCacheState>;
118
impl<'a> lock::Reload for ThreadEventCacheStateLockWriteGuard<'a> {
    /// Reset the in-memory thread linked chunk, and notify observers with
    /// the resulting diffs, if any.
    ///
    /// NOTE(review): the previous comment ("Force to shrink the room,
    /// whenever there is subscribers or not") was copied from the room
    /// variant and did not describe this implementation.
    async fn reload(&mut self) -> Result<()> {
        self.state.thread_linked_chunk.reset();

        let diffs = self.state.thread_linked_chunk.updates_as_vector_diffs();

        // Observers may all be gone; ignore the send error in that case.
        if !diffs.is_empty() {
            let _ =
                self.state.sender.send(TimelineVectorDiffs { diffs, origin: EventsOrigin::Cache });
        }

        Ok(())
    }
}
134
// Read-only accessors available while holding the read lock.
impl<'a> ThreadEventCacheStateLockReadGuard<'a> {
    /// Return a read-only reference to the underlying thread linked chunk.
    pub fn thread_linked_chunk(&self) -> &EventLinkedChunk {
        &self.state.thread_linked_chunk
    }

    /// Get the `waited_for_initial_prev_token` value.
    pub fn waited_for_initial_prev_token(&self) -> bool {
        self.state.waited_for_initial_prev_token
    }
}
146
147impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
    /// Return a read-only reference to the underlying thread linked chunk.
    pub fn thread_linked_chunk(&self) -> &EventLinkedChunk {
        &self.state.thread_linked_chunk
    }

    /// Return a mutable reference to the underlying thread linked chunk.
    pub fn thread_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk {
        &mut self.state.thread_linked_chunk
    }

    /// Return a mutable reference to the `waited_for_initial_prev_token`
    /// flag, so pagination code can both read and update it.
    pub fn waited_for_initial_prev_token_mut(&mut self) -> &mut bool {
        &mut self.state.waited_for_initial_prev_token
    }
162
163    pub async fn handle_sync(&mut self, events: Vec<Event>) -> Result<Vec<VectorDiff<Event>>> {
164        let deduplication = self.filter_duplicate_events(events);
165
166        if deduplication.non_empty_all_duplicates {
167            // If all events are duplicates, we don't need to do anything; ignore
168            // the new events.
169            return Ok(Vec::new());
170        }
171
172        // Remove the duplicated events from the thread chunk.
173        self.remove_events(deduplication.in_memory_duplicated_event_ids).await?;
174        assert!(
175            deduplication.in_store_duplicated_event_ids.is_empty(),
176            "persistent storage for threads is not implemented yet"
177        );
178
179        let events = deduplication.all_events;
180
181        self.state.thread_linked_chunk.push_live_events(None, &events);
182
183        self.propagate_changes().await?;
184
185        let timeline_event_diffs = self.state.thread_linked_chunk.updates_as_vector_diffs();
186
187        Ok(timeline_event_diffs)
188    }
189
190    /// Save events into the database, without notifying observers.
191    pub async fn save_events(&mut self, events: impl IntoIterator<Item = Event>) -> Result<()> {
192        let store = self.store.clone();
193        let room_id = self.state.room_id.clone();
194        let events = events.into_iter().collect::<Vec<_>>();
195
196        // Spawn a task so the save is uninterrupted by task cancellation.
197        spawn(async move {
198            for event in events {
199                store.save_event(&room_id, event).await?;
200            }
201
202            Result::Ok(())
203        })
204        .await
205        .expect("joining failed")?;
206
207        Ok(())
208    }
209
210    /// Reset this data structure as if it were brand new.
211    ///
212    /// Return a single diff update that is a clear of all events; as a
213    /// result, the caller may override any pending diff updates
214    /// with the result of this function.
215    pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>> {
216        self.reset_internal().await?;
217
218        let diff_updates = self.state.thread_linked_chunk.updates_as_vector_diffs();
219
220        // Ensure the contract defined in the doc comment is true:
221        debug_assert_eq!(diff_updates.len(), 1);
222        debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
223
224        Ok(diff_updates)
225    }
226
    /// Clear the in-memory linked chunk, propagate the resulting updates,
    /// then reset the pagination bookkeeping.
    async fn reset_internal(&mut self) -> Result<()> {
        self.state.thread_linked_chunk.reset();

        self.propagate_changes().await?;

        // Reset the pagination state too: pretend we never waited for the
        // initial prev-batch token. (NOTE(review): a previous comment also
        // mentioned marking that we're "not at the start of the timeline",
        // but no such state is touched here.)
        self.state.waited_for_initial_prev_token = false;

        Ok(())
    }
239
240    /// Remove events by their position, in `EventLinkedChunk`.
241    ///
242    /// This method is purposely isolated because it must ensure that
243    /// positions are sorted appropriately or it can be disastrous.
244    ///
245    /// TODO: support store.
246    #[instrument(skip_all)]
247    pub async fn remove_events(
248        &mut self,
249        in_memory_events: Vec<(OwnedEventId, Position)>,
250    ) -> Result<()> {
251        // In-memory events.
252        if in_memory_events.is_empty() {
253            // Nothing else to do, return early.
254            return Ok(());
255        }
256
257        // `remove_events_by_position` is responsible of sorting positions.
258        self.state
259            .thread_linked_chunk
260            .remove_events_by_position(
261                in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
262            )
263            .expect("failed to remove an event");
264
265        self.propagate_changes().await
266    }
267
268    pub async fn propagate_changes(&mut self) -> Result<()> {
269        let updates = self.state.thread_linked_chunk.store_updates().take();
270
271        self.send_updates_to_store(updates).await
272    }
273
274    #[allow(clippy::unused_async)] // TODO: remove once persistent storage is implemented
275    async fn send_updates_to_store(&mut self, updates: Vec<Update<Event, Gap>>) -> Result<()> {
276        // TODO: call the `persistence::send_updates_to_store` function
277        let linked_chunk_id =
278            OwnedLinkedChunkId::Thread(self.state.room_id.clone(), self.state.thread_id.clone());
279
280        let _ = self
281            .state
282            .linked_chunk_update_sender
283            .send(RoomEventCacheLinkedChunkUpdate { linked_chunk_id, updates });
284
285        Ok(())
286    }
287
288    /// Find duplicates in a thread, until there's persistent storage for
289    /// those.
290    ///
291    /// TODO: when persistent storage is implemented for thread, only use
292    /// the regular `filter_duplicate_events` method.
293    pub fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
294        let mut new_event_ids = BTreeSet::new();
295
296        new_events.retain(|event| {
297            // Only keep events with IDs, and those for which `insert` returns `true`
298            // (meaning they were not in the set).
299            event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
300        });
301
302        let in_memory_duplicated_event_ids: Vec<_> = self
303            .state
304            .thread_linked_chunk
305            .events()
306            .filter_map(|(position, event)| {
307                let event_id = event.event_id()?;
308                new_event_ids.contains(&event_id).then_some((event_id, position))
309            })
310            .collect();
311
312        // Right now, there's no persistent storage for threads.
313        let in_store_duplicated_event_ids = Vec::new();
314
315        let at_least_one_event = !new_events.is_empty();
316        let all_duplicates = (in_memory_duplicated_event_ids.len()
317            + in_store_duplicated_event_ids.len())
318            == new_events.len();
319        let non_empty_all_duplicates = at_least_one_event && all_duplicates;
320
321        DeduplicationOutcome {
322            all_events: new_events,
323            in_memory_duplicated_event_ids,
324            in_store_duplicated_event_ids,
325            non_empty_all_duplicates,
326        }
327    }
328}