matrix_sdk/event_cache/caches/thread/
state.rs1use std::collections::BTreeSet;
16
17use eyeball_im::VectorDiff;
18use matrix_sdk_base::{
19 event_cache::{Event, Gap, store::EventCacheStoreLock},
20 linked_chunk::{OwnedLinkedChunkId, Position, Update},
21};
22use matrix_sdk_common::executor::spawn;
23use ruma::{OwnedEventId, OwnedRoomId};
24use tokio::sync::broadcast::Sender;
25use tracing::instrument;
26
27use super::super::{
28 super::{EventsOrigin, Result, deduplicator::DeduplicationOutcome},
29 TimelineVectorDiffs,
30 event_linked_chunk::EventLinkedChunk,
31 lock,
32 room::RoomEventCacheLinkedChunkUpdate,
33};
34
35pub struct ThreadEventCacheState {
36 #[allow(dead_code)] room_id: OwnedRoomId,
39
40 thread_id: OwnedEventId,
43
44 store: EventCacheStoreLock,
46
47 thread_linked_chunk: EventLinkedChunk,
49
50 pub sender: Sender<TimelineVectorDiffs>,
52
53 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
58
59 waited_for_initial_prev_token: bool,
64}
65
66impl lock::Store for ThreadEventCacheState {
67 fn store(&self) -> &EventCacheStoreLock {
68 &self.store
69 }
70}
71
72pub type LockedThreadEventCacheState = lock::StateLock<ThreadEventCacheState>;
77
78impl LockedThreadEventCacheState {
79 pub fn new(
90 room_id: OwnedRoomId,
91 thread_id: OwnedEventId,
92 store: EventCacheStoreLock,
93 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
94 ) -> Self {
95 Self::new_inner(ThreadEventCacheState {
96 room_id,
97 thread_id,
98 store,
99 thread_linked_chunk: EventLinkedChunk::new(),
100 sender: Sender::new(32),
101 linked_chunk_update_sender,
102 waited_for_initial_prev_token: false,
103 })
104 }
105}
106
107pub type ThreadEventCacheStateLockReadGuard<'a> =
111 lock::StateLockReadGuard<'a, ThreadEventCacheState>;
112
113pub type ThreadEventCacheStateLockWriteGuard<'a> =
117 lock::StateLockWriteGuard<'a, ThreadEventCacheState>;
118
119impl<'a> lock::Reload for ThreadEventCacheStateLockWriteGuard<'a> {
120 async fn reload(&mut self) -> Result<()> {
122 self.state.thread_linked_chunk.reset();
123
124 let diffs = self.state.thread_linked_chunk.updates_as_vector_diffs();
125
126 if !diffs.is_empty() {
127 let _ =
128 self.state.sender.send(TimelineVectorDiffs { diffs, origin: EventsOrigin::Cache });
129 }
130
131 Ok(())
132 }
133}
134
135impl<'a> ThreadEventCacheStateLockReadGuard<'a> {
136 pub fn thread_linked_chunk(&self) -> &EventLinkedChunk {
138 &self.state.thread_linked_chunk
139 }
140
141 pub fn waited_for_initial_prev_token(&self) -> bool {
143 self.state.waited_for_initial_prev_token
144 }
145}
146
147impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
148 pub fn thread_linked_chunk(&self) -> &EventLinkedChunk {
150 &self.state.thread_linked_chunk
151 }
152
153 pub fn thread_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk {
155 &mut self.state.thread_linked_chunk
156 }
157
158 pub fn waited_for_initial_prev_token_mut(&mut self) -> &mut bool {
160 &mut self.state.waited_for_initial_prev_token
161 }
162
163 pub async fn handle_sync(&mut self, events: Vec<Event>) -> Result<Vec<VectorDiff<Event>>> {
164 let deduplication = self.filter_duplicate_events(events);
165
166 if deduplication.non_empty_all_duplicates {
167 return Ok(Vec::new());
170 }
171
172 self.remove_events(deduplication.in_memory_duplicated_event_ids).await?;
174 assert!(
175 deduplication.in_store_duplicated_event_ids.is_empty(),
176 "persistent storage for threads is not implemented yet"
177 );
178
179 let events = deduplication.all_events;
180
181 self.state.thread_linked_chunk.push_live_events(None, &events);
182
183 self.propagate_changes().await?;
184
185 let timeline_event_diffs = self.state.thread_linked_chunk.updates_as_vector_diffs();
186
187 Ok(timeline_event_diffs)
188 }
189
190 pub async fn save_events(&mut self, events: impl IntoIterator<Item = Event>) -> Result<()> {
192 let store = self.store.clone();
193 let room_id = self.state.room_id.clone();
194 let events = events.into_iter().collect::<Vec<_>>();
195
196 spawn(async move {
198 for event in events {
199 store.save_event(&room_id, event).await?;
200 }
201
202 Result::Ok(())
203 })
204 .await
205 .expect("joining failed")?;
206
207 Ok(())
208 }
209
210 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>> {
216 self.reset_internal().await?;
217
218 let diff_updates = self.state.thread_linked_chunk.updates_as_vector_diffs();
219
220 debug_assert_eq!(diff_updates.len(), 1);
222 debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
223
224 Ok(diff_updates)
225 }
226
227 async fn reset_internal(&mut self) -> Result<()> {
228 self.state.thread_linked_chunk.reset();
229
230 self.propagate_changes().await?;
231
232 self.state.waited_for_initial_prev_token = false;
236
237 Ok(())
238 }
239
240 #[instrument(skip_all)]
247 pub async fn remove_events(
248 &mut self,
249 in_memory_events: Vec<(OwnedEventId, Position)>,
250 ) -> Result<()> {
251 if in_memory_events.is_empty() {
253 return Ok(());
255 }
256
257 self.state
259 .thread_linked_chunk
260 .remove_events_by_position(
261 in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
262 )
263 .expect("failed to remove an event");
264
265 self.propagate_changes().await
266 }
267
268 pub async fn propagate_changes(&mut self) -> Result<()> {
269 let updates = self.state.thread_linked_chunk.store_updates().take();
270
271 self.send_updates_to_store(updates).await
272 }
273
274 #[allow(clippy::unused_async)] async fn send_updates_to_store(&mut self, updates: Vec<Update<Event, Gap>>) -> Result<()> {
276 let linked_chunk_id =
278 OwnedLinkedChunkId::Thread(self.state.room_id.clone(), self.state.thread_id.clone());
279
280 let _ = self
281 .state
282 .linked_chunk_update_sender
283 .send(RoomEventCacheLinkedChunkUpdate { linked_chunk_id, updates });
284
285 Ok(())
286 }
287
288 pub fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
294 let mut new_event_ids = BTreeSet::new();
295
296 new_events.retain(|event| {
297 event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
300 });
301
302 let in_memory_duplicated_event_ids: Vec<_> = self
303 .state
304 .thread_linked_chunk
305 .events()
306 .filter_map(|(position, event)| {
307 let event_id = event.event_id()?;
308 new_event_ids.contains(&event_id).then_some((event_id, position))
309 })
310 .collect();
311
312 let in_store_duplicated_event_ids = Vec::new();
314
315 let at_least_one_event = !new_events.is_empty();
316 let all_duplicates = (in_memory_duplicated_event_ids.len()
317 + in_store_duplicated_event_ids.len())
318 == new_events.len();
319 let non_empty_all_duplicates = at_least_one_event && all_duplicates;
320
321 DeduplicationOutcome {
322 all_events: new_events,
323 in_memory_duplicated_event_ids,
324 in_store_duplicated_event_ids,
325 non_empty_all_duplicates,
326 }
327 }
328}