matrix_sdk/event_cache/caches/thread/
mod.rs1pub mod pagination;
18mod state;
19
20use std::{fmt, sync::Arc};
21
22use matrix_sdk_base::event_cache::{Event, store::EventCacheStoreLock};
23use ruma::{EventId, OwnedEventId, OwnedRoomId};
24pub(super) use state::LockedThreadEventCacheState;
25use tokio::sync::broadcast::{Receiver, Sender};
26use tracing::error;
27
28use self::pagination::ThreadPagination;
29use super::{
30 super::Result, EventsOrigin, TimelineVectorDiffs, room::RoomEventCacheLinkedChunkUpdate,
31};
32use crate::room::WeakRoom;
33
/// An event cache scoped to a single thread in a room.
pub(super) struct ThreadEventCache {
    /// The shared state of this thread cache.
    ///
    /// Kept behind an [`Arc`] because [`ThreadEventCache::pagination`] hands a
    /// clone of it to the returned [`ThreadPagination`].
    inner: Arc<ThreadEventCacheInner>,
}
38
/// State shared between a [`ThreadEventCache`] and the [`ThreadPagination`]
/// handles it creates.
struct ThreadEventCacheInner {
    /// The ID of the thread this cache is about.
    thread_id: OwnedEventId,

    /// A weak handle onto the room this thread belongs to.
    ///
    /// NOTE(review): presumably weak to avoid keeping the room alive from the
    /// cache — confirm against `WeakRoom`'s usage elsewhere.
    weak_room: WeakRoom,

    /// The thread's cached events and update broadcaster, behind a fallible
    /// read/write lock (the `read()`/`write()` calls below return a `Result`).
    state: LockedThreadEventCacheState,
}
50
51impl fmt::Debug for ThreadEventCache {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 f.debug_struct("ThreadEventCache").finish_non_exhaustive()
54 }
55}
56
57impl ThreadEventCache {
58 pub fn new(
60 room_id: OwnedRoomId,
61 thread_id: OwnedEventId,
62 weak_room: WeakRoom,
63 store: EventCacheStoreLock,
64 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
65 ) -> Self {
66 Self {
67 inner: Arc::new(ThreadEventCacheInner {
68 thread_id: thread_id.clone(),
69 weak_room,
70 state: LockedThreadEventCacheState::new(
71 room_id,
72 thread_id,
73 store,
74 linked_chunk_update_sender,
75 ),
76 }),
77 }
78 }
79
80 pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
82 let state = self.inner.state.read().await?;
83
84 let events =
85 state.thread_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
86
87 let recv = state.state.sender.subscribe();
88
89 Ok((events, recv))
90 }
91
92 pub fn pagination(&self) -> ThreadPagination {
95 ThreadPagination::new(self.inner.clone())
96 }
97
98 pub async fn clear(&mut self) -> Result<()> {
100 let mut state = self.inner.state.write().await?;
101
102 let updates_as_vector_diffs = state.reset().await?;
103
104 if !updates_as_vector_diffs.is_empty() {
105 let _ = state.state.sender.send(TimelineVectorDiffs {
106 diffs: updates_as_vector_diffs,
107 origin: EventsOrigin::Cache,
108 });
109 }
110
111 Ok(())
112 }
113
114 pub async fn add_live_events(&mut self, events: Vec<Event>) -> Result<()> {
117 if events.is_empty() {
118 return Ok(());
119 }
120
121 let mut state = self.inner.state.write().await?;
122 let timeline_event_diffs = state.handle_sync(events).await?;
123
124 if !timeline_event_diffs.is_empty() {
125 let _ = state.state.sender.send(TimelineVectorDiffs {
126 diffs: timeline_event_diffs,
127 origin: EventsOrigin::Sync,
128 });
129 }
130
131 Ok(())
132 }
133
134 pub(super) async fn remove_if_present(&mut self, event_id: &EventId) -> Result<()> {
139 let mut state = self.inner.state.write().await?;
140
141 let Some(position) = state.thread_linked_chunk().events().find_map(|(position, event)| {
142 (event.event_id().as_deref() == Some(event_id)).then_some(position)
143 }) else {
144 return Ok(());
146 };
147
148 if let Err(err) = state.remove_events(vec![(event_id.to_owned(), position)]).await {
149 error!(%err, "a thread linked chunk position was valid a few lines above, but invalid when deleting");
150 return Err(err);
151 }
152
153 let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs();
154
155 if !timeline_event_diffs.is_empty() {
156 let _ = state.state.sender.send(TimelineVectorDiffs {
157 diffs: timeline_event_diffs,
158 origin: EventsOrigin::Sync,
159 });
160 }
161
162 Ok(())
163 }
164
165 pub async fn latest_event_id(&self) -> Result<Option<OwnedEventId>> {
167 Ok(self
168 .inner
169 .state
170 .read()
171 .await?
172 .thread_linked_chunk()
173 .revents()
174 .next()
175 .and_then(|(_position, event)| event.event_id()))
176 }
177}