Skip to main content

matrix_sdk/latest_events/
room_latest_events.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use async_once_cell::OnceCell;
18use matrix_sdk_base::RoomInfoNotableUpdateReasons;
19use ruma::{EventId, OwnedEventId};
20use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
21use tracing::error;
22
23use super::{
24    LatestEvent,
25    latest_event::{IsLatestEventValueNone, With},
26};
27use crate::{
28    event_cache::{EventCache, EventCacheError, RoomEventCache},
29    room::WeakRoom,
30    send_queue::RoomSendQueueUpdate,
31};
32
33/// Type holding the [`LatestEvent`] for a room and for all its threads.
34#[derive(Debug)]
35pub(super) struct RoomLatestEvents {
36    /// The state of this type.
37    state: Arc<RwLock<RoomLatestEventsState>>,
38}
39
40impl RoomLatestEvents {
41    /// Create a new [`RoomLatestEvents`].
42    pub fn new(
43        weak_room: WeakRoom,
44        event_cache: &EventCache,
45    ) -> With<Self, IsLatestEventValueNone> {
46        let latest_event_with = Self::create_latest_event(&weak_room, None);
47
48        With::map(latest_event_with, |for_the_room| Self {
49            state: Arc::new(RwLock::new(RoomLatestEventsState {
50                for_the_room,
51                per_thread: HashMap::new(),
52                weak_room,
53                event_cache: event_cache.clone(),
54                room_event_cache: OnceCell::new(),
55            })),
56        })
57    }
58
59    fn create_latest_event(
60        weak_room: &WeakRoom,
61        thread_id: Option<&EventId>,
62    ) -> With<LatestEvent, IsLatestEventValueNone> {
63        LatestEvent::new(weak_room, thread_id)
64    }
65
66    /// Lock this type with shared read access, and return an owned lock guard.
67    pub async fn read(&self) -> RoomLatestEventsReadGuard {
68        RoomLatestEventsReadGuard { inner: self.state.clone().read_owned().await }
69    }
70
71    /// Lock this type with exclusive write access, and return an owned lock
72    /// guard.
73    pub async fn write(&self) -> RoomLatestEventsWriteGuard {
74        RoomLatestEventsWriteGuard { inner: self.state.clone().write_owned().await }
75    }
76}
77
78/// The state of [`RoomLatestEvents`].
79#[derive(Debug)]
80struct RoomLatestEventsState {
81    /// The latest event of the room.
82    for_the_room: LatestEvent,
83
84    /// The latest events for each thread.
85    per_thread: HashMap<OwnedEventId, LatestEvent>,
86
87    /// The event cache.
88    event_cache: EventCache,
89
90    /// The room event cache (lazily-loaded).
91    room_event_cache: OnceCell<RoomEventCache>,
92
93    /// The (weak) room.
94    ///
95    /// It used to to get the power-levels of the user for this room when
96    /// computing the latest events.
97    weak_room: WeakRoom,
98}
99
100/// The owned lock guard returned by [`RoomLatestEvents::read`].
101pub(super) struct RoomLatestEventsReadGuard {
102    inner: OwnedRwLockReadGuard<RoomLatestEventsState>,
103}
104
105impl RoomLatestEventsReadGuard {
106    /// Get the [`LatestEvent`] for the room.
107    pub fn for_room(&self) -> &LatestEvent {
108        &self.inner.for_the_room
109    }
110
111    /// Get the [`LatestEvent`] for the thread if it exists.
112    pub fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
113        self.inner.per_thread.get(thread_id)
114    }
115
116    #[cfg(test)]
117    pub fn per_thread(&self) -> &HashMap<OwnedEventId, LatestEvent> {
118        &self.inner.per_thread
119    }
120}
121
122/// The owned lock guard returned by [`RoomLatestEvents::write`].
123pub(super) struct RoomLatestEventsWriteGuard {
124    inner: OwnedRwLockWriteGuard<RoomLatestEventsState>,
125}
126
127impl RoomLatestEventsWriteGuard {
128    /// Check whether this [`RoomLatestEvents`] has a latest event for a
129    /// particular thread.
130    pub fn has_thread(&self, thread_id: &EventId) -> bool {
131        self.inner.per_thread.contains_key(thread_id)
132    }
133
134    /// Create the [`LatestEvent`] for thread `thread_id` and insert it in this
135    /// [`RoomLatestEvents`].
136    pub fn create_and_insert_latest_event_for_thread(&mut self, thread_id: &EventId) {
137        let latest_event_with =
138            RoomLatestEvents::create_latest_event(&self.inner.weak_room, Some(thread_id));
139
140        self.inner.per_thread.insert(thread_id.to_owned(), With::inner(latest_event_with));
141    }
142
143    /// Forget the thread `thread_id`.
144    pub fn forget_thread(&mut self, thread_id: &EventId) {
145        self.inner.per_thread.remove(thread_id);
146    }
147
148    /// Update the latest events for the room and its threads, based on the
149    /// event cache data.
150    pub async fn update_with_event_cache(&mut self) {
151        // Get the power levels of the user for the current room if the `WeakRoom` is
152        // still valid.
153        //
154        // Get it once for all the updates of all the latest events for this room (be
155        // the room and its threads).
156        let Some(room) = self.inner.weak_room.get() else {
157            // No room? Let's stop the update.
158            error!(room = ?self.inner.weak_room, "Room is unknown");
159
160            return;
161        };
162        let own_user_id = room.own_user_id();
163        let power_levels = room.power_levels().await.ok();
164
165        let inner = &mut *self.inner;
166        let for_the_room = &mut inner.for_the_room;
167        let per_thread = &mut inner.per_thread;
168
169        // Lazy-load the `RoomEventCache`.
170        let room_event_cache = match inner
171            .room_event_cache
172            .get_or_try_init(async {
173                // It's fine to drop the `EventCacheDropHandles` here as the caller
174                // (`LatestEventState`) owns a clone of the `EventCache`.
175                let (room_event_cache, _drop_handles) =
176                    inner.event_cache.for_room(room.room_id()).await?;
177
178                Ok::<RoomEventCache, EventCacheError>(room_event_cache)
179            })
180            .await
181        {
182            Ok(room_event_cache) => room_event_cache,
183            Err(err) => {
184                error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`");
185                return;
186            }
187        };
188
189        for_the_room
190            .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
191            .await;
192
193        for latest_event in per_thread.values_mut() {
194            latest_event
195                .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
196                .await;
197        }
198    }
199
200    /// Update the latest events for the room and its threads, based on the
201    /// send queue update.
202    pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
203        // Get the power levels of the user for the current room if the `WeakRoom` is
204        // still valid.
205        //
206        // Get it once for all the updates of all the latest events for this room (be
207        // the room and its threads).
208        let Some(room) = self.inner.weak_room.get() else {
209            // No room? Let's stop the update.
210            return;
211        };
212        let own_user_id = room.own_user_id();
213        let power_levels = room.power_levels().await.ok();
214
215        let inner = &mut *self.inner;
216        let for_the_room = &mut inner.for_the_room;
217        let per_thread = &mut inner.per_thread;
218
219        // Lazy-load the `RoomEventCache`.
220        let room_event_cache = match inner
221            .room_event_cache
222            .get_or_try_init(async {
223                // It's fine to drop the `EventCacheDropHandles` here as the caller
224                // (`LatestEventState`) owns a clone of the `EventCache`.
225                let (room_event_cache, _drop_handles) =
226                    inner.event_cache.for_room(room.room_id()).await?;
227
228                Ok::<RoomEventCache, EventCacheError>(room_event_cache)
229            })
230            .await
231        {
232            Ok(room_event_cache) => room_event_cache,
233            Err(err) => {
234                error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`");
235                return;
236            }
237        };
238
239        for_the_room
240            .update_with_send_queue(
241                send_queue_update,
242                room_event_cache,
243                own_user_id,
244                power_levels.as_ref(),
245            )
246            .await;
247
248        for latest_event in per_thread.values_mut() {
249            latest_event
250                .update_with_send_queue(
251                    send_queue_update,
252                    room_event_cache,
253                    own_user_id,
254                    power_levels.as_ref(),
255                )
256                .await;
257        }
258    }
259
260    /// Update the latest events for the room and its threads, based on the room
261    /// info.
262    pub async fn update_with_room_info(&mut self, reasons: RoomInfoNotableUpdateReasons) {
263        // Get the state of the current room if the `WeakRoom` is still valid.
264        let Some(room) = self.inner.weak_room.get() else {
265            // No room? Let's stop the update.
266            return;
267        };
268
269        self.inner.for_the_room.update_with_room_info(room, reasons).await;
270    }
271}