matrix_sdk/latest_events/
room_latest_events.rs1use std::{collections::HashMap, sync::Arc};
16
17use async_once_cell::OnceCell;
18use matrix_sdk_base::RoomInfoNotableUpdateReasons;
19use ruma::{EventId, OwnedEventId};
20use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
21use tracing::error;
22
23use super::{
24 LatestEvent,
25 latest_event::{IsLatestEventValueNone, With},
26};
27use crate::{
28 event_cache::{EventCache, EventCacheError, RoomEventCache},
29 room::WeakRoom,
30 send_queue::RoomSendQueueUpdate,
31};
32
33#[derive(Debug)]
35pub(super) struct RoomLatestEvents {
36 state: Arc<RwLock<RoomLatestEventsState>>,
38}
39
40impl RoomLatestEvents {
41 pub fn new(
43 weak_room: WeakRoom,
44 event_cache: &EventCache,
45 ) -> With<Self, IsLatestEventValueNone> {
46 let latest_event_with = Self::create_latest_event(&weak_room, None);
47
48 With::map(latest_event_with, |for_the_room| Self {
49 state: Arc::new(RwLock::new(RoomLatestEventsState {
50 for_the_room,
51 per_thread: HashMap::new(),
52 weak_room,
53 event_cache: event_cache.clone(),
54 room_event_cache: OnceCell::new(),
55 })),
56 })
57 }
58
59 fn create_latest_event(
60 weak_room: &WeakRoom,
61 thread_id: Option<&EventId>,
62 ) -> With<LatestEvent, IsLatestEventValueNone> {
63 LatestEvent::new(weak_room, thread_id)
64 }
65
66 pub async fn read(&self) -> RoomLatestEventsReadGuard {
68 RoomLatestEventsReadGuard { inner: self.state.clone().read_owned().await }
69 }
70
71 pub async fn write(&self) -> RoomLatestEventsWriteGuard {
74 RoomLatestEventsWriteGuard { inner: self.state.clone().write_owned().await }
75 }
76}
77
78#[derive(Debug)]
80struct RoomLatestEventsState {
81 for_the_room: LatestEvent,
83
84 per_thread: HashMap<OwnedEventId, LatestEvent>,
86
87 event_cache: EventCache,
89
90 room_event_cache: OnceCell<RoomEventCache>,
92
93 weak_room: WeakRoom,
98}
99
100pub(super) struct RoomLatestEventsReadGuard {
102 inner: OwnedRwLockReadGuard<RoomLatestEventsState>,
103}
104
105impl RoomLatestEventsReadGuard {
106 pub fn for_room(&self) -> &LatestEvent {
108 &self.inner.for_the_room
109 }
110
111 pub fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
113 self.inner.per_thread.get(thread_id)
114 }
115
116 #[cfg(test)]
117 pub fn per_thread(&self) -> &HashMap<OwnedEventId, LatestEvent> {
118 &self.inner.per_thread
119 }
120}
121
122pub(super) struct RoomLatestEventsWriteGuard {
124 inner: OwnedRwLockWriteGuard<RoomLatestEventsState>,
125}
126
127impl RoomLatestEventsWriteGuard {
128 pub fn has_thread(&self, thread_id: &EventId) -> bool {
131 self.inner.per_thread.contains_key(thread_id)
132 }
133
134 pub fn create_and_insert_latest_event_for_thread(&mut self, thread_id: &EventId) {
137 let latest_event_with =
138 RoomLatestEvents::create_latest_event(&self.inner.weak_room, Some(thread_id));
139
140 self.inner.per_thread.insert(thread_id.to_owned(), With::inner(latest_event_with));
141 }
142
143 pub fn forget_thread(&mut self, thread_id: &EventId) {
145 self.inner.per_thread.remove(thread_id);
146 }
147
148 pub async fn update_with_event_cache(&mut self) {
151 let Some(room) = self.inner.weak_room.get() else {
157 error!(room = ?self.inner.weak_room, "Room is unknown");
159
160 return;
161 };
162 let own_user_id = room.own_user_id();
163 let power_levels = room.power_levels().await.ok();
164
165 let inner = &mut *self.inner;
166 let for_the_room = &mut inner.for_the_room;
167 let per_thread = &mut inner.per_thread;
168
169 let room_event_cache = match inner
171 .room_event_cache
172 .get_or_try_init(async {
173 let (room_event_cache, _drop_handles) =
176 inner.event_cache.for_room(room.room_id()).await?;
177
178 Ok::<RoomEventCache, EventCacheError>(room_event_cache)
179 })
180 .await
181 {
182 Ok(room_event_cache) => room_event_cache,
183 Err(err) => {
184 error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`");
185 return;
186 }
187 };
188
189 for_the_room
190 .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
191 .await;
192
193 for latest_event in per_thread.values_mut() {
194 latest_event
195 .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
196 .await;
197 }
198 }
199
200 pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
203 let Some(room) = self.inner.weak_room.get() else {
209 return;
211 };
212 let own_user_id = room.own_user_id();
213 let power_levels = room.power_levels().await.ok();
214
215 let inner = &mut *self.inner;
216 let for_the_room = &mut inner.for_the_room;
217 let per_thread = &mut inner.per_thread;
218
219 let room_event_cache = match inner
221 .room_event_cache
222 .get_or_try_init(async {
223 let (room_event_cache, _drop_handles) =
226 inner.event_cache.for_room(room.room_id()).await?;
227
228 Ok::<RoomEventCache, EventCacheError>(room_event_cache)
229 })
230 .await
231 {
232 Ok(room_event_cache) => room_event_cache,
233 Err(err) => {
234 error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`");
235 return;
236 }
237 };
238
239 for_the_room
240 .update_with_send_queue(
241 send_queue_update,
242 room_event_cache,
243 own_user_id,
244 power_levels.as_ref(),
245 )
246 .await;
247
248 for latest_event in per_thread.values_mut() {
249 latest_event
250 .update_with_send_queue(
251 send_queue_update,
252 room_event_cache,
253 own_user_id,
254 power_levels.as_ref(),
255 )
256 .await;
257 }
258 }
259
260 pub async fn update_with_room_info(&mut self, reasons: RoomInfoNotableUpdateReasons) {
263 let Some(room) = self.inner.weak_room.get() else {
265 return;
267 };
268
269 self.inner.for_the_room.update_with_room_info(room, reasons).await;
270 }
271}