matrix_sdk_crypto/store/
crypto_store_wrapper.rs

1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::store_locks::CrossProcessStoreLock;
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{broadcast, Mutex};
8use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
9use tracing::{debug, trace, warn};
10
11use super::{
12    caches::SessionStore, types::RoomKeyBundleInfo, DeviceChanges, IdentityChanges,
13    LockableCryptoStore,
14};
15use crate::{
16    olm::InboundGroupSession,
17    store,
18    store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo},
19    CryptoStoreError, GossippedSecret, OwnUserIdentityData, Session, UserIdentityData,
20};
21
22/// A wrapper for crypto store implementations that adds update notifiers.
23///
24/// This is shared between [`StoreInner`] and
25/// [`crate::verification::VerificationStore`].
26#[derive(Debug)]
27pub(crate) struct CryptoStoreWrapper {
28    user_id: OwnedUserId,
29    device_id: OwnedDeviceId,
30
31    store: Arc<DynCryptoStore>,
32
33    /// A cache for the Olm Sessions.
34    sessions: SessionStore,
35
36    /// The sender side of a broadcast stream that is notified whenever we get
37    /// an update to an inbound group session.
38    room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,
39
40    /// The sender side of a broadcast stream that is notified whenever we
41    /// receive an `m.room_key.withheld` message.
42    room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,
43
44    /// The sender side of a broadcast channel which sends out secrets we
45    /// received as a `m.secret.send` event.
46    secrets_broadcaster: broadcast::Sender<GossippedSecret>,
47
48    /// The sender side of a broadcast channel which sends out devices and user
49    /// identities which got updated or newly created.
50    identities_broadcaster:
51        broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,
52
53    /// The sender side of a broadcast channel which sends out information about
54    /// historic room key bundles we have received.
55    historic_room_key_bundles_broadcaster: broadcast::Sender<RoomKeyBundleInfo>,
56}
57
58impl CryptoStoreWrapper {
59    pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
60        let room_keys_received_sender = broadcast::Sender::new(10);
61        let room_keys_withheld_received_sender = broadcast::Sender::new(10);
62        let secrets_broadcaster = broadcast::Sender::new(10);
63        // The identities broadcaster is responsible for user identities as well as
64        // devices, that's why we increase the capacity here.
65        let identities_broadcaster = broadcast::Sender::new(20);
66        let historic_room_key_bundles_broadcaster = broadcast::Sender::new(10);
67
68        Self {
69            user_id: user_id.to_owned(),
70            device_id: device_id.to_owned(),
71            store: store.into_crypto_store(),
72            sessions: SessionStore::new(),
73            room_keys_received_sender,
74            room_keys_withheld_received_sender,
75            secrets_broadcaster,
76            identities_broadcaster,
77            historic_room_key_bundles_broadcaster,
78        }
79    }
80
81    /// Save the set of changes to the store.
82    ///
83    /// Also responsible for sending updates to the broadcast streams such as
84    /// `room_keys_received_sender` and `secrets_broadcaster`.
85    ///
86    /// # Arguments
87    ///
88    /// * `changes` - The set of changes that should be stored.
89    pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
90        let room_key_updates: Vec<_> =
91            changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
92
93        let withheld_session_updates: Vec<_> = changes
94            .withheld_session_info
95            .iter()
96            .flat_map(|(room_id, session_map)| {
97                session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
98                    room_id: room_id.to_owned(),
99                    session_id: session_id.to_owned(),
100                    withheld_event: withheld_event.clone(),
101                })
102            })
103            .collect();
104
105        // If our own identity verified status changes we need to do some checks on
106        // other identities. So remember the verification status before
107        // processing the changes
108        let own_identity_was_verified_before_change = self
109            .store
110            .get_user_identity(self.user_id.as_ref())
111            .await?
112            .as_ref()
113            .and_then(|i| i.own())
114            .is_some_and(|own| own.is_verified());
115
116        let secrets = changes.secrets.to_owned();
117        let devices = changes.devices.to_owned();
118        let identities = changes.identities.to_owned();
119        let room_key_bundle_updates: Vec<_> =
120            changes.received_room_key_bundles.iter().map(RoomKeyBundleInfo::from).collect();
121
122        if devices
123            .changed
124            .iter()
125            .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
126        {
127            // If our own device key changes, we need to clear the
128            // session cache because the sessions contain a copy of our
129            // device key.
130            self.sessions.clear().await;
131        } else {
132            // Otherwise add the sessions to the cache.
133            for session in &changes.sessions {
134                self.sessions.add(session.clone()).await;
135            }
136        }
137
138        self.store.save_changes(changes).await?;
139
140        // If we updated our own public identity, log it for debugging purposes
141        if tracing::level_enabled!(tracing::Level::DEBUG) {
142            for updated_identity in
143                identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
144            {
145                let master_key = updated_identity.master_key().get_first_key();
146                let user_signing_key = updated_identity.user_signing_key().get_first_key();
147                let self_signing_key = updated_identity.self_signing_key().get_first_key();
148
149                debug!(
150                    ?master_key,
151                    ?user_signing_key,
152                    ?self_signing_key,
153                    previously_verified = updated_identity.was_previously_verified(),
154                    verified = updated_identity.is_verified(),
155                    "Stored our own identity"
156                );
157            }
158        }
159
160        if !room_key_updates.is_empty() {
161            // Ignore the result. It can only fail if there are no listeners.
162            let _ = self.room_keys_received_sender.send(room_key_updates);
163        }
164
165        if !withheld_session_updates.is_empty() {
166            let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
167        }
168
169        for secret in secrets {
170            let _ = self.secrets_broadcaster.send(secret);
171        }
172
173        for bundle_info in room_key_bundle_updates {
174            let _ = self.historic_room_key_bundles_broadcaster.send(bundle_info);
175        }
176
177        if !devices.is_empty() || !identities.is_empty() {
178            // Mapping the devices and user identities from the read-only variant to one's
179            // that contain side-effects requires our own identity. This is
180            // guaranteed to be up-to-date since we just persisted it.
181            let maybe_own_identity =
182                self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());
183
184            // If our identity was not verified before the change and is now, that means
185            // this could impact the verification chain of other known
186            // identities.
187            if let Some(own_identity_after) = maybe_own_identity.as_ref() {
188                // Only do this if our identity is passing from not verified to verified,
189                // the previously_verified can only change in that case.
190                let own_identity_is_verified = own_identity_after.is_verified();
191
192                if !own_identity_was_verified_before_change && own_identity_is_verified {
193                    debug!(
194                        "Own identity is now verified, check all known identities for verification status changes"
195                    );
196                    // We need to review all the other identities to see if they are verified now
197                    // and mark them as such
198                    self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
199                        own_identity_after,
200                    )
201                    .await?;
202                } else if own_identity_was_verified_before_change != own_identity_is_verified {
203                    // Log that the verification state of the identity changed.
204                    debug!(
205                        own_identity_is_verified,
206                        "The verification state of our own identity has changed",
207                    );
208                }
209            }
210
211            let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
212        }
213
214        Ok(())
215    }
216
217    async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
218        &self,
219        own_identity_after: &OwnUserIdentityData,
220    ) -> Result<(), CryptoStoreError> {
221        let tracked_users = self.store.load_tracked_users().await?;
222        let mut updated_identities: Vec<UserIdentityData> = Default::default();
223        for tracked_user in tracked_users {
224            if let Some(other_identity) = self
225                .store
226                .get_user_identity(tracked_user.user_id.as_ref())
227                .await?
228                .as_ref()
229                .and_then(|i| i.other())
230            {
231                if !other_identity.was_previously_verified()
232                    && own_identity_after.is_identity_signed(other_identity)
233                {
234                    trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
235                    other_identity.mark_as_previously_verified();
236                    updated_identities.push(other_identity.clone().into());
237                }
238            }
239        }
240
241        if !updated_identities.is_empty() {
242            let identity_changes =
243                IdentityChanges { changed: updated_identities, ..Default::default() };
244            self.store
245                .save_changes(Changes {
246                    identities: identity_changes.clone(),
247                    ..Default::default()
248                })
249                .await?;
250
251            let _ = self.identities_broadcaster.send((
252                Some(own_identity_after.clone()),
253                identity_changes,
254                DeviceChanges::default(),
255            ));
256        }
257
258        Ok(())
259    }
260
261    pub async fn get_sessions(
262        &self,
263        sender_key: &str,
264    ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
265        let sessions = self.sessions.get(sender_key).await;
266
267        let sessions = if sessions.is_none() {
268            let mut entries = self.sessions.entries.write().await;
269
270            let sessions = entries.get(sender_key);
271
272            if sessions.is_some() {
273                sessions.cloned()
274            } else {
275                let sessions = self.store.get_sessions(sender_key).await?;
276                let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
277
278                entries.insert(sender_key.to_owned(), sessions.clone());
279
280                Some(sessions)
281            }
282        } else {
283            sessions
284        };
285
286        Ok(sessions)
287    }
288
289    /// Save a list of inbound group sessions to the store.
290    ///
291    /// # Arguments
292    ///
293    /// * `sessions` - The sessions to be saved.
294    /// * `backed_up_to_version` - If the keys should be marked as having been
295    ///   backed up, the version of the backup.
296    ///
297    /// Note: some implementations ignore `backup_version` and assume the
298    /// current backup version, which is normally the same.
299    pub async fn save_inbound_group_sessions(
300        &self,
301        sessions: Vec<InboundGroupSession>,
302        backed_up_to_version: Option<&str>,
303    ) -> store::Result<()> {
304        let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
305        self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
306
307        if !room_key_updates.is_empty() {
308            // Ignore the result. It can only fail if there are no listeners.
309            let _ = self.room_keys_received_sender.send(room_key_updates);
310        }
311        Ok(())
312    }
313
314    /// Receive notifications of room keys being received as a [`Stream`].
315    ///
316    /// Each time a room key is updated in any way, an update will be sent to
317    /// the stream. Updates that happen at the same time are batched into a
318    /// [`Vec`].
319    ///
320    /// If the reader of the stream lags too far behind an error will be sent to
321    /// the reader.
322    pub fn room_keys_received_stream(
323        &self,
324    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
325        BroadcastStream::new(self.room_keys_received_sender.subscribe())
326    }
327
328    /// Receive notifications of received `m.room_key.withheld` messages.
329    ///
330    /// Each time an `m.room_key.withheld` is received and stored, an update
331    /// will be sent to the stream. Updates that happen at the same time are
332    /// batched into a [`Vec`].
333    ///
334    /// If the reader of the stream lags too far behind, a warning will be
335    /// logged and items will be dropped.
336    pub fn room_keys_withheld_received_stream(
337        &self,
338    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
339        let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
340        Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
341    }
342
343    /// Receive notifications of gossipped secrets being received and stored in
344    /// the secret inbox as a [`Stream`].
345    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
346        let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
347        Self::filter_errors_out_of_stream(stream, "secrets_stream")
348    }
349
350    /// Receive notifications of historic room key bundles being received and
351    /// stored in the store as a [`Stream`].
352    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> {
353        let stream = BroadcastStream::new(self.historic_room_key_bundles_broadcaster.subscribe());
354        Self::filter_errors_out_of_stream(stream, "bundle_stream")
355    }
356
357    /// Returns a stream of newly created or updated cryptographic identities.
358    ///
359    /// This is just a helper method which allows us to build higher level
360    /// device and user identity streams.
361    pub(super) fn identities_stream(
362        &self,
363    ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> {
364        let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
365        Self::filter_errors_out_of_stream(stream, "identities_stream")
366    }
367
368    /// Helper for *_stream functions: filters errors out of the stream,
369    /// creating a new Stream.
370    ///
371    /// `BroadcastStream`s gives us `Result`s which can fail with
372    /// `BroadcastStreamRecvError` if the reader falls behind. That's annoying
373    /// to work with, so here we just emit a warning and drop the errors.
374    fn filter_errors_out_of_stream<ItemType>(
375        stream: BroadcastStream<ItemType>,
376        stream_name: &str,
377    ) -> impl Stream<Item = ItemType>
378    where
379        ItemType: 'static + Clone + Send,
380    {
381        let stream_name = stream_name.to_owned();
382        stream.filter_map(move |result| {
383            future::ready(match result {
384                Ok(r) => Some(r),
385                Err(BroadcastStreamRecvError::Lagged(lag)) => {
386                    warn!("{stream_name} missed {lag} updates");
387                    None
388                }
389            })
390        })
391    }
392
393    /// Creates a `CrossProcessStoreLock` for this store, that will contain the
394    /// given key and value when hold.
395    pub(crate) fn create_store_lock(
396        &self,
397        lock_key: String,
398        lock_value: String,
399    ) -> CrossProcessStoreLock<LockableCryptoStore> {
400        CrossProcessStoreLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
401    }
402}
403
404impl Deref for CryptoStoreWrapper {
405    type Target = DynCryptoStore;
406
407    fn deref(&self) -> &Self::Target {
408        self.store.deref()
409    }
410}
411
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving a changed version of our own device must empty the session
    /// cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        let alice = user_id!("@alice:example.com");
        let (first, second) =
            get_machine_pair_with_setup_sessions_test_helper(alice, alice, false).await;

        let sender_key = second.identity_keys().curve25519.to_base64();

        // Given a session for the second machine in the cache of the first one
        let cached = first.store().inner.store.sessions.get(&sender_key).await;
        cached.expect("We should have a session in the cache.");

        let own_device = first.get_device(alice, first.device_id(), None).await.unwrap();
        let device_data = own_device.expect("We should have access to our own device.").inner;

        // When we save a new version of our device keys
        let changes = Changes {
            devices: DeviceChanges { changed: vec![device_data], ..Default::default() },
            ..Default::default()
        };
        first.store().save_changes(changes).await.unwrap();

        // Then the session is no longer in the cache
        let cached = first.store().inner.store.sessions.get(&sender_key).await;
        assert!(
            cached.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}
460}