Skip to main content

matrix_sdk/event_cache/
persistence.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use matrix_sdk_base::{
16    deserialized_responses::TimelineEventKind,
17    event_cache::{Event, Gap, store::EventCacheStoreLockGuard},
18    executor::spawn,
19    linked_chunk::{OwnedLinkedChunkId, Update},
20};
21use ruma::serde::Raw;
22use tokio::sync::broadcast::Sender;
23use tracing::trace;
24
25use crate::event_cache::{Result, caches::room::RoomEventCacheLinkedChunkUpdate};
26
27/// Propagate linked chunk updates to the store and to the linked chunk update
28/// observers.
29pub(super) async fn send_updates_to_store(
30    store: &EventCacheStoreLockGuard,
31    linked_chunk_id: OwnedLinkedChunkId,
32    linked_chunk_update_sender: &Sender<RoomEventCacheLinkedChunkUpdate>,
33    mut updates: Vec<Update<Event, Gap>>,
34) -> Result<()> {
35    if updates.is_empty() {
36        return Ok(());
37    }
38
39    // Strip relations from updates which insert or replace items.
40    //
41    // The reason we're doing this, is that consumers of the event cache might look
42    // into bundled relations, and assume they're up to date. If we were to keep
43    // the relations in the events, when storing them, then it could be that
44    // they become outdated (as soon as a new relation comes over sync), so we'd
45    // need to update the bundled relations in this case, which would
46    // have a non-negligible cost, as we'd need to look up related events for each
47    // forwarded to a listener.
48    //
49    // As a result, we choose to strip bundled relations from events when we forward
50    // them to the store, and consumers have to explicitly ask for relations.
51    for update in updates.iter_mut() {
52        match update {
53            Update::PushItems { items, .. } => strip_relations_from_events(items),
54            Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
55            // Other update kinds don't involve adding new events.
56            Update::NewItemsChunk { .. }
57            | Update::NewGapChunk { .. }
58            | Update::RemoveChunk(_)
59            | Update::RemoveItem { .. }
60            | Update::DetachLastItems { .. }
61            | Update::StartReattachItems
62            | Update::EndReattachItems
63            | Update::Clear => {}
64        }
65    }
66
67    // Spawn a task to make sure that all the changes are effectively forwarded to
68    // the store, even if the call to this method gets aborted.
69    //
70    // The store cross-process locking involves an actual mutex, which ensures that
71    // storing updates happens in the expected order.
72
73    let store = store.clone();
74    let cloned_updates = updates.clone();
75    let cloned_linked_chunk_id = linked_chunk_id.clone();
76
77    spawn(async move {
78        trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
79
80        store.handle_linked_chunk_updates(cloned_linked_chunk_id.as_ref(), cloned_updates).await?;
81        trace!("linked chunk updates applied");
82
83        Result::Ok(())
84    })
85    .await
86    .expect("joining failed")?;
87
88    // Forward that the store got updated to observers.
89    let _ = linked_chunk_update_sender
90        .send(RoomEventCacheLinkedChunkUpdate { linked_chunk_id, updates });
91
92    Ok(())
93}
94
95/// Strips the bundled relations from a collection of events.
96fn strip_relations_from_events(items: &mut [Event]) {
97    for ev in items.iter_mut() {
98        strip_relations_from_event(ev);
99    }
100}
101
102/// Strips the bundled relations from an event, if they were present.
103fn strip_relations_from_event(ev: &mut Event) {
104    match &mut ev.kind {
105        TimelineEventKind::Decrypted(decrypted) => {
106            // Remove all information about encryption info for
107            // the bundled events.
108            decrypted.unsigned_encryption_info = None;
109
110            // Remove the `unsigned`/`m.relations` field, if needs be.
111            strip_relations_if_present(&mut decrypted.event);
112        }
113
114        TimelineEventKind::UnableToDecrypt { event, .. }
115        | TimelineEventKind::PlainText { event } => {
116            strip_relations_if_present(event);
117        }
118    }
119}
120
121/// Removes the bundled relations from an event, if they were present.
122///
123/// Only replaces the present if it contained bundled relations.
124fn strip_relations_if_present<T>(event: &mut Raw<T>) {
125    // We're going to get rid of the `unsigned`/`m.relations` field, if it's
126    // present.
127    // Use a closure that returns an option so we can quickly short-circuit.
128    let mut closure = || -> Option<()> {
129        let mut val: serde_json::Value = event.deserialize_as().ok()?;
130        let unsigned = val.get_mut("unsigned")?;
131        let unsigned_obj = unsigned.as_object_mut()?;
132        if unsigned_obj.remove("m.relations").is_some() {
133            *event = Raw::new(&val).ok()?.cast_unchecked();
134        }
135        None
136    };
137    let _ = closure();
138}