matrix_sdk_ui/timeline/
pinned_events_loader.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::{fmt::Formatter, sync::Arc};
16
17use futures_util::{stream, StreamExt};
18use matrix_sdk::{
19    config::RequestConfig, event_cache::paginator::PaginatorError, BoxFuture, Room,
20    SendOutsideWasm, SyncOutsideWasm,
21};
22use matrix_sdk_base::deserialized_responses::TimelineEvent;
23use ruma::{events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
24use thiserror::Error;
25use tokio::sync::Mutex;
26use tracing::{debug, warn};
27
28/// Utility to load the pinned events in a room.
29pub struct PinnedEventsLoader {
30    /// Backend to load pinned events.
31    room: Arc<dyn PinnedEventsRoom>,
32
33    /// A list of pinned event ids we've observed previously.
34    ///
35    /// Starts as an empty vector and is updated when the list of pinned events
36    /// is updated.
37    previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
38
39    /// Maximum number of pinned events to load (either from network or the
40    /// cache).
41    max_events_to_load: usize,
42
43    /// Number of requests to load pinned events that can run concurrently. This
44    /// is used to avoid overwhelming a home server with dozens or hundreds
45    /// of concurrent requests.
46    max_concurrent_requests: usize,
47}
48
49impl PinnedEventsLoader {
50    /// Creates a new `PinnedEventsLoader` instance.
51    pub fn new(
52        room: Arc<dyn PinnedEventsRoom>,
53        max_events_to_load: usize,
54        max_concurrent_requests: usize,
55    ) -> Self {
56        Self {
57            room,
58            max_events_to_load,
59            max_concurrent_requests,
60            previous_pinned_event_ids: Mutex::new(Vec::new()),
61        }
62    }
63
64    /// Loads the pinned events in this room, using the cache first and then
65    /// requesting the event from the homeserver if it couldn't be found.
66    /// This method will perform as many concurrent requests for events as
67    /// `max_concurrent_requests` allows, to avoid overwhelming the server.
68    ///
69    /// Returns `None` if the list of pinned events hasn't changed since the
70    /// previous time we loaded them. May return an error if there was an
71    /// issue fetching the full events.
72    pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
73        let pinned_event_ids: Vec<OwnedEventId> = self
74            .room
75            .pinned_event_ids()
76            .unwrap_or_default()
77            .into_iter()
78            .rev()
79            .take(self.max_events_to_load)
80            .rev()
81            .collect();
82
83        // Check if the list of pinned events has changed since the last time.
84        if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
85            return Ok(None);
86        }
87
88        if pinned_event_ids.is_empty() {
89            *self.previous_pinned_event_ids.lock().await = Vec::new();
90            return Ok(Some(Vec::new()));
91        }
92
93        let request_config = Some(RequestConfig::default().retry_limit(3));
94
95        let mut loaded_events: Vec<TimelineEvent> =
96            stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
97                let provider = self.room.clone();
98                let relations_filter =
99                    Some(vec![RelationType::Annotation, RelationType::Replacement]);
100                async move {
101                    match provider
102                        .load_event_with_relations(&event_id, request_config, relations_filter)
103                        .await
104                    {
105                        Ok((event, related_events)) => {
106                            let mut events = vec![event];
107                            events.extend(related_events);
108                            Some(events)
109                        }
110                        Err(err) => {
111                            warn!("error when loading pinned event: {err}");
112                            None
113                        }
114                    }
115                }
116            }))
117            .buffer_unordered(self.max_concurrent_requests)
118            // Get only the `Some<Vec<_>>` results
119            .flat_map(stream::iter)
120            // Flatten the `Vec`s into a single one containing all their items
121            .flat_map(stream::iter)
122            .collect()
123            .await;
124
125        if loaded_events.is_empty() {
126            return Err(PinnedEventsLoaderError::TimelineReloadFailed);
127        }
128
129        // Sort using chronological ordering (oldest -> newest)
130        loaded_events.sort_by_key(|item| {
131            item.raw()
132                .deserialize()
133                .map(|e| e.origin_server_ts())
134                .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
135        });
136
137        tracing::warn!("loaded pinned events: {:#?}", loaded_events);
138
139        // We've successfully loaded *some* pinned events, so we can update the list of
140        // previously seen pinned events.
141        *self.previous_pinned_event_ids.lock().await = pinned_event_ids;
142
143        Ok(Some(loaded_events))
144    }
145}
146
147pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
148    /// Load a single room event using the cache or network and any events
149    /// related to it, if they are cached.
150    ///
151    /// You can control which types of related events are retrieved using
152    /// `related_event_filters`. A `None` value will retrieve any type of
153    /// related event.
154    fn load_event_with_relations<'a>(
155        &'a self,
156        event_id: &'a EventId,
157        request_config: Option<RequestConfig>,
158        related_event_filters: Option<Vec<RelationType>>,
159    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), PaginatorError>>;
160
161    /// Get the pinned event ids for a room.
162    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;
163
164    /// Checks whether an event id is pinned in this room.
165    ///
166    /// It avoids having to clone the whole list of event ids to check a single
167    /// value.
168    fn is_pinned_event(&self, event_id: &EventId) -> bool;
169}
170
171impl PinnedEventsRoom for Room {
172    fn load_event_with_relations<'a>(
173        &'a self,
174        event_id: &'a EventId,
175        request_config: Option<RequestConfig>,
176        related_event_filters: Option<Vec<RelationType>>,
177    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), PaginatorError>> {
178        Box::pin(async move {
179            if let Ok((cache, _handles)) = self.event_cache().await {
180                if let Some(ret) = cache.event_with_relations(event_id, related_event_filters).await
181                {
182                    debug!("Loaded pinned event {event_id} and related events from cache");
183                    return Ok(ret);
184                }
185            }
186
187            debug!("Loading pinned event {event_id} from HS");
188            self.event(event_id, request_config)
189                .await
190                .map(|e| (e, Vec::new()))
191                .map_err(|err| PaginatorError::SdkError(Box::new(err)))
192        })
193    }
194
195    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
196        self.clone_info().pinned_event_ids()
197    }
198
199    fn is_pinned_event(&self, event_id: &EventId) -> bool {
200        self.clone_info().is_pinned_event(event_id)
201    }
202}
203
204#[cfg(not(tarpaulin_include))]
205impl std::fmt::Debug for PinnedEventsLoader {
206    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
207        f.debug_struct("PinnedEventsLoader")
208            .field("max_events_to_load", &self.max_events_to_load)
209            .finish()
210    }
211}
212
213/// Errors related to `PinnedEventsLoader` usage.
214#[derive(Error, Debug)]
215pub enum PinnedEventsLoaderError {
216    #[error("No event found for the given event id.")]
217    EventNotFound(OwnedEventId),
218
219    #[error("Timeline focus is not pinned events.")]
220    TimelineFocusNotPinnedEvents,
221
222    #[error("Could not load pinned events.")]
223    TimelineReloadFailed,
224}