matrix_sdk/latest_events/
room_latest_events.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use ruma::{EventId, OwnedEventId};
18use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
19
20use super::{LatestEvent, LatestEventsError};
21use crate::{
22    event_cache::{EventCache, EventCacheError, RoomEventCache},
23    room::WeakRoom,
24    send_queue::RoomSendQueueUpdate,
25};
26
27/// Type holding the [`LatestEvent`] for a room and for all its threads.
28#[derive(Debug)]
29pub(super) struct RoomLatestEvents {
30    /// The state of this type.
31    state: Arc<RwLock<RoomLatestEventsState>>,
32}
33
34impl RoomLatestEvents {
35    /// Create a new [`RoomLatestEvents`].
36    pub async fn new(
37        weak_room: WeakRoom,
38        event_cache: &EventCache,
39    ) -> Result<Option<Self>, LatestEventsError> {
40        let room_id = weak_room.room_id();
41        let room_event_cache = match event_cache.for_room(room_id).await {
42            // It's fine to drop the `EventCacheDropHandles` here as the caller
43            // (`LatestEventState`) owns a clone of the `EventCache`.
44            Ok((room_event_cache, _drop_handles)) => room_event_cache,
45            Err(EventCacheError::RoomNotFound { .. }) => return Ok(None),
46            Err(err) => return Err(LatestEventsError::EventCache(err)),
47        };
48
49        Ok(Some(Self {
50            state: Arc::new(RwLock::new(RoomLatestEventsState {
51                for_the_room: Self::create_latest_event_for_inner(
52                    &weak_room,
53                    None,
54                    &room_event_cache,
55                )
56                .await,
57                per_thread: HashMap::new(),
58                weak_room,
59                room_event_cache,
60            })),
61        }))
62    }
63
64    async fn create_latest_event_for_inner(
65        weak_room: &WeakRoom,
66        thread_id: Option<&EventId>,
67        room_event_cache: &RoomEventCache,
68    ) -> LatestEvent {
69        LatestEvent::new(weak_room, thread_id, room_event_cache).await
70    }
71
72    /// Lock this type with shared read access, and return an owned lock guard.
73    pub async fn read(&self) -> RoomLatestEventsReadGuard {
74        RoomLatestEventsReadGuard { inner: self.state.clone().read_owned().await }
75    }
76
77    /// Lock this type with exclusive write access, and return an owned lock
78    /// guard.
79    pub async fn write(&self) -> RoomLatestEventsWriteGuard {
80        RoomLatestEventsWriteGuard { inner: self.state.clone().write_owned().await }
81    }
82}
83
84/// The state of [`RoomLatestEvents`].
85#[derive(Debug)]
86struct RoomLatestEventsState {
87    /// The latest event of the room.
88    for_the_room: LatestEvent,
89
90    /// The latest events for each thread.
91    per_thread: HashMap<OwnedEventId, LatestEvent>,
92
93    /// The room event cache associated to this room.
94    room_event_cache: RoomEventCache,
95
96    /// The (weak) room.
97    ///
98    /// It used to to get the power-levels of the user for this room when
99    /// computing the latest events.
100    weak_room: WeakRoom,
101}
102
103/// The owned lock guard returned by [`RoomLatestEvents::read`].
104pub(super) struct RoomLatestEventsReadGuard {
105    inner: OwnedRwLockReadGuard<RoomLatestEventsState>,
106}
107
108impl RoomLatestEventsReadGuard {
109    /// Get the [`LatestEvent`] for the room.
110    pub fn for_room(&self) -> &LatestEvent {
111        &self.inner.for_the_room
112    }
113
114    /// Get the [`LatestEvent`] for the thread if it exists.
115    pub fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
116        self.inner.per_thread.get(thread_id)
117    }
118
119    #[cfg(test)]
120    pub fn per_thread(&self) -> &HashMap<OwnedEventId, LatestEvent> {
121        &self.inner.per_thread
122    }
123}
124
125/// The owned lock guard returned by [`RoomLatestEvents::write`].
126pub(super) struct RoomLatestEventsWriteGuard {
127    inner: OwnedRwLockWriteGuard<RoomLatestEventsState>,
128}
129
130impl RoomLatestEventsWriteGuard {
131    async fn create_latest_event_for(&self, thread_id: Option<&EventId>) -> LatestEvent {
132        RoomLatestEvents::create_latest_event_for_inner(
133            &self.inner.weak_room,
134            thread_id,
135            &self.inner.room_event_cache,
136        )
137        .await
138    }
139
140    /// Check whether this [`RoomLatestEvents`] has a latest event for a
141    /// particular thread.
142    pub fn has_thread(&self, thread_id: &EventId) -> bool {
143        self.inner.per_thread.contains_key(thread_id)
144    }
145
146    /// Create the [`LatestEvent`] for thread `thread_id` and insert it in this
147    /// [`RoomLatestEvents`].
148    pub async fn create_and_insert_latest_event_for_thread(&mut self, thread_id: &EventId) {
149        let latest_event = self.create_latest_event_for(Some(thread_id)).await;
150
151        self.inner.per_thread.insert(thread_id.to_owned(), latest_event);
152    }
153
154    /// Forget the thread `thread_id`.
155    pub fn forget_thread(&mut self, thread_id: &EventId) {
156        self.inner.per_thread.remove(thread_id);
157    }
158
159    /// Update the latest events for the room and its threads, based on the
160    /// event cache data.
161    pub async fn update_with_event_cache(&mut self) {
162        // Get the power levels of the user for the current room if the `WeakRoom` is
163        // still valid.
164        //
165        // Get it once for all the updates of all the latest events for this room (be
166        // the room and its threads).
167        let room = self.inner.weak_room.get();
168        let (own_user_id, power_levels) = match &room {
169            Some(room) => {
170                let power_levels = room.power_levels().await.ok();
171
172                (Some(room.own_user_id()), power_levels)
173            }
174
175            None => (None, None),
176        };
177
178        let inner = &mut *self.inner;
179        let for_the_room = &mut inner.for_the_room;
180        let per_thread = &mut inner.per_thread;
181        let room_event_cache = &inner.room_event_cache;
182
183        for_the_room
184            .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
185            .await;
186
187        for latest_event in per_thread.values_mut() {
188            latest_event
189                .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
190                .await;
191        }
192    }
193
194    /// Update the latest events for the room and its threads, based on the
195    /// send queue update.
196    pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
197        // Get the power levels of the user for the current room if the `WeakRoom` is
198        // still valid.
199        //
200        // Get it once for all the updates of all the latest events for this room (be
201        // the room and its threads).
202        let room = self.inner.weak_room.get();
203        let (own_user_id, power_levels) = match &room {
204            Some(room) => {
205                let power_levels = room.power_levels().await.ok();
206
207                (Some(room.own_user_id()), power_levels)
208            }
209
210            None => (None, None),
211        };
212
213        let inner = &mut *self.inner;
214        let for_the_room = &mut inner.for_the_room;
215        let per_thread = &mut inner.per_thread;
216        let room_event_cache = &inner.room_event_cache;
217
218        for_the_room
219            .update_with_send_queue(
220                send_queue_update,
221                room_event_cache,
222                own_user_id,
223                power_levels.as_ref(),
224            )
225            .await;
226
227        for latest_event in per_thread.values_mut() {
228            latest_event
229                .update_with_send_queue(
230                    send_queue_update,
231                    room_event_cache,
232                    own_user_id,
233                    power_levels.as_ref(),
234                )
235                .await;
236        }
237    }
238}