// matrix_sdk/event_cache/caches/pinned_events/mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, sync::Arc};
16
17use futures_util::{StreamExt as _, stream};
18use matrix_sdk_base::{
19    event_cache::{Event, store::EventCacheStoreLock},
20    linked_chunk::{LinkedChunkId, OwnedLinkedChunkId},
21    serde_helpers::extract_relation,
22    task_monitor::BackgroundTaskHandle,
23};
24use ruma::{
25    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
26    events::{
27        AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, relation::RelationType,
28    },
29    room_version_rules::RedactionRules,
30    serde::Raw,
31};
32use tokio::sync::broadcast::{Receiver, Sender};
33use tracing::{debug, instrument, trace, warn};
34
35#[cfg(feature = "e2e-encryption")]
36use super::super::redecryptor::ResolvedUtd;
37use super::{
38    super::{EventCacheError, EventsOrigin, Result, persistence::send_updates_to_store},
39    event_linked_chunk::EventLinkedChunk,
40    lock,
41    room::RoomEventCacheLinkedChunkUpdate,
42};
43use crate::{
44    Room, client::WeakClient, config::RequestConfig, event_cache::TimelineVectorDiffs,
45    room::WeakRoom,
46};
47
/// Mutable state backing the pinned events cache of a single room.
///
/// All fields are meant to be read and mutated together, under the
/// [`PinnedEventCacheStateLock`].
pub(in super::super) struct PinnedEventCacheState {
    /// The ID of the room owning this list of pinned events.
    room_id: OwnedRoomId,

    /// A sender for live events updates in this room's pinned events list.
    sender: Sender<TimelineVectorDiffs>,

    /// The linked chunk representing this room's pinned events.
    ///
    /// This linked chunk also contains related events. The events are sorted in
    /// the chronological order (oldest to newest), since it would be otherwise
    /// impossible to order them correctly, given that we fetch their
    /// relations over time.
    chunk: EventLinkedChunk,

    /// Reference to the underlying backing store.
    // TODO: can be removed?
    store: EventCacheStoreLock,

    /// A sender for the globally observable linked chunk updates that happened
    /// during a sync or a back-pagination.
    ///
    /// See also [`super::super::EventCacheInner::linked_chunk_update_sender`].
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
73
// Give the generic `lock::StateLock` machinery access to the backing store.
impl lock::Store for PinnedEventCacheState {
    fn store(&self) -> &EventCacheStoreLock {
        &self.store
    }
}
79
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for PinnedEventCacheState {
    // Manual impl: only print the cheap, informative fields; the senders and
    // the store handle are elided via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PinnedEventCacheState")
            .field("room_id", &self.room_id)
            .field("chunk", &self.chunk)
            .finish_non_exhaustive()
    }
}
89
/// State for pinned events of a room's event cache.
///
/// This contains all the inner mutable states that ought to be updated at
/// the same time.
pub type PinnedEventCacheStateLock = lock::StateLock<PinnedEventCacheState>;

/// Write guard over a [`PinnedEventCacheStateLock`], giving mutable access to
/// the underlying [`PinnedEventCacheState`].
pub type PinnedEventCacheStateLockWriteGuard<'a> =
    lock::StateLockWriteGuard<'a, PinnedEventCacheState>;
98
99impl<'a> lock::Reload for PinnedEventCacheStateLockWriteGuard<'a> {
100    async fn reload(&mut self) -> Result<()> {
101        self.reload_from_storage().await?;
102
103        Ok(())
104    }
105}
106
impl<'a> PinnedEventCacheStateLockWriteGuard<'a> {
    /// Reload all the pinned events from storage, replacing the current linked
    /// chunk.
    ///
    /// Rebuilds the in-memory linked chunk from the last stored chunk
    /// backwards, then notifies subscribers with an [`EventsOrigin::Cache`]
    /// origin. If storage holds nothing, the in-memory chunk is simply emptied
    /// (with an [`EventsOrigin::Sync`] notification, if it wasn't empty
    /// already).
    async fn reload_from_storage(&mut self) -> Result<()> {
        let room_id = self.state.room_id.clone();
        let linked_chunk_id = LinkedChunkId::PinnedEvents(&room_id);

        let (last_chunk, chunk_id_gen) = self.store.load_last_chunk(linked_chunk_id).await?;

        let Some(last_chunk) = last_chunk else {
            // No pinned events stored, make sure the in-memory linked chunk is sync'd (i.e.
            // empty), and return.
            if self.state.chunk.events().next().is_some() {
                self.state.chunk.reset();
                self.notify_subscribers(EventsOrigin::Sync);
            }

            return Ok(());
        };

        {
            let mut current_chunk_identifier = last_chunk.identifier;
            self.state.chunk.replace_with(Some(last_chunk), chunk_id_gen)?;

            // Reload the entire chunk: walk backwards from the last chunk,
            // prepending each previous chunk until storage is exhausted.
            while let Some(previous_chunk) =
                self.store.load_previous_chunk(linked_chunk_id, current_chunk_identifier).await?
            {
                current_chunk_identifier = previous_chunk.identifier;
                self.state.chunk.insert_new_chunk_as_first(previous_chunk)?;
            }
        }

        // Empty store updates, since we just reloaded from storage (writing
        // them back would be redundant at best).
        self.state.chunk.store_updates().take();

        // Let observers know about it.
        self.notify_subscribers(EventsOrigin::Cache);

        Ok(())
    }

    /// Replace the entire content of the linked chunk with `new_events`.
    ///
    /// No-op when the *set* of event IDs is unchanged (so a pure reordering is
    /// also treated as "no change"). Otherwise, resets the chunk, pushes the
    /// new events, persists the changes and notifies subscribers with an
    /// [`EventsOrigin::Sync`] origin.
    async fn replace_all_events(&mut self, new_events: Vec<Event>) -> Result<()> {
        trace!("resetting all pinned events in linked chunk");

        let previous_pinned_event_ids = self.state.current_event_ids();

        if new_events.iter().filter_map(|e| e.event_id()).collect::<BTreeSet<_>>()
            == previous_pinned_event_ids.iter().cloned().collect()
        {
            // No change in the list of pinned events.
            return Ok(());
        }

        if self.state.chunk.events().next().is_some() {
            self.state.chunk.reset();
        }

        self.state.chunk.push_live_events(None, &new_events);
        self.propagate_changes().await?;
        self.notify_subscribers(EventsOrigin::Sync);

        Ok(())
    }

    /// Propagate the changes in this linked chunk to observers, and save the
    /// changes on disk.
    async fn propagate_changes(&mut self) -> Result<()> {
        let updates = self.state.chunk.store_updates().take();
        let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.state.room_id.clone());
        send_updates_to_store(
            &self.store,
            linked_chunk_id,
            &self.state.linked_chunk_update_sender,
            updates,
        )
        .await
    }

    /// Notify subscribers of timeline updates.
    ///
    /// Sending is best-effort: a send error (no active receivers) is ignored.
    fn notify_subscribers(&mut self, origin: EventsOrigin) {
        let diffs = self.state.chunk.updates_as_vector_diffs();
        if !diffs.is_empty() {
            let _ = self.state.sender.send(TimelineVectorDiffs { diffs, origin });
        }
    }
}
194
195impl PinnedEventCacheState {
196    /// Return a list of the current event IDs in this linked chunk.
197    fn current_event_ids(&self) -> Vec<OwnedEventId> {
198        self.chunk.events().filter_map(|(_position, event)| event.event_id()).collect()
199    }
200}
201
/// All the information related to a room's pinned events cache.
///
/// This is cheap to clone, because it's a shallow data type.
#[derive(Clone)]
pub struct PinnedEventCache {
    /// Shared, lockable state: the linked chunk, senders and store handle.
    state: Arc<PinnedEventCacheStateLock>,

    /// The task handling the refreshing of pinned events for this specific
    /// room.
    _task: Arc<BackgroundTaskHandle>,
}
213
214impl PinnedEventCache {
    /// Creates a new [`PinnedEventCache`] for the given room.
    ///
    /// Spawns the background listener task that keeps the pinned events up to
    /// date; the handle is kept with `abort_on_drop`, so the task stops when
    /// the last clone of this cache is dropped.
    pub(in super::super) fn new(
        room: Room,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        store: EventCacheStoreLock,
    ) -> Self {
        // Capacity of the broadcast channel for timeline diffs.
        let sender = Sender::new(32);

        let room_id = room.room_id().to_owned();

        // Start with an empty in-memory chunk; the listener task reloads it
        // from storage on startup.
        let chunk = EventLinkedChunk::new();

        let state =
            PinnedEventCacheState { room_id, chunk, sender, linked_chunk_update_sender, store };
        let state = Arc::new(PinnedEventCacheStateLock::new_inner(state));

        let task = Arc::new(
            room.client()
                .task_monitor()
                .spawn_infinite_task(
                    "pinned_event_listener_task",
                    Self::pinned_event_listener_task(room, state.clone()),
                )
                .abort_on_drop(),
        );

        Self { state, _task: task }
    }
243
244    /// Subscribe to live events from this room's pinned events cache.
245    pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
246        let guard = self.state.read().await?;
247        let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect();
248
249        let recv = guard.state.sender.subscribe();
250
251        Ok((events, recv))
252    }
253
254    /// Try to locate the events in the linked chunk corresponding to the given
255    /// list of decrypted events, and replace them, while alerting observers
256    /// about the update.
257    #[cfg(feature = "e2e-encryption")]
258    pub(in crate::event_cache) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> {
259        let mut guard = self.state.write().await?;
260
261        if guard.state.chunk.replace_utds(events) {
262            guard.propagate_changes().await?;
263            guard.notify_subscribers(EventsOrigin::Cache);
264        }
265
266        Ok(())
267    }
268
269    /// Given a raw event, try to extract the target event ID of a relation as
270    /// defined with `m.relates_to`.
271    fn extract_relation_target(raw: &Raw<AnySyncTimelineEvent>) -> Option<OwnedEventId> {
272        let (rel_type, event_id) = extract_relation(raw)?;
273
274        // Don't include thread responses in the pinned event chunk.
275        match rel_type {
276            RelationType::Thread => None,
277            _ => Some(event_id),
278        }
279    }
280
281    /// Given a raw event, try to extract the target event ID of a live
282    /// redaction.
283    fn extract_redaction_target(
284        raw: &Raw<AnySyncTimelineEvent>,
285        room_redaction_rules: &RedactionRules,
286    ) -> Option<OwnedEventId> {
287        // Try to find a redaction, but do not deserialize the entire event if we aren't
288        // certain it's a `m.room.redaction`.
289        if raw.get_field::<MessageLikeEventType>("type").ok()??
290            != MessageLikeEventType::RoomRedaction
291        {
292            return None;
293        }
294
295        let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(redaction)) =
296            raw.deserialize().ok()?
297        else {
298            return None;
299        };
300
301        redaction.redacts(room_redaction_rules).map(ToOwned::to_owned).or_else(|| {
302            warn!("missing target event id from the redaction event");
303            None
304        })
305    }
306
    /// Check if any of the given events relate to an event in the pinned events
    /// linked chunk, and append it, in this case.
    ///
    /// Both regular `m.relates_to` relations (except thread responses) and
    /// live redactions are considered. Matching events are appended to the
    /// chunk, persisted, and announced with an [`EventsOrigin::Sync`] origin.
    pub(in super::super) async fn maybe_add_live_related_events(
        &mut self,
        events: &[Event],
        room_redaction_rules: &RedactionRules,
    ) -> Result<()> {
        trace!("checking live events for relations to pinned events");
        let mut guard = self.state.write().await?;

        // Note: this set contains *all* events in the chunk, i.e. pinned
        // events and their previously-added relations alike.
        let pinned_event_ids: BTreeSet<OwnedEventId> =
            guard.state.current_event_ids().into_iter().collect();

        if pinned_event_ids.is_empty() {
            // Nothing is pinned: no relation can possibly match.
            return Ok(());
        }

        let mut new_relations = Vec::new();

        // For all events that relate to an event in the pinned events chunk, push this
        // event to the linked chunk, and propagate changes to observers.
        for ev in events {
            // Try to find a regular relation in ev.
            if let Some(relation_target) = Self::extract_relation_target(ev.raw())
                && pinned_event_ids.contains(&relation_target)
            {
                new_relations.push(ev.clone());
                continue;
            }

            // Try to find a redaction in ev.
            if let Some(redaction_target) =
                Self::extract_redaction_target(ev.raw(), room_redaction_rules)
                && pinned_event_ids.contains(&redaction_target)
            {
                new_relations.push(ev.clone());
                continue;
            }
        }

        if !new_relations.is_empty() {
            trace!("found {} new related events to pinned events", new_relations.len());

            // We've found new relations; append them to the linked chunk.
            guard.state.chunk.push_live_events(None, &new_relations);

            guard.propagate_changes().await?;
            guard.notify_subscribers(EventsOrigin::Sync);
        }

        Ok(())
    }
359
    /// Long-running task keeping the pinned events of `room` up to date.
    ///
    /// On startup, it reloads the linked chunk from storage, and falls back to
    /// a network reload if the stored set doesn't match the room's current
    /// pinned event IDs. It then watches `pinned_event_ids_stream`, reloading
    /// from the network whenever the pinned list changes. The task ends when
    /// the stream closes, the room is dropped, or a lock error occurs.
    #[instrument(fields(%room_id = room.room_id()), skip(room, state))]
    async fn pinned_event_listener_task(room: Room, state: Arc<PinnedEventCacheStateLock>) {
        debug!("pinned events listener task started");

        // Fetch the full pinned events over the network and swap them into
        // the linked chunk. Errors are logged, not propagated: this task is
        // best-effort.
        let reload_from_network = async |room: Room| {
            let events = match Self::reload_pinned_events(room).await {
                Ok(Some(events)) => events,
                Ok(None) => Vec::new(),
                Err(err) => {
                    warn!("error when loading pinned events: {err}");
                    return;
                }
            };

            // Replace the whole linked chunk with those new events, and propagate updates
            // to the observers.
            match state.write().await {
                Ok(mut guard) => {
                    guard.replace_all_events(events).await.unwrap_or_else(|err| {
                        warn!("error when replacing pinned events: {err}");
                    });
                }

                Err(err) => {
                    warn!("error when acquiring write lock to replace pinned events: {err}");
                }
            }
        };

        // Reload the pinned events from the storage first.
        match state.write().await {
            Ok(mut guard) => {
                // On startup, reload the pinned events from storage.
                guard.reload_from_storage().await.unwrap_or_else(|err| {
                    warn!("error when reloading pinned events from storage, at start: {err}");
                });

                // Compare the initial list of pinned events to the one in the linked chunk.
                let actual_pinned_events = room.pinned_event_ids().unwrap_or_default();
                let reloaded_set =
                    guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();

                if actual_pinned_events.len() != reloaded_set.len()
                    || actual_pinned_events.iter().any(|event_id| !reloaded_set.contains(event_id))
                {
                    // Reload the list of pinned events from network.
                    // (Drop the guard first: `reload_from_network` re-acquires
                    // the write lock itself.)
                    drop(guard);
                    reload_from_network(room.clone()).await;
                }
            }

            Err(err) => {
                warn!("error when acquiring write lock to initialize pinned events: {err}");
            }
        }

        // Keep only a weak reference to the room so this task doesn't keep it
        // (and the client) alive on its own.
        let weak_room =
            WeakRoom::new(WeakClient::from_client(&room.client()), room.room_id().to_owned());

        let mut stream = room.pinned_event_ids_stream();

        drop(room);

        // Whenever the list of pinned events changes, reload it.
        while let Some(new_list) = stream.next().await {
            trace!("handling update");

            let guard = match state.read().await {
                Ok(guard) => guard,
                Err(err) => {
                    warn!("error when acquiring read lock to handle pinned events update: {err}");
                    break;
                }
            };

            // Compare to the current linked chunk.
            // NOTE(review): the chunk also contains related events, so a
            // pinned list that shrank to a subset of the chunk's IDs won't
            // trigger a reload here — presumably acceptable, but worth
            // confirming.
            let current_set = guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();

            if !new_list.is_empty()
                && new_list.iter().all(|event_id| current_set.contains(event_id))
            {
                // All the events in the pinned list are the same, don't reload.
                continue;
            }

            let Some(room) = weak_room.get() else {
                debug!("room has been dropped, ending pinned events listener task");
                break;
            };

            drop(guard);

            // Event IDs differ, so reload all the pinned events.
            reload_from_network(room).await;
        }

        debug!("pinned events listener task ended");
    }
458
459    /// Loads the pinned events in this room, using the cache first and then
460    /// requesting the event from the homeserver if it couldn't be found.
461    /// This method will perform as many concurrent requests for events as
462    /// `max_concurrent_requests` allows, to avoid overwhelming the server.
463    ///
464    /// Returns `None` if the list of pinned events hasn't changed since the
465    /// previous time we loaded them. May return an error if there was an
466    /// issue fetching the full events.
467    async fn reload_pinned_events(room: Room) -> Result<Option<Vec<Event>>> {
468        let (max_events_to_load, max_concurrent_requests) = {
469            let client = room.client();
470            let config = client.event_cache().config();
471            (config.max_pinned_events_to_load, config.max_pinned_events_concurrent_requests)
472        };
473
474        let pinned_event_ids: Vec<OwnedEventId> = room
475            .pinned_event_ids()
476            .unwrap_or_default()
477            .into_iter()
478            .rev()
479            .take(max_events_to_load)
480            .rev()
481            .collect();
482
483        if pinned_event_ids.is_empty() {
484            return Ok(Some(Vec::new()));
485        }
486
487        let mut num_successful_loads = 0;
488
489        let mut loaded_events: Vec<Event> =
490            stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
491                let room = room.clone();
492                let filter = vec![RelationType::Annotation, RelationType::Replacement];
493                let request_config = RequestConfig::default().retry_limit(3);
494
495                async move {
496                    let (target, mut relations) = room
497                        .load_or_fetch_event_with_relations(
498                            &event_id,
499                            Some(filter),
500                            Some(request_config),
501                        )
502                        .await?;
503
504                    relations.insert(0, target);
505                    Ok::<_, crate::Error>(relations)
506                }
507            }))
508            .buffer_unordered(max_concurrent_requests)
509            // Count successful queries.
510            .inspect(|result| {
511                if result.is_ok() {
512                    num_successful_loads += 1;
513                }
514            })
515            // Get rid of error results.
516            .flat_map(stream::iter)
517            // Flatten the list of `Vec<Event>` into a list of `Event`.
518            .flat_map(stream::iter)
519            .collect()
520            .await;
521
522        if num_successful_loads != pinned_event_ids.len() {
523            warn!(
524                "only successfully loaded {} out of {} pinned events",
525                num_successful_loads,
526                pinned_event_ids.len()
527            );
528        }
529
530        if loaded_events.is_empty() {
531            // If the list of loaded events is empty, we ran into an error to load *all* the
532            // pinned events, which needs to be reported to the caller.
533            return Err(EventCacheError::UnableToLoadPinnedEvents);
534        }
535
536        // Since we have all the events and their related events, we can't nicely sort
537        // them, since we've lost all ordering information from using /event or
538        // /relations. Resort to sorting using chronological ordering (oldest ->
539        // newest).
540        loaded_events.sort_by_key(|item| {
541            item.raw()
542                .deserialize()
543                .map(|e| e.origin_server_ts())
544                .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
545        });
546
547        Ok(Some(loaded_events))
548    }
549}