matrix_sdk_ui/timeline/
pinned_events_loader.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Formatter, sync::Arc};
16
17use futures_util::{StreamExt, stream};
18use matrix_sdk::{BoxFuture, Room, SendOutsideWasm, SyncOutsideWasm, config::RequestConfig};
19use matrix_sdk_base::deserialized_responses::TimelineEvent;
20use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, events::relation::RelationType};
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{debug, warn};
24
25/// Utility to load the pinned events in a room.
26pub struct PinnedEventsLoader {
27    /// Backend to load pinned events.
28    room: Arc<dyn PinnedEventsRoom>,
29
30    /// A list of pinned event ids we've observed previously.
31    ///
32    /// Starts as an empty vector and is updated when the list of pinned events
33    /// is updated.
34    previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
35
36    /// Maximum number of pinned events to load (either from network or the
37    /// cache).
38    max_events_to_load: usize,
39
40    /// Number of requests to load pinned events that can run concurrently. This
41    /// is used to avoid overwhelming a home server with dozens or hundreds
42    /// of concurrent requests.
43    max_concurrent_requests: usize,
44}
45
46impl PinnedEventsLoader {
47    /// Creates a new `PinnedEventsLoader` instance.
48    pub fn new(
49        room: Arc<dyn PinnedEventsRoom>,
50        max_events_to_load: usize,
51        max_concurrent_requests: usize,
52    ) -> Self {
53        Self {
54            room,
55            max_events_to_load,
56            max_concurrent_requests,
57            previous_pinned_event_ids: Mutex::new(Vec::new()),
58        }
59    }
60
61    /// Loads the pinned events in this room, using the cache first and then
62    /// requesting the event from the homeserver if it couldn't be found.
63    /// This method will perform as many concurrent requests for events as
64    /// `max_concurrent_requests` allows, to avoid overwhelming the server.
65    ///
66    /// Returns `None` if the list of pinned events hasn't changed since the
67    /// previous time we loaded them. May return an error if there was an
68    /// issue fetching the full events.
69    pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
70        let pinned_event_ids: Vec<OwnedEventId> = self
71            .room
72            .pinned_event_ids()
73            .unwrap_or_default()
74            .into_iter()
75            .rev()
76            .take(self.max_events_to_load)
77            .rev()
78            .collect();
79
80        // Check if the list of pinned events has changed since the last time.
81        if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
82            return Ok(None);
83        }
84
85        if pinned_event_ids.is_empty() {
86            *self.previous_pinned_event_ids.lock().await = Vec::new();
87            return Ok(Some(Vec::new()));
88        }
89
90        let request_config = Some(RequestConfig::default().retry_limit(3));
91
92        let mut loaded_events: Vec<TimelineEvent> =
93            stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
94                let provider = self.room.clone();
95                let relations_filter =
96                    Some(vec![RelationType::Annotation, RelationType::Replacement]);
97                async move {
98                    match provider
99                        .load_event_with_relations(&event_id, request_config, relations_filter)
100                        .await
101                    {
102                        Ok((event, related_events)) => {
103                            let mut events = vec![event];
104                            events.extend(related_events);
105                            Some(events)
106                        }
107                        Err(err) => {
108                            warn!("error when loading pinned event: {err}");
109                            None
110                        }
111                    }
112                }
113            }))
114            .buffer_unordered(self.max_concurrent_requests)
115            // Get only the `Some<Vec<_>>` results
116            .flat_map(stream::iter)
117            // Flatten the `Vec`s into a single one containing all their items
118            .flat_map(stream::iter)
119            .collect()
120            .await;
121
122        if loaded_events.is_empty() {
123            return Err(PinnedEventsLoaderError::TimelineReloadFailed);
124        }
125
126        // Sort using chronological ordering (oldest -> newest)
127        loaded_events.sort_by_key(|item| {
128            item.raw()
129                .deserialize()
130                .map(|e| e.origin_server_ts())
131                .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
132        });
133
134        // We've successfully loaded *some* pinned events, so we can update the list of
135        // previously seen pinned events.
136        *self.previous_pinned_event_ids.lock().await = pinned_event_ids;
137
138        Ok(Some(loaded_events))
139    }
140}
141
142pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
143    /// Load a single room event using the cache or network and any events
144    /// related to it, if they are cached.
145    ///
146    /// You can control which types of related events are retrieved using
147    /// `related_event_filters`. A `None` value will retrieve any type of
148    /// related event.
149    fn load_event_with_relations<'a>(
150        &'a self,
151        event_id: &'a EventId,
152        request_config: Option<RequestConfig>,
153        related_event_filters: Option<Vec<RelationType>>,
154    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>>;
155
156    /// Get the pinned event ids for a room.
157    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;
158
159    /// Checks whether an event id is pinned in this room.
160    ///
161    /// It avoids having to clone the whole list of event ids to check a single
162    /// value.
163    fn is_pinned_event(&self, event_id: &EventId) -> bool;
164}
165
166impl PinnedEventsRoom for Room {
167    fn load_event_with_relations<'a>(
168        &'a self,
169        event_id: &'a EventId,
170        request_config: Option<RequestConfig>,
171        related_event_filters: Option<Vec<RelationType>>,
172    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>> {
173        Box::pin(async move {
174            if let Ok((cache, _handles)) = self.event_cache().await
175                && let Some(ret) =
176                    cache.find_event_with_relations(event_id, related_event_filters).await
177            {
178                debug!("Loaded pinned event {event_id} and related events from cache");
179                return Ok(ret);
180            }
181
182            debug!("Loading pinned event {event_id} from HS");
183            self.event(event_id, request_config).await.map(|e| (e, Vec::new()))
184        })
185    }
186
187    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
188        self.clone_info().pinned_event_ids()
189    }
190
191    fn is_pinned_event(&self, event_id: &EventId) -> bool {
192        self.clone_info().is_pinned_event(event_id)
193    }
194}
195
196#[cfg(not(tarpaulin_include))]
197impl std::fmt::Debug for PinnedEventsLoader {
198    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199        f.debug_struct("PinnedEventsLoader")
200            .field("max_events_to_load", &self.max_events_to_load)
201            .finish()
202    }
203}
204
205/// Errors related to `PinnedEventsLoader` usage.
206#[derive(Error, Debug)]
207pub enum PinnedEventsLoaderError {
208    #[error("No event found for the given event id.")]
209    EventNotFound(OwnedEventId),
210
211    #[error("Timeline focus is not pinned events.")]
212    TimelineFocusNotPinnedEvents,
213
214    #[error("Could not load pinned events.")]
215    TimelineReloadFailed,
216}