matrix_sdk_ui/timeline/
pinned_events_loader.rs1use std::{fmt::Formatter, sync::Arc};
16
17use futures_util::{StreamExt, stream};
18use matrix_sdk::{BoxFuture, Room, SendOutsideWasm, SyncOutsideWasm, config::RequestConfig};
19use matrix_sdk_base::deserialized_responses::TimelineEvent;
20use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, events::relation::RelationType};
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{debug, warn};
24
25pub struct PinnedEventsLoader {
27 room: Arc<dyn PinnedEventsRoom>,
29
30 previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
35
36 max_events_to_load: usize,
39
40 max_concurrent_requests: usize,
44}
45
46impl PinnedEventsLoader {
47 pub fn new(
49 room: Arc<dyn PinnedEventsRoom>,
50 max_events_to_load: usize,
51 max_concurrent_requests: usize,
52 ) -> Self {
53 Self {
54 room,
55 max_events_to_load,
56 max_concurrent_requests,
57 previous_pinned_event_ids: Mutex::new(Vec::new()),
58 }
59 }
60
61 pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
70 let pinned_event_ids: Vec<OwnedEventId> = self
71 .room
72 .pinned_event_ids()
73 .unwrap_or_default()
74 .into_iter()
75 .rev()
76 .take(self.max_events_to_load)
77 .rev()
78 .collect();
79
80 if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
82 return Ok(None);
83 }
84
85 if pinned_event_ids.is_empty() {
86 *self.previous_pinned_event_ids.lock().await = Vec::new();
87 return Ok(Some(Vec::new()));
88 }
89
90 let request_config = Some(RequestConfig::default().retry_limit(3));
91
92 let mut loaded_events: Vec<TimelineEvent> =
93 stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
94 let provider = self.room.clone();
95 let relations_filter =
96 Some(vec![RelationType::Annotation, RelationType::Replacement]);
97 async move {
98 match provider
99 .load_event_with_relations(&event_id, request_config, relations_filter)
100 .await
101 {
102 Ok((event, related_events)) => {
103 let mut events = vec![event];
104 events.extend(related_events);
105 Some(events)
106 }
107 Err(err) => {
108 warn!("error when loading pinned event: {err}");
109 None
110 }
111 }
112 }
113 }))
114 .buffer_unordered(self.max_concurrent_requests)
115 .flat_map(stream::iter)
117 .flat_map(stream::iter)
119 .collect()
120 .await;
121
122 if loaded_events.is_empty() {
123 return Err(PinnedEventsLoaderError::TimelineReloadFailed);
124 }
125
126 loaded_events.sort_by_key(|item| {
128 item.raw()
129 .deserialize()
130 .map(|e| e.origin_server_ts())
131 .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
132 });
133
134 *self.previous_pinned_event_ids.lock().await = pinned_event_ids;
137
138 Ok(Some(loaded_events))
139 }
140}
141
142pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
143 fn load_event_with_relations<'a>(
150 &'a self,
151 event_id: &'a EventId,
152 request_config: Option<RequestConfig>,
153 related_event_filters: Option<Vec<RelationType>>,
154 ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>>;
155
156 fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;
158
159 fn is_pinned_event(&self, event_id: &EventId) -> bool;
164}
165
166impl PinnedEventsRoom for Room {
167 fn load_event_with_relations<'a>(
168 &'a self,
169 event_id: &'a EventId,
170 request_config: Option<RequestConfig>,
171 related_event_filters: Option<Vec<RelationType>>,
172 ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>> {
173 Box::pin(async move {
174 if let Ok((cache, _handles)) = self.event_cache().await
175 && let Some(ret) =
176 cache.find_event_with_relations(event_id, related_event_filters).await
177 {
178 debug!("Loaded pinned event {event_id} and related events from cache");
179 return Ok(ret);
180 }
181
182 debug!("Loading pinned event {event_id} from HS");
183 self.event(event_id, request_config).await.map(|e| (e, Vec::new()))
184 })
185 }
186
187 fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
188 self.clone_info().pinned_event_ids()
189 }
190
191 fn is_pinned_event(&self, event_id: &EventId) -> bool {
192 self.clone_info().is_pinned_event(event_id)
193 }
194}
195
196#[cfg(not(tarpaulin_include))]
197impl std::fmt::Debug for PinnedEventsLoader {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("PinnedEventsLoader")
200 .field("max_events_to_load", &self.max_events_to_load)
201 .finish()
202 }
203}
204
205#[derive(Error, Debug)]
207pub enum PinnedEventsLoaderError {
208 #[error("No event found for the given event id.")]
209 EventNotFound(OwnedEventId),
210
211 #[error("Timeline focus is not pinned events.")]
212 TimelineFocusNotPinnedEvents,
213
214 #[error("Could not load pinned events.")]
215 TimelineReloadFailed,
216}