matrix_sdk/latest_events/
room_latest_events.rs1use std::{collections::HashMap, sync::Arc};
16
17use ruma::{EventId, OwnedEventId};
18use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
19
20use super::{LatestEvent, LatestEventsError};
21use crate::{
22 event_cache::{EventCache, EventCacheError, RoomEventCache},
23 room::WeakRoom,
24 send_queue::RoomSendQueueUpdate,
25};
26
27#[derive(Debug)]
29pub(super) struct RoomLatestEvents {
30 state: Arc<RwLock<RoomLatestEventsState>>,
32}
33
34impl RoomLatestEvents {
35 pub async fn new(
37 weak_room: WeakRoom,
38 event_cache: &EventCache,
39 ) -> Result<Option<Self>, LatestEventsError> {
40 let room_id = weak_room.room_id();
41 let room_event_cache = match event_cache.for_room(room_id).await {
42 Ok((room_event_cache, _drop_handles)) => room_event_cache,
45 Err(EventCacheError::RoomNotFound { .. }) => return Ok(None),
46 Err(err) => return Err(LatestEventsError::EventCache(err)),
47 };
48
49 Ok(Some(Self {
50 state: Arc::new(RwLock::new(RoomLatestEventsState {
51 for_the_room: Self::create_latest_event_for_inner(
52 &weak_room,
53 None,
54 &room_event_cache,
55 )
56 .await,
57 per_thread: HashMap::new(),
58 weak_room,
59 room_event_cache,
60 })),
61 }))
62 }
63
64 async fn create_latest_event_for_inner(
65 weak_room: &WeakRoom,
66 thread_id: Option<&EventId>,
67 room_event_cache: &RoomEventCache,
68 ) -> LatestEvent {
69 LatestEvent::new(weak_room, thread_id, room_event_cache).await
70 }
71
72 pub async fn read(&self) -> RoomLatestEventsReadGuard {
74 RoomLatestEventsReadGuard { inner: self.state.clone().read_owned().await }
75 }
76
77 pub async fn write(&self) -> RoomLatestEventsWriteGuard {
80 RoomLatestEventsWriteGuard { inner: self.state.clone().write_owned().await }
81 }
82}
83
84#[derive(Debug)]
86struct RoomLatestEventsState {
87 for_the_room: LatestEvent,
89
90 per_thread: HashMap<OwnedEventId, LatestEvent>,
92
93 room_event_cache: RoomEventCache,
95
96 weak_room: WeakRoom,
101}
102
103pub(super) struct RoomLatestEventsReadGuard {
105 inner: OwnedRwLockReadGuard<RoomLatestEventsState>,
106}
107
108impl RoomLatestEventsReadGuard {
109 pub fn for_room(&self) -> &LatestEvent {
111 &self.inner.for_the_room
112 }
113
114 pub fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
116 self.inner.per_thread.get(thread_id)
117 }
118
119 #[cfg(test)]
120 pub fn per_thread(&self) -> &HashMap<OwnedEventId, LatestEvent> {
121 &self.inner.per_thread
122 }
123}
124
125pub(super) struct RoomLatestEventsWriteGuard {
127 inner: OwnedRwLockWriteGuard<RoomLatestEventsState>,
128}
129
130impl RoomLatestEventsWriteGuard {
131 async fn create_latest_event_for(&self, thread_id: Option<&EventId>) -> LatestEvent {
132 RoomLatestEvents::create_latest_event_for_inner(
133 &self.inner.weak_room,
134 thread_id,
135 &self.inner.room_event_cache,
136 )
137 .await
138 }
139
140 pub fn has_thread(&self, thread_id: &EventId) -> bool {
143 self.inner.per_thread.contains_key(thread_id)
144 }
145
146 pub async fn create_and_insert_latest_event_for_thread(&mut self, thread_id: &EventId) {
149 let latest_event = self.create_latest_event_for(Some(thread_id)).await;
150
151 self.inner.per_thread.insert(thread_id.to_owned(), latest_event);
152 }
153
154 pub fn forget_thread(&mut self, thread_id: &EventId) {
156 self.inner.per_thread.remove(thread_id);
157 }
158
159 pub async fn update_with_event_cache(&mut self) {
162 let room = self.inner.weak_room.get();
168 let (own_user_id, power_levels) = match &room {
169 Some(room) => {
170 let power_levels = room.power_levels().await.ok();
171
172 (Some(room.own_user_id()), power_levels)
173 }
174
175 None => (None, None),
176 };
177
178 let inner = &mut *self.inner;
179 let for_the_room = &mut inner.for_the_room;
180 let per_thread = &mut inner.per_thread;
181 let room_event_cache = &inner.room_event_cache;
182
183 for_the_room
184 .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
185 .await;
186
187 for latest_event in per_thread.values_mut() {
188 latest_event
189 .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
190 .await;
191 }
192 }
193
194 pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
197 let room = self.inner.weak_room.get();
203 let (own_user_id, power_levels) = match &room {
204 Some(room) => {
205 let power_levels = room.power_levels().await.ok();
206
207 (Some(room.own_user_id()), power_levels)
208 }
209
210 None => (None, None),
211 };
212
213 let inner = &mut *self.inner;
214 let for_the_room = &mut inner.for_the_room;
215 let per_thread = &mut inner.per_thread;
216 let room_event_cache = &inner.room_event_cache;
217
218 for_the_room
219 .update_with_send_queue(
220 send_queue_update,
221 room_event_cache,
222 own_user_id,
223 power_levels.as_ref(),
224 )
225 .await;
226
227 for latest_event in per_thread.values_mut() {
228 latest_event
229 .update_with_send_queue(
230 send_queue_update,
231 room_event_cache,
232 own_user_id,
233 power_levels.as_ref(),
234 )
235 .await;
236 }
237 }
238}