matrix_sdk_ui/timeline/
pinned_events_loader.rs1use std::{fmt::Formatter, sync::Arc};
16
17use futures_util::{stream, StreamExt};
18use matrix_sdk::{
19 config::RequestConfig, event_cache::paginator::PaginatorError, BoxFuture, Room,
20 SendOutsideWasm, SyncOutsideWasm,
21};
22use matrix_sdk_base::deserialized_responses::TimelineEvent;
23use ruma::{events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
24use thiserror::Error;
25use tokio::sync::Mutex;
26use tracing::{debug, warn};
27
28pub struct PinnedEventsLoader {
30 room: Arc<dyn PinnedEventsRoom>,
32
33 previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
38
39 max_events_to_load: usize,
42
43 max_concurrent_requests: usize,
47}
48
49impl PinnedEventsLoader {
50 pub fn new(
52 room: Arc<dyn PinnedEventsRoom>,
53 max_events_to_load: usize,
54 max_concurrent_requests: usize,
55 ) -> Self {
56 Self {
57 room,
58 max_events_to_load,
59 max_concurrent_requests,
60 previous_pinned_event_ids: Mutex::new(Vec::new()),
61 }
62 }
63
64 pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
73 let pinned_event_ids: Vec<OwnedEventId> = self
74 .room
75 .pinned_event_ids()
76 .unwrap_or_default()
77 .into_iter()
78 .rev()
79 .take(self.max_events_to_load)
80 .rev()
81 .collect();
82
83 if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
85 return Ok(None);
86 }
87
88 if pinned_event_ids.is_empty() {
89 *self.previous_pinned_event_ids.lock().await = Vec::new();
90 return Ok(Some(Vec::new()));
91 }
92
93 let request_config = Some(RequestConfig::default().retry_limit(3));
94
95 let mut loaded_events: Vec<TimelineEvent> =
96 stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
97 let provider = self.room.clone();
98 let relations_filter =
99 Some(vec![RelationType::Annotation, RelationType::Replacement]);
100 async move {
101 match provider
102 .load_event_with_relations(&event_id, request_config, relations_filter)
103 .await
104 {
105 Ok((event, related_events)) => {
106 let mut events = vec![event];
107 events.extend(related_events);
108 Some(events)
109 }
110 Err(err) => {
111 warn!("error when loading pinned event: {err}");
112 None
113 }
114 }
115 }
116 }))
117 .buffer_unordered(self.max_concurrent_requests)
118 .flat_map(stream::iter)
120 .flat_map(stream::iter)
122 .collect()
123 .await;
124
125 if loaded_events.is_empty() {
126 return Err(PinnedEventsLoaderError::TimelineReloadFailed);
127 }
128
129 loaded_events.sort_by_key(|item| {
131 item.raw()
132 .deserialize()
133 .map(|e| e.origin_server_ts())
134 .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
135 });
136
137 tracing::warn!("loaded pinned events: {:#?}", loaded_events);
138
139 *self.previous_pinned_event_ids.lock().await = pinned_event_ids;
142
143 Ok(Some(loaded_events))
144 }
145}
146
147pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
148 fn load_event_with_relations<'a>(
155 &'a self,
156 event_id: &'a EventId,
157 request_config: Option<RequestConfig>,
158 related_event_filters: Option<Vec<RelationType>>,
159 ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), PaginatorError>>;
160
161 fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;
163
164 fn is_pinned_event(&self, event_id: &EventId) -> bool;
169}
170
171impl PinnedEventsRoom for Room {
172 fn load_event_with_relations<'a>(
173 &'a self,
174 event_id: &'a EventId,
175 request_config: Option<RequestConfig>,
176 related_event_filters: Option<Vec<RelationType>>,
177 ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), PaginatorError>> {
178 Box::pin(async move {
179 if let Ok((cache, _handles)) = self.event_cache().await {
180 if let Some(ret) = cache.event_with_relations(event_id, related_event_filters).await
181 {
182 debug!("Loaded pinned event {event_id} and related events from cache");
183 return Ok(ret);
184 }
185 }
186
187 debug!("Loading pinned event {event_id} from HS");
188 self.event(event_id, request_config)
189 .await
190 .map(|e| (e, Vec::new()))
191 .map_err(|err| PaginatorError::SdkError(Box::new(err)))
192 })
193 }
194
195 fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
196 self.clone_info().pinned_event_ids()
197 }
198
199 fn is_pinned_event(&self, event_id: &EventId) -> bool {
200 self.clone_info().is_pinned_event(event_id)
201 }
202}
203
204#[cfg(not(tarpaulin_include))]
205impl std::fmt::Debug for PinnedEventsLoader {
206 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
207 f.debug_struct("PinnedEventsLoader")
208 .field("max_events_to_load", &self.max_events_to_load)
209 .finish()
210 }
211}
212
213#[derive(Error, Debug)]
215pub enum PinnedEventsLoaderError {
216 #[error("No event found for the given event id.")]
217 EventNotFound(OwnedEventId),
218
219 #[error("Timeline focus is not pinned events.")]
220 TimelineFocusNotPinnedEvents,
221
222 #[error("Could not load pinned events.")]
223 TimelineReloadFailed,
224}