Skip to main content

matrix_sdk_crypto/store/
crypto_store_wrapper.rs

1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::cross_process_lock::{CrossProcessLock, CrossProcessLockConfig};
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{Mutex, broadcast};
8use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
9use tracing::{debug, trace, warn};
10
11use super::{
12    DeviceChanges, IdentityChanges, LockableCryptoStore, caches::SessionStore,
13    types::RoomKeyBundleInfo,
14};
15use crate::{
16    CryptoStoreError, OwnUserIdentityData, Session, UserIdentityData,
17    olm::InboundGroupSession,
18    store,
19    store::{
20        Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo,
21        types::SecretsInboxItem,
22    },
23};
24
/// A wrapper for crypto store implementations that adds update notifiers.
///
/// On top of the wrapped store, this type keeps an in-memory cache of Olm
/// sessions and a set of broadcast channels which are fed whenever changes
/// are saved through the wrapper.
///
/// This is shared between [`StoreInner`] and
/// [`crate::verification::VerificationStore`].
#[derive(Debug)]
pub(crate) struct CryptoStoreWrapper {
    /// The ID of the user this wrapper was created for. Used to look up our
    /// own identity and to recognise changes to our own device.
    user_id: OwnedUserId,
    /// The ID of the device this wrapper was created for. Used together with
    /// `user_id` to recognise changes to our own device.
    device_id: OwnedDeviceId,

    /// The wrapped crypto store that does the actual persisting.
    store: Arc<DynCryptoStore>,

    /// A cache for the Olm Sessions.
    sessions: SessionStore,

    /// The sender side of a broadcast stream that is notified whenever we get
    /// an update to an inbound group session.
    room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,

    /// The sender side of a broadcast stream that is notified whenever we
    /// receive an `m.room_key.withheld` message.
    room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,

    /// The sender side of a broadcast channel which sends out secrets we
    /// received as a `m.secret.send` event.
    secrets_broadcaster: broadcast::Sender<SecretsInboxItem>,

    /// The sender side of a broadcast channel which sends out devices and user
    /// identities which got updated or newly created.
    ///
    /// Each item carries our own identity (if there is one) alongside the
    /// identity and device changes.
    identities_broadcaster:
        broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,

    /// The sender side of a broadcast channel which sends out information about
    /// historic room key bundles we have received.
    historic_room_key_bundles_broadcaster: broadcast::Sender<RoomKeyBundleInfo>,
}
60
61impl CryptoStoreWrapper {
62    pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
63        let room_keys_received_sender = broadcast::Sender::new(10);
64        let room_keys_withheld_received_sender = broadcast::Sender::new(10);
65        let secrets_broadcaster = broadcast::Sender::new(10);
66        // The identities broadcaster is responsible for user identities as well as
67        // devices, that's why we increase the capacity here.
68        let identities_broadcaster = broadcast::Sender::new(20);
69        let historic_room_key_bundles_broadcaster = broadcast::Sender::new(10);
70
71        Self {
72            user_id: user_id.to_owned(),
73            device_id: device_id.to_owned(),
74            store: store.into_crypto_store(),
75            sessions: SessionStore::new(),
76            room_keys_received_sender,
77            room_keys_withheld_received_sender,
78            secrets_broadcaster,
79            identities_broadcaster,
80            historic_room_key_bundles_broadcaster,
81        }
82    }
83
    /// Save the set of changes to the store.
    ///
    /// Also responsible for sending updates to the broadcast streams such as
    /// `room_keys_received_sender` and `secrets_broadcaster`.
    ///
    /// The Olm session cache is updated as part of this call: it is cleared
    /// if our own device is among the changed devices, otherwise the sessions
    /// contained in `changes` are added to it.
    ///
    /// # Arguments
    ///
    /// * `changes` - The set of changes that should be stored.
    ///
    /// # Errors
    ///
    /// Returns an error if reading our own identity, persisting the changes,
    /// or the follow-up identity re-check fails. No broadcast is sent if the
    /// changes could not be persisted.
    pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
        // `changes` is moved into the store further down, so everything that
        // needs to be broadcast afterwards is collected up front.
        let room_key_updates: Vec<_> =
            changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();

        // Flatten the room -> session -> event map into one list of updates.
        let withheld_session_updates: Vec<_> = changes
            .withheld_session_info
            .iter()
            .flat_map(|(room_id, session_map)| {
                session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
                    room_id: room_id.to_owned(),
                    session_id: session_id.to_owned(),
                    withheld_event: withheld_event.clone(),
                })
            })
            .collect();

        // If our own identity verified status changes we need to do some checks on
        // other identities. So remember the verification status before
        // processing the changes
        let own_identity_was_verified_before_change = self
            .store
            .get_user_identity(self.user_id.as_ref())
            .await?
            .as_ref()
            .and_then(|i| i.own())
            .is_some_and(|own| own.is_verified());

        let secrets = changes.secrets.to_owned();
        let devices = changes.devices.to_owned();
        let identities = changes.identities.to_owned();
        let room_key_bundle_updates: Vec<_> =
            changes.received_room_key_bundles.iter().map(RoomKeyBundleInfo::from).collect();

        // NOTE(review): the session cache is touched before the changes are
        // persisted, so a failing store write below leaves the cache modified
        // — confirm this is intended.
        if devices
            .changed
            .iter()
            .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
        {
            // If our own device key changes, we need to clear the
            // session cache because the sessions contain a copy of our
            // device key.
            self.sessions.clear().await;
        } else {
            // Otherwise add the sessions to the cache.
            for session in &changes.sessions {
                self.sessions.add(session.clone()).await;
            }
        }

        self.store.save_changes(changes).await?;

        // If we updated our own public identity, log it for debugging purposes
        if tracing::level_enabled!(tracing::Level::DEBUG) {
            for updated_identity in
                identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
            {
                let master_key = updated_identity.master_key().get_first_key();
                let user_signing_key = updated_identity.user_signing_key().get_first_key();
                let self_signing_key = updated_identity.self_signing_key().get_first_key();

                debug!(
                    ?master_key,
                    ?user_signing_key,
                    ?self_signing_key,
                    previously_verified = updated_identity.was_previously_verified(),
                    verified = updated_identity.is_verified(),
                    "Stored our own identity"
                );
            }
        }

        if !room_key_updates.is_empty() {
            // Ignore the result. It can only fail if there are no listeners.
            let _ = self.room_keys_received_sender.send(room_key_updates);
        }

        // The results of the remaining sends are ignored as well.
        if !withheld_session_updates.is_empty() {
            let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
        }

        // Secrets and bundle infos are sent out one item at a time rather
        // than as a batch.
        for secret in secrets {
            let _ = self.secrets_broadcaster.send(secret);
        }

        for bundle_info in room_key_bundle_updates {
            let _ = self.historic_room_key_bundles_broadcaster.send(bundle_info);
        }

        if !devices.is_empty() || !identities.is_empty() {
            // Mapping the devices and user identities from the read-only variant to one's
            // that contain side-effects requires our own identity. This is
            // guaranteed to be up-to-date since we just persisted it.
            let maybe_own_identity =
                self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());

            // If our identity was not verified before the change and is now, that means
            // this could impact the verification chain of other known
            // identities.
            if let Some(own_identity_after) = maybe_own_identity.as_ref() {
                // Only do this if our identity is passing from not verified to verified,
                // the previously_verified can only change in that case.
                let own_identity_is_verified = own_identity_after.is_verified();

                if !own_identity_was_verified_before_change && own_identity_is_verified {
                    debug!(
                        "Own identity is now verified, check all known identities for verification status changes"
                    );
                    // We need to review all the other identities to see if they are verified now
                    // and mark them as such
                    self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
                        own_identity_after,
                    )
                    .await?;
                } else if own_identity_was_verified_before_change != own_identity_is_verified {
                    // Log that the verification state of the identity changed.
                    debug!(
                        own_identity_is_verified,
                        "The verification state of our own identity has changed",
                    );
                }
            }

            // Sent after the re-check above, which may already have sent an
            // update of its own for the identities it modified.
            let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
        }

        Ok(())
    }
219
220    async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
221        &self,
222        own_identity_after: &OwnUserIdentityData,
223    ) -> Result<(), CryptoStoreError> {
224        let tracked_users = self.store.load_tracked_users().await?;
225        let mut updated_identities: Vec<UserIdentityData> = Default::default();
226        for tracked_user in tracked_users {
227            if let Some(other_identity) = self
228                .store
229                .get_user_identity(tracked_user.user_id.as_ref())
230                .await?
231                .as_ref()
232                .and_then(|i| i.other())
233                && !other_identity.was_previously_verified()
234                && own_identity_after.is_identity_signed(other_identity)
235            {
236                trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
237                other_identity.mark_as_previously_verified();
238                updated_identities.push(other_identity.clone().into());
239            }
240        }
241
242        if !updated_identities.is_empty() {
243            let identity_changes =
244                IdentityChanges { changed: updated_identities, ..Default::default() };
245            self.store
246                .save_changes(Changes {
247                    identities: identity_changes.clone(),
248                    ..Default::default()
249                })
250                .await?;
251
252            let _ = self.identities_broadcaster.send((
253                Some(own_identity_after.clone()),
254                identity_changes,
255                DeviceChanges::default(),
256            ));
257        }
258
259        Ok(())
260    }
261
262    pub async fn get_sessions(
263        &self,
264        sender_key: &str,
265    ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
266        let sessions = self.sessions.get(sender_key).await;
267
268        let sessions = if sessions.is_none() {
269            let mut entries = self.sessions.entries.write().await;
270
271            let sessions = entries.get(sender_key);
272
273            if sessions.is_some() {
274                sessions.cloned()
275            } else {
276                let sessions = self.store.get_sessions(sender_key).await?;
277                let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
278
279                entries.insert(sender_key.to_owned(), sessions.clone());
280
281                Some(sessions)
282            }
283        } else {
284            sessions
285        };
286
287        Ok(sessions)
288    }
289
290    /// Save a list of inbound group sessions to the store.
291    ///
292    /// # Arguments
293    ///
294    /// * `sessions` - The sessions to be saved.
295    /// * `backed_up_to_version` - If the keys should be marked as having been
296    ///   backed up, the version of the backup.
297    ///
298    /// Note: some implementations ignore `backup_version` and assume the
299    /// current backup version, which is normally the same.
300    pub async fn save_inbound_group_sessions(
301        &self,
302        sessions: Vec<InboundGroupSession>,
303        backed_up_to_version: Option<&str>,
304    ) -> store::Result<()> {
305        let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
306        self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
307
308        if !room_key_updates.is_empty() {
309            // Ignore the result. It can only fail if there are no listeners.
310            let _ = self.room_keys_received_sender.send(room_key_updates);
311        }
312        Ok(())
313    }
314
315    /// Receive notifications of room keys being received as a [`Stream`].
316    ///
317    /// Each time a room key is updated in any way, an update will be sent to
318    /// the stream. Updates that happen at the same time are batched into a
319    /// [`Vec`].
320    ///
321    /// If the reader of the stream lags too far behind an error will be sent to
322    /// the reader.
323    pub fn room_keys_received_stream(
324        &self,
325    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
326        BroadcastStream::new(self.room_keys_received_sender.subscribe())
327    }
328
329    /// Receive notifications of received `m.room_key.withheld` messages.
330    ///
331    /// Each time an `m.room_key.withheld` is received and stored, an update
332    /// will be sent to the stream. Updates that happen at the same time are
333    /// batched into a [`Vec`].
334    ///
335    /// If the reader of the stream lags too far behind, a warning will be
336    /// logged and items will be dropped.
337    pub fn room_keys_withheld_received_stream(
338        &self,
339    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
340        let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
341        Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
342    }
343
344    /// Receive notifications of gossipped secrets being received and stored in
345    /// the secret inbox as a [`Stream`].
346    pub fn secrets_stream(&self) -> impl Stream<Item = SecretsInboxItem> + use<> {
347        let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
348        Self::filter_errors_out_of_stream(stream, "secrets_stream")
349    }
350
351    /// Receive notifications of historic room key bundles being received and
352    /// stored in the store as a [`Stream`].
353    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
354        let stream = BroadcastStream::new(self.historic_room_key_bundles_broadcaster.subscribe());
355        Self::filter_errors_out_of_stream(stream, "bundle_stream")
356    }
357
358    /// Returns a stream of newly created or updated cryptographic identities.
359    ///
360    /// This is just a helper method which allows us to build higher level
361    /// device and user identity streams.
362    pub(super) fn identities_stream(
363        &self,
364    ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> + use<>
365    {
366        let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
367        Self::filter_errors_out_of_stream(stream, "identities_stream")
368    }
369
370    /// Helper for *_stream functions: filters errors out of the stream,
371    /// creating a new Stream.
372    ///
373    /// `BroadcastStream`s gives us `Result`s which can fail with
374    /// `BroadcastStreamRecvError` if the reader falls behind. That's annoying
375    /// to work with, so here we just emit a warning and drop the errors.
376    fn filter_errors_out_of_stream<ItemType>(
377        stream: BroadcastStream<ItemType>,
378        stream_name: &str,
379    ) -> impl Stream<Item = ItemType> + use<ItemType>
380    where
381        ItemType: 'static + Clone + Send,
382    {
383        let stream_name = stream_name.to_owned();
384        stream.filter_map(move |result| {
385            future::ready(match result {
386                Ok(r) => Some(r),
387                Err(BroadcastStreamRecvError::Lagged(lag)) => {
388                    warn!("{stream_name} missed {lag} updates");
389                    None
390                }
391            })
392        })
393    }
394
395    /// Creates a [`CrossProcessLock`] for this store, that will contain the
396    /// given key and value when hold.
397    pub(crate) fn create_store_lock(
398        &self,
399        lock_key: String,
400        config: CrossProcessLockConfig,
401    ) -> CrossProcessLock<LockableCryptoStore> {
402        CrossProcessLock::new(LockableCryptoStore(self.store.clone()), lock_key, config)
403    }
404}
405
406impl Deref for CryptoStoreWrapper {
407    type Target = DynCryptoStore;
408
409    fn deref(&self) -> &Self::Target {
410        self.store.deref()
411    }
412}
413
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving a change to our own device must empty the Olm session cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        // Given a pair of machines for the same user with sessions set up
        let alice = user_id!("@alice:example.com");
        let (machine, other_machine) =
            get_machine_pair_with_setup_sessions_test_helper(alice, alice, false).await;

        let other_curve_key = other_machine.identity_keys().curve25519.to_base64();

        // And given the session for the other machine is in the cache
        machine
            .store()
            .inner
            .store
            .sessions
            .get(&other_curve_key)
            .await
            .expect("We should have a session in the cache.");

        let own_device = machine
            .get_device(alice, machine.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.")
            .inner;

        // When we save a new version of our device keys
        let device_changes = DeviceChanges { changed: vec![own_device], ..Default::default() };
        machine
            .store()
            .save_changes(Changes { devices: device_changes, ..Default::default() })
            .await
            .unwrap();

        // Then the session is no longer in the cache
        let cached = machine.store().inner.store.sessions.get(&other_curve_key).await;
        assert!(
            cached.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}