Skip to main content

matrix_sdk/event_cache/caches/thread/
mod.rs

// Copyright 2026 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Threads-related data structures.

pub mod pagination;
mod state;

use std::{fmt, sync::Arc};

use matrix_sdk_base::event_cache::{Event, store::EventCacheStoreLock};
use ruma::{EventId, OwnedEventId, OwnedRoomId};
pub(super) use state::LockedThreadEventCacheState;
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::error;

use self::pagination::ThreadPagination;
use super::{
    super::Result, EventsOrigin, TimelineVectorDiffs, room::RoomEventCacheLinkedChunkUpdate,
};
use crate::room::WeakRoom;

34/// All the information related to a single thread.
35pub(super) struct ThreadEventCache {
36    inner: Arc<ThreadEventCacheInner>,
37}
38
39/// The (non-cloneable) details of the `RoomEventCache`.
40struct ThreadEventCacheInner {
41    /// The thread root ID.
42    thread_id: OwnedEventId,
43
44    /// The room where this thread belongs to.
45    weak_room: WeakRoom,
46
47    /// State for this thread's event cache.
48    state: LockedThreadEventCacheState,
49}
50
51impl fmt::Debug for ThreadEventCache {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        f.debug_struct("ThreadEventCache").finish_non_exhaustive()
54    }
55}
56
57impl ThreadEventCache {
58    /// Create a new empty thread event cache.
59    pub fn new(
60        room_id: OwnedRoomId,
61        thread_id: OwnedEventId,
62        weak_room: WeakRoom,
63        store: EventCacheStoreLock,
64        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
65    ) -> Self {
66        Self {
67            inner: Arc::new(ThreadEventCacheInner {
68                thread_id: thread_id.clone(),
69                weak_room,
70                state: LockedThreadEventCacheState::new(
71                    room_id,
72                    thread_id,
73                    store,
74                    linked_chunk_update_sender,
75                ),
76            }),
77        }
78    }
79
80    /// Subscribe to live events from this thread.
81    pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
82        let state = self.inner.state.read().await?;
83
84        let events =
85            state.thread_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
86
87        let recv = state.state.sender.subscribe();
88
89        Ok((events, recv))
90    }
91
92    /// Return a [`ThreadPagination`] useful for running back-pagination queries
93    /// in this thread.
94    pub fn pagination(&self) -> ThreadPagination {
95        ThreadPagination::new(self.inner.clone())
96    }
97
98    /// Clear a thread, after a gappy sync for instance.
99    pub async fn clear(&mut self) -> Result<()> {
100        let mut state = self.inner.state.write().await?;
101
102        let updates_as_vector_diffs = state.reset().await?;
103
104        if !updates_as_vector_diffs.is_empty() {
105            let _ = state.state.sender.send(TimelineVectorDiffs {
106                diffs: updates_as_vector_diffs,
107                origin: EventsOrigin::Cache,
108            });
109        }
110
111        Ok(())
112    }
113
114    /// Push some live events to this thread, and propagate the updates to
115    /// the listeners.
116    pub async fn add_live_events(&mut self, events: Vec<Event>) -> Result<()> {
117        if events.is_empty() {
118            return Ok(());
119        }
120
121        let mut state = self.inner.state.write().await?;
122        let timeline_event_diffs = state.handle_sync(events).await?;
123
124        if !timeline_event_diffs.is_empty() {
125            let _ = state.state.sender.send(TimelineVectorDiffs {
126                diffs: timeline_event_diffs,
127                origin: EventsOrigin::Sync,
128            });
129        }
130
131        Ok(())
132    }
133
134    /// Remove an event from an thread event linked chunk, if it exists.
135    ///
136    /// If the event has been found and removed, then an update will be
137    /// propagated to observers.
138    pub(super) async fn remove_if_present(&mut self, event_id: &EventId) -> Result<()> {
139        let mut state = self.inner.state.write().await?;
140
141        let Some(position) = state.thread_linked_chunk().events().find_map(|(position, event)| {
142            (event.event_id().as_deref() == Some(event_id)).then_some(position)
143        }) else {
144            // Event not found in the linked chunk, nothing to do.
145            return Ok(());
146        };
147
148        if let Err(err) = state.remove_events(vec![(event_id.to_owned(), position)]).await {
149            error!(%err, "a thread linked chunk position was valid a few lines above, but invalid when deleting");
150            return Err(err);
151        }
152
153        let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs();
154
155        if !timeline_event_diffs.is_empty() {
156            let _ = state.state.sender.send(TimelineVectorDiffs {
157                diffs: timeline_event_diffs,
158                origin: EventsOrigin::Sync,
159            });
160        }
161
162        Ok(())
163    }
164
165    /// Returns the latest event ID in this thread, if any.
166    pub async fn latest_event_id(&self) -> Result<Option<OwnedEventId>> {
167        Ok(self
168            .inner
169            .state
170            .read()
171            .await?
172            .thread_linked_chunk()
173            .revents()
174            .next()
175            .and_then(|(_position, event)| event.event_id()))
176    }
177}