matrix_sdk/event_cache/caches/pinned_events/
mod.rs1use std::{collections::BTreeSet, sync::Arc};
16
17use futures_util::{StreamExt as _, stream};
18use matrix_sdk_base::{
19 event_cache::{Event, store::EventCacheStoreLock},
20 linked_chunk::{LinkedChunkId, OwnedLinkedChunkId},
21 serde_helpers::extract_relation,
22 task_monitor::BackgroundTaskHandle,
23};
24use ruma::{
25 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
26 events::{
27 AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, relation::RelationType,
28 },
29 room_version_rules::RedactionRules,
30 serde::Raw,
31};
32use tokio::sync::broadcast::{Receiver, Sender};
33use tracing::{debug, instrument, trace, warn};
34
35#[cfg(feature = "e2e-encryption")]
36use super::super::redecryptor::ResolvedUtd;
37use super::{
38 super::{EventCacheError, EventsOrigin, Result, persistence::send_updates_to_store},
39 event_linked_chunk::EventLinkedChunk,
40 lock,
41 room::RoomEventCacheLinkedChunkUpdate,
42};
43use crate::{
44 Room, client::WeakClient, config::RequestConfig, event_cache::TimelineVectorDiffs,
45 room::WeakRoom,
46};
47
48pub(in super::super) struct PinnedEventCacheState {
49 room_id: OwnedRoomId,
51
52 sender: Sender<TimelineVectorDiffs>,
54
55 chunk: EventLinkedChunk,
62
63 store: EventCacheStoreLock,
66
67 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
72}
73
74impl lock::Store for PinnedEventCacheState {
75 fn store(&self) -> &EventCacheStoreLock {
76 &self.store
77 }
78}
79
80#[cfg(not(tarpaulin_include))]
81impl std::fmt::Debug for PinnedEventCacheState {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("PinnedEventCacheState")
84 .field("room_id", &self.room_id)
85 .field("chunk", &self.chunk)
86 .finish_non_exhaustive()
87 }
88}
89
90pub type PinnedEventCacheStateLock = lock::StateLock<PinnedEventCacheState>;
95
96pub type PinnedEventCacheStateLockWriteGuard<'a> =
97 lock::StateLockWriteGuard<'a, PinnedEventCacheState>;
98
99impl<'a> lock::Reload for PinnedEventCacheStateLockWriteGuard<'a> {
100 async fn reload(&mut self) -> Result<()> {
101 self.reload_from_storage().await?;
102
103 Ok(())
104 }
105}
106
107impl<'a> PinnedEventCacheStateLockWriteGuard<'a> {
108 async fn reload_from_storage(&mut self) -> Result<()> {
111 let room_id = self.state.room_id.clone();
112 let linked_chunk_id = LinkedChunkId::PinnedEvents(&room_id);
113
114 let (last_chunk, chunk_id_gen) = self.store.load_last_chunk(linked_chunk_id).await?;
115
116 let Some(last_chunk) = last_chunk else {
117 if self.state.chunk.events().next().is_some() {
120 self.state.chunk.reset();
121 self.notify_subscribers(EventsOrigin::Sync);
122 }
123
124 return Ok(());
125 };
126
127 {
128 let mut current_chunk_identifier = last_chunk.identifier;
129 self.state.chunk.replace_with(Some(last_chunk), chunk_id_gen)?;
130
131 while let Some(previous_chunk) =
133 self.store.load_previous_chunk(linked_chunk_id, current_chunk_identifier).await?
134 {
135 current_chunk_identifier = previous_chunk.identifier;
136 self.state.chunk.insert_new_chunk_as_first(previous_chunk)?;
137 }
138 }
139
140 self.state.chunk.store_updates().take();
142
143 self.notify_subscribers(EventsOrigin::Cache);
145
146 Ok(())
147 }
148
149 async fn replace_all_events(&mut self, new_events: Vec<Event>) -> Result<()> {
150 trace!("resetting all pinned events in linked chunk");
151
152 let previous_pinned_event_ids = self.state.current_event_ids();
153
154 if new_events.iter().filter_map(|e| e.event_id()).collect::<BTreeSet<_>>()
155 == previous_pinned_event_ids.iter().cloned().collect()
156 {
157 return Ok(());
159 }
160
161 if self.state.chunk.events().next().is_some() {
162 self.state.chunk.reset();
163 }
164
165 self.state.chunk.push_live_events(None, &new_events);
166 self.propagate_changes().await?;
167 self.notify_subscribers(EventsOrigin::Sync);
168
169 Ok(())
170 }
171
172 async fn propagate_changes(&mut self) -> Result<()> {
175 let updates = self.state.chunk.store_updates().take();
176 let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.state.room_id.clone());
177 send_updates_to_store(
178 &self.store,
179 linked_chunk_id,
180 &self.state.linked_chunk_update_sender,
181 updates,
182 )
183 .await
184 }
185
186 fn notify_subscribers(&mut self, origin: EventsOrigin) {
188 let diffs = self.state.chunk.updates_as_vector_diffs();
189 if !diffs.is_empty() {
190 let _ = self.state.sender.send(TimelineVectorDiffs { diffs, origin });
191 }
192 }
193}
194
195impl PinnedEventCacheState {
196 fn current_event_ids(&self) -> Vec<OwnedEventId> {
198 self.chunk.events().filter_map(|(_position, event)| event.event_id()).collect()
199 }
200}
201
202#[derive(Clone)]
206pub struct PinnedEventCache {
207 state: Arc<PinnedEventCacheStateLock>,
208
209 _task: Arc<BackgroundTaskHandle>,
212}
213
214impl PinnedEventCache {
215 pub(in super::super) fn new(
217 room: Room,
218 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
219 store: EventCacheStoreLock,
220 ) -> Self {
221 let sender = Sender::new(32);
222
223 let room_id = room.room_id().to_owned();
224
225 let chunk = EventLinkedChunk::new();
226
227 let state =
228 PinnedEventCacheState { room_id, chunk, sender, linked_chunk_update_sender, store };
229 let state = Arc::new(PinnedEventCacheStateLock::new_inner(state));
230
231 let task = Arc::new(
232 room.client()
233 .task_monitor()
234 .spawn_infinite_task(
235 "pinned_event_listener_task",
236 Self::pinned_event_listener_task(room, state.clone()),
237 )
238 .abort_on_drop(),
239 );
240
241 Self { state, _task: task }
242 }
243
244 pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
246 let guard = self.state.read().await?;
247 let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect();
248
249 let recv = guard.state.sender.subscribe();
250
251 Ok((events, recv))
252 }
253
254 #[cfg(feature = "e2e-encryption")]
258 pub(in crate::event_cache) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> {
259 let mut guard = self.state.write().await?;
260
261 if guard.state.chunk.replace_utds(events) {
262 guard.propagate_changes().await?;
263 guard.notify_subscribers(EventsOrigin::Cache);
264 }
265
266 Ok(())
267 }
268
269 fn extract_relation_target(raw: &Raw<AnySyncTimelineEvent>) -> Option<OwnedEventId> {
272 let (rel_type, event_id) = extract_relation(raw)?;
273
274 match rel_type {
276 RelationType::Thread => None,
277 _ => Some(event_id),
278 }
279 }
280
281 fn extract_redaction_target(
284 raw: &Raw<AnySyncTimelineEvent>,
285 room_redaction_rules: &RedactionRules,
286 ) -> Option<OwnedEventId> {
287 if raw.get_field::<MessageLikeEventType>("type").ok()??
290 != MessageLikeEventType::RoomRedaction
291 {
292 return None;
293 }
294
295 let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(redaction)) =
296 raw.deserialize().ok()?
297 else {
298 return None;
299 };
300
301 redaction.redacts(room_redaction_rules).map(ToOwned::to_owned).or_else(|| {
302 warn!("missing target event id from the redaction event");
303 None
304 })
305 }
306
307 pub(in super::super) async fn maybe_add_live_related_events(
310 &mut self,
311 events: &[Event],
312 room_redaction_rules: &RedactionRules,
313 ) -> Result<()> {
314 trace!("checking live events for relations to pinned events");
315 let mut guard = self.state.write().await?;
316
317 let pinned_event_ids: BTreeSet<OwnedEventId> =
318 guard.state.current_event_ids().into_iter().collect();
319
320 if pinned_event_ids.is_empty() {
321 return Ok(());
322 }
323
324 let mut new_relations = Vec::new();
325
326 for ev in events {
329 if let Some(relation_target) = Self::extract_relation_target(ev.raw())
331 && pinned_event_ids.contains(&relation_target)
332 {
333 new_relations.push(ev.clone());
334 continue;
335 }
336
337 if let Some(redaction_target) =
339 Self::extract_redaction_target(ev.raw(), room_redaction_rules)
340 && pinned_event_ids.contains(&redaction_target)
341 {
342 new_relations.push(ev.clone());
343 continue;
344 }
345 }
346
347 if !new_relations.is_empty() {
348 trace!("found {} new related events to pinned events", new_relations.len());
349
350 guard.state.chunk.push_live_events(None, &new_relations);
352
353 guard.propagate_changes().await?;
354 guard.notify_subscribers(EventsOrigin::Sync);
355 }
356
357 Ok(())
358 }
359
360 #[instrument(fields(%room_id = room.room_id()), skip(room, state))]
361 async fn pinned_event_listener_task(room: Room, state: Arc<PinnedEventCacheStateLock>) {
362 debug!("pinned events listener task started");
363
364 let reload_from_network = async |room: Room| {
365 let events = match Self::reload_pinned_events(room).await {
366 Ok(Some(events)) => events,
367 Ok(None) => Vec::new(),
368 Err(err) => {
369 warn!("error when loading pinned events: {err}");
370 return;
371 }
372 };
373
374 match state.write().await {
377 Ok(mut guard) => {
378 guard.replace_all_events(events).await.unwrap_or_else(|err| {
379 warn!("error when replacing pinned events: {err}");
380 });
381 }
382
383 Err(err) => {
384 warn!("error when acquiring write lock to replace pinned events: {err}");
385 }
386 }
387 };
388
389 match state.write().await {
391 Ok(mut guard) => {
392 guard.reload_from_storage().await.unwrap_or_else(|err| {
394 warn!("error when reloading pinned events from storage, at start: {err}");
395 });
396
397 let actual_pinned_events = room.pinned_event_ids().unwrap_or_default();
399 let reloaded_set =
400 guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
401
402 if actual_pinned_events.len() != reloaded_set.len()
403 || actual_pinned_events.iter().any(|event_id| !reloaded_set.contains(event_id))
404 {
405 drop(guard);
407 reload_from_network(room.clone()).await;
408 }
409 }
410
411 Err(err) => {
412 warn!("error when acquiring write lock to initialize pinned events: {err}");
413 }
414 }
415
416 let weak_room =
417 WeakRoom::new(WeakClient::from_client(&room.client()), room.room_id().to_owned());
418
419 let mut stream = room.pinned_event_ids_stream();
420
421 drop(room);
422
423 while let Some(new_list) = stream.next().await {
425 trace!("handling update");
426
427 let guard = match state.read().await {
428 Ok(guard) => guard,
429 Err(err) => {
430 warn!("error when acquiring read lock to handle pinned events update: {err}");
431 break;
432 }
433 };
434
435 let current_set = guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
437
438 if !new_list.is_empty()
439 && new_list.iter().all(|event_id| current_set.contains(event_id))
440 {
441 continue;
443 }
444
445 let Some(room) = weak_room.get() else {
446 debug!("room has been dropped, ending pinned events listener task");
447 break;
448 };
449
450 drop(guard);
451
452 reload_from_network(room).await;
454 }
455
456 debug!("pinned events listener task ended");
457 }
458
459 async fn reload_pinned_events(room: Room) -> Result<Option<Vec<Event>>> {
468 let (max_events_to_load, max_concurrent_requests) = {
469 let client = room.client();
470 let config = client.event_cache().config();
471 (config.max_pinned_events_to_load, config.max_pinned_events_concurrent_requests)
472 };
473
474 let pinned_event_ids: Vec<OwnedEventId> = room
475 .pinned_event_ids()
476 .unwrap_or_default()
477 .into_iter()
478 .rev()
479 .take(max_events_to_load)
480 .rev()
481 .collect();
482
483 if pinned_event_ids.is_empty() {
484 return Ok(Some(Vec::new()));
485 }
486
487 let mut num_successful_loads = 0;
488
489 let mut loaded_events: Vec<Event> =
490 stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
491 let room = room.clone();
492 let filter = vec![RelationType::Annotation, RelationType::Replacement];
493 let request_config = RequestConfig::default().retry_limit(3);
494
495 async move {
496 let (target, mut relations) = room
497 .load_or_fetch_event_with_relations(
498 &event_id,
499 Some(filter),
500 Some(request_config),
501 )
502 .await?;
503
504 relations.insert(0, target);
505 Ok::<_, crate::Error>(relations)
506 }
507 }))
508 .buffer_unordered(max_concurrent_requests)
509 .inspect(|result| {
511 if result.is_ok() {
512 num_successful_loads += 1;
513 }
514 })
515 .flat_map(stream::iter)
517 .flat_map(stream::iter)
519 .collect()
520 .await;
521
522 if num_successful_loads != pinned_event_ids.len() {
523 warn!(
524 "only successfully loaded {} out of {} pinned events",
525 num_successful_loads,
526 pinned_event_ids.len()
527 );
528 }
529
530 if loaded_events.is_empty() {
531 return Err(EventCacheError::UnableToLoadPinnedEvents);
534 }
535
536 loaded_events.sort_by_key(|item| {
541 item.raw()
542 .deserialize()
543 .map(|e| e.origin_server_ts())
544 .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
545 });
546
547 Ok(Some(loaded_events))
548 }
549}