matrix_sdk_crypto/store/mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
17//! The storage layer for the [`OlmMachine`] can be customized by
18//! implementing your own [`CryptoStore`].
19//!
20//! An in-memory only store is provided, as well as an SQLite-based one.
21//! Depending on your needs and targets, a custom store may be implemented,
22//! e.g. for `wasm-unknown-unknown` an IndexedDB store would be needed.
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! let store = Arc::new(MemoryStore::new());
34//!
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None);
36//! ```
37//!
38//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
39//! [`CryptoStore`]: trait.CryptoStore.html
40
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{atomic::Ordering, Arc},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use matrix_sdk_common::locks::RwLock as StdRwLock;
55use ruma::{
56    encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
57    OwnedRoomId, OwnedUserId, RoomId, UserId,
58};
59use serde::{de::DeserializeOwned, Deserialize, Serialize};
60use thiserror::Error;
61use tokio::sync::{Mutex, MutexGuard, Notify, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
62use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
63use tracing::{error, info, instrument, trace, warn};
64use vodozemac::{base64_encode, megolm::SessionOrdering, Curve25519PublicKey};
65use zeroize::{Zeroize, ZeroizeOnDrop};
66
67#[cfg(doc)]
68use crate::{backups::BackupMachine, identities::OwnUserIdentity};
69use crate::{
70    gossiping::GossippedSecret,
71    identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
72    olm::{
73        Account, ExportedRoomKey, InboundGroupSession, OlmMessageHash, OutboundGroupSession,
74        PrivateCrossSigningIdentity, SenderData, Session, StaticAccountData,
75    },
76    types::{
77        events::room_key_withheld::RoomKeyWithheldEvent, BackupSecrets, CrossSigningSecrets,
78        EventEncryptionAlgorithm, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
79        SecretsBundle,
80    },
81    verification::VerificationMachine,
82    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
83};
84
85pub mod caches;
86mod crypto_store_wrapper;
87mod error;
88mod memorystore;
89mod traits;
90
91#[cfg(any(test, feature = "testing"))]
92#[macro_use]
93#[allow(missing_docs)]
94pub mod integration_tests;
95
96use caches::{SequenceNumber, UsersForKeyQuery};
97pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
98pub use error::{CryptoStoreError, Result};
99use matrix_sdk_common::{
100    deserialized_responses::WithheldCode, store_locks::CrossProcessStoreLock, timeout::timeout,
101};
102pub use memorystore::MemoryStore;
103pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
104
105use crate::types::{
106    events::{room_key_bundle::RoomKeyBundleContent, room_key_withheld::RoomKeyWithheldContent},
107    room_history::RoomKeyBundle,
108};
109pub use crate::{
110    dehydrated_devices::DehydrationError,
111    gossiping::{GossipRequest, SecretInfo},
112};
113
114/// A wrapper for our CryptoStore trait object.
115///
116/// This is needed because we want to have a generic interface so we can
117/// store/restore objects that we can serialize. Since trait objects and
118/// generics don't mix, we let the CryptoStore store strings and this wrapper
119/// adds the generic interface on top.
120#[derive(Debug, Clone)]
121pub struct Store {
122    inner: Arc<StoreInner>,
123}
124
125#[derive(Debug, Default)]
126pub(crate) struct KeyQueryManager {
127    /// Record of the users that are waiting for a /keys/query.
128    users_for_key_query: Mutex<UsersForKeyQuery>,
129
130    /// Notifier that is triggered each time an update is received for a user.
131    users_for_key_query_notify: Notify,
132}
133
134impl KeyQueryManager {
135    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
136        self.ensure_sync_tracked_users(cache).await?;
137        Ok(SyncedKeyQueryManager { cache, manager: self })
138    }
139
140    /// Load the list of users for whom we are tracking their device lists and
141    /// fill out our caches.
142    ///
143    /// This method ensures that we're only going to load the users from the
144    /// actual [`CryptoStore`] once; it will also make sure that any
145    /// concurrent calls to this method get deduplicated.
146    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
147        // Check if the users are loaded, and in that case do nothing.
148        let loaded = cache.loaded_tracked_users.read().await;
149        if *loaded {
150            return Ok(());
151        }
152
153        // Otherwise, we may load the users.
154        drop(loaded);
155        let mut loaded = cache.loaded_tracked_users.write().await;
156
157        // Check again if the users have been loaded, in case another call to this
158        // method loaded the tracked users between the time we tried to
159        // acquire the lock and the time we actually acquired the lock.
160        if *loaded {
161            return Ok(());
162        }
163
164        let tracked_users = cache.store.load_tracked_users().await?;
165
166        let mut query_users_lock = self.users_for_key_query.lock().await;
167        let mut tracked_users_cache = cache.tracked_users.write();
168        for user in tracked_users {
169            tracked_users_cache.insert(user.user_id.to_owned());
170
171            if user.dirty {
172                query_users_lock.insert_user(&user.user_id);
173            }
174        }
175
176        *loaded = true;
177
178        Ok(())
179    }
180
181    /// Wait for a `/keys/query` response to be received if one is expected for
182    /// the given user.
183    ///
184    /// If the given timeout elapses, the method will stop waiting and return
185    /// [`UserKeyQueryResult::TimeoutExpired`].
186    ///
187    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
188    /// query is pending are up to date, but doesn't hold on to it
189    /// thereafter: the lock is short-lived in this case.
190    pub async fn wait_if_user_key_query_pending(
191        &self,
192        cache: StoreCacheGuard,
193        timeout_duration: Duration,
194        user: &UserId,
195    ) -> Result<UserKeyQueryResult> {
196        {
197            // Drop the cache early, so we don't keep it while waiting (since writing the
198            // results requires writing to the cache, thus taking another lock).
199            self.ensure_sync_tracked_users(&cache).await?;
200            drop(cache);
201        }
202
203        let mut users_for_key_query = self.users_for_key_query.lock().await;
204        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
205            return Ok(UserKeyQueryResult::WasNotPending);
206        };
207
208        let wait_for_completion = async {
209            while !waiter.completed.load(Ordering::Relaxed) {
210                // Register for being notified before releasing the mutex, so
211                // it's impossible to miss a wakeup between the last check for
212                // whether we should wait, and starting to wait.
213                let mut notified = pin!(self.users_for_key_query_notify.notified());
214                notified.as_mut().enable();
215                drop(users_for_key_query);
216
217                // Wait for a notification
218                notified.await;
219
220                // Reclaim the lock before checking the flag to avoid races
221                // when two notifications happen right after each other and the
222                // second one sets the flag we want to wait for.
223                users_for_key_query = self.users_for_key_query.lock().await;
224            }
225        };
226
227        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
228            Err(_) => {
229                warn!(
230                    user_id = ?user,
231                    "The user has a pending `/keys/query` request which did \
232                    not finish yet, some devices might be missing."
233                );
234
235                Ok(UserKeyQueryResult::TimeoutExpired)
236            }
237            _ => Ok(UserKeyQueryResult::WasPending),
238        }
239    }
240}
241
242pub(crate) struct SyncedKeyQueryManager<'a> {
243    cache: &'a StoreCache,
244    manager: &'a KeyQueryManager,
245}
246
247impl SyncedKeyQueryManager<'_> {
248    /// Add entries to the list of users being tracked for device changes
249    ///
250    /// Any users not already on the list are flagged as awaiting a key query.
251    /// Users that were already in the list are unaffected.
252    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
253        let mut store_updates = Vec::new();
254        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
255
256        {
257            let mut tracked_users = self.cache.tracked_users.write();
258            for user_id in users {
259                if tracked_users.insert(user_id.to_owned()) {
260                    key_query_lock.insert_user(user_id);
261                    store_updates.push((user_id, true))
262                }
263            }
264        }
265
266        self.cache.store.save_tracked_users(&store_updates).await
267    }
268
269    /// Process notifications that users have changed devices.
270    ///
271    /// This is used to handle the list of device-list updates that is received
272    /// from the `/sync` response. Any users *whose device lists we are
273    /// tracking* are flagged as needing a key query. Users whose devices we
274    /// are not tracking are ignored.
275    pub async fn mark_tracked_users_as_changed(
276        &self,
277        users: impl Iterator<Item = &UserId>,
278    ) -> Result<()> {
279        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
280        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
281
282        {
283            let tracked_users = &self.cache.tracked_users.read();
284            for user_id in users {
285                if tracked_users.contains(user_id) {
286                    key_query_lock.insert_user(user_id);
287                    store_updates.push((user_id, true));
288                }
289            }
290        }
291
292        self.cache.store.save_tracked_users(&store_updates).await
293    }
294
295    /// Flag that the given users' devices are now up-to-date.
296    ///
297    /// This is called after processing the response to a /keys/query request.
298    /// Any users whose device lists we are tracking are removed from the
299    /// list of those pending a /keys/query.
300    pub async fn mark_tracked_users_as_up_to_date(
301        &self,
302        users: impl Iterator<Item = &UserId>,
303        sequence_number: SequenceNumber,
304    ) -> Result<()> {
305        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
306        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
307
308        {
309            let tracked_users = self.cache.tracked_users.read();
310            for user_id in users {
311                if tracked_users.contains(user_id) {
312                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
313                    store_updates.push((user_id, !clean));
314                }
315            }
316        }
317
318        self.cache.store.save_tracked_users(&store_updates).await?;
319        // wake up any tasks that may have been waiting for updates
320        self.manager.users_for_key_query_notify.notify_waiters();
321
322        Ok(())
323    }
324
325    /// Get the set of users that have the outdated/dirty flag set for their list
326    /// of devices.
327    ///
328    /// This set should be included in a `/keys/query` request which will update
329    /// the device list.
330    ///
331    /// # Returns
332    ///
333    /// A pair `(users, sequence_number)`, where `users` is the list of users to
334    /// be queried, and `sequence_number` is the current sequence number,
335    /// which should be returned in `mark_tracked_users_as_up_to_date`.
336    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
337        self.manager.users_for_key_query.lock().await.users_for_key_query()
338    }
339
340    /// See the docs for [`crate::OlmMachine::tracked_users()`].
341    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
342        self.cache.tracked_users.read().iter().cloned().collect()
343    }
344
345    /// Mark the given user as being tracked for device lists, and mark that it
346    /// has an outdated device list.
347    ///
348    /// This means that the user will be considered for a `/keys/query` request
349    /// next time [`Store::users_for_key_query()`] is called.
350    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
351        self.manager.users_for_key_query.lock().await.insert_user(user);
352        self.cache.tracked_users.write().insert(user.to_owned());
353
354        self.cache.store.save_tracked_users(&[(user, true)]).await
355    }
356}
357
358#[derive(Debug)]
359pub(crate) struct StoreCache {
360    store: Arc<CryptoStoreWrapper>,
361    tracked_users: StdRwLock<BTreeSet<OwnedUserId>>,
362    loaded_tracked_users: RwLock<bool>,
363    account: Mutex<Option<Account>>,
364}
365
366impl StoreCache {
367    pub(crate) fn store_wrapper(&self) -> &CryptoStoreWrapper {
368        self.store.as_ref()
369    }
370
371    /// Returns a reference to the `Account`.
372    ///
373    /// Either load the account from the cache, or the store if missing from
374    /// the cache.
375    ///
376    /// Note there should always be an account stored at least in the store, so
377    /// this doesn't return an `Option`.
378    ///
379    /// Note: this method should remain private, otherwise it's possible to ask
380    /// for a `StoreTransaction`, then get the `StoreTransaction::cache()`
381    /// and thus have two different live copies of the `Account` at once.
382    async fn account(&self) -> Result<impl Deref<Target = Account> + '_> {
383        let mut guard = self.account.lock().await;
384        if guard.is_some() {
385            Ok(MutexGuard::map(guard, |acc| acc.as_mut().unwrap()))
386        } else {
387            match self.store.load_account().await? {
388                Some(account) => {
389                    *guard = Some(account);
390                    Ok(MutexGuard::map(guard, |acc| acc.as_mut().unwrap()))
391                }
392                None => Err(CryptoStoreError::AccountUnset),
393            }
394        }
395    }
396}
397
398/// Read-only store cache guard.
399///
400/// This type should hold all the methods that are available when the cache is
401/// borrowed in read-only mode, while all the write operations on those fields
402/// should happen as part of a `StoreTransaction`.
403pub(crate) struct StoreCacheGuard {
404    cache: OwnedRwLockReadGuard<StoreCache>,
405    // TODO: (bnjbvr, #2624) add cross-process lock guard here.
406}
407
408impl StoreCacheGuard {
409    /// Returns a reference to the `Account`.
410    ///
411    /// Either load the account from the cache, or the store if missing from
412    /// the cache.
413    ///
414    /// Note there should always be an account stored at least in the store, so
415    /// this doesn't return an `Option`.
416    pub async fn account(&self) -> Result<impl Deref<Target = Account> + '_> {
417        self.cache.account().await
418    }
419}
420
421impl Deref for StoreCacheGuard {
422    type Target = StoreCache;
423
424    fn deref(&self) -> &Self::Target {
425        &self.cache
426    }
427}
428
429/// A temporary transaction (that implies a write) to the underlying store.
430#[allow(missing_debug_implementations)]
431pub struct StoreTransaction {
432    store: Store,
433    changes: PendingChanges,
434    // TODO hold onto the cross-process crypto store lock + cache.
435    cache: OwnedRwLockWriteGuard<StoreCache>,
436}
437
438impl StoreTransaction {
439    /// Starts a new `StoreTransaction`.
440    async fn new(store: Store) -> Self {
441        let cache = store.inner.cache.clone();
442
443        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
444    }
445
446    pub(crate) fn cache(&self) -> &StoreCache {
447        &self.cache
448    }
449
450    /// Returns a reference to the current `Store`.
451    pub fn store(&self) -> &Store {
452        &self.store
453    }
454
455    /// Gets an `Account` for update.
456    ///
457    /// Note: since it's guaranteed that one can't have both a
458    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
459    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
460    /// that we can't have two copies of an `Account` alive at the same time.
461    pub async fn account(&mut self) -> Result<&mut Account> {
462        if self.changes.account.is_none() {
463            // Make sure the cache loaded the account.
464            let _ = self.cache.account().await?;
465            self.changes.account = self.cache.account.lock().await.take();
466        }
467        Ok(self.changes.account.as_mut().unwrap())
468    }
469
470    /// Commits all dirty fields to the store, and maintains the cache so it
471    /// reflects the current state of the database.
472    pub async fn commit(self) -> Result<()> {
473        if self.changes.is_empty() {
474            return Ok(());
475        }
476
477        // Save changes in the database.
478        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
479
480        self.store.save_pending_changes(self.changes).await?;
481
482        // Make the cache coherent with the database.
483        if let Some(account) = account {
484            *self.cache.account.lock().await = Some(account);
485        }
486
487        Ok(())
488    }
489}
490
491#[derive(Debug)]
492struct StoreInner {
493    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
494    store: Arc<CryptoStoreWrapper>,
495
496    /// In-memory cache for the current crypto store.
497    ///
498    /// ⚠ Must remain private.
499    cache: Arc<RwLock<StoreCache>>,
500
501    verification_machine: VerificationMachine,
502
503    /// Static account data that never changes (and thus can be loaded once and
504    /// for all when creating the store).
505    static_account: StaticAccountData,
506}
507
508/// Aggregated changes to be saved in the database.
509///
510/// This is an updated version of `Changes` that will replace it as #2624
511/// progresses.
512// If you ever add a field here, make sure to update `PendingChanges::is_empty` too.
513#[derive(Default, Debug)]
514#[allow(missing_docs)]
515pub struct PendingChanges {
516    pub account: Option<Account>,
517}
518
519impl PendingChanges {
520    /// Are there any changes stored or is this an empty `PendingChanges` struct?
521    pub fn is_empty(&self) -> bool {
522        self.account.is_none()
523    }
524}
525
526/// Aggregated changes to be saved in the database.
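///
/// A minimal sketch of building a `Changes` value with struct-update syntax,
/// carrying only a new sync batch token (the token string is purely
/// illustrative):
///
/// ```
/// # use matrix_sdk_crypto::store::Changes;
/// let changes = Changes {
///     next_batch_token: Some("s72595_4483_1934".to_owned()),
///     ..Default::default()
/// };
/// assert!(!changes.is_empty());
/// ```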
527// If you ever add a field here, make sure to update `Changes::is_empty` too.
528#[derive(Default, Debug)]
529#[allow(missing_docs)]
530pub struct Changes {
531    pub private_identity: Option<PrivateCrossSigningIdentity>,
532    pub backup_version: Option<String>,
533    pub backup_decryption_key: Option<BackupDecryptionKey>,
534    pub dehydrated_device_pickle_key: Option<DehydratedDeviceKey>,
535    pub sessions: Vec<Session>,
536    pub message_hashes: Vec<OlmMessageHash>,
537    pub inbound_group_sessions: Vec<InboundGroupSession>,
538    pub outbound_group_sessions: Vec<OutboundGroupSession>,
539    pub key_requests: Vec<GossipRequest>,
540    pub identities: IdentityChanges,
541    pub devices: DeviceChanges,
542    /// Stored when an `m.room_key.withheld` event is received.
543    pub withheld_session_info: BTreeMap<OwnedRoomId, BTreeMap<String, RoomKeyWithheldEvent>>,
544    pub room_settings: HashMap<OwnedRoomId, RoomSettings>,
545    pub secrets: Vec<GossippedSecret>,
546    pub next_batch_token: Option<String>,
547
548    /// Room key history bundles that we have received and should
549    /// store.
550    pub received_room_key_bundles: Vec<StoredRoomKeyBundleData>,
551}
552
553/// Information about an [MSC4268] room key bundle.
554///
555/// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
556#[derive(Clone, Debug, Serialize, Deserialize)]
557pub struct StoredRoomKeyBundleData {
558    /// The user that sent us this data.
559    pub sender_user: OwnedUserId,
560
561    /// Information about the sender of this data and how much we trust that
562    /// information.
563    pub sender_data: SenderData,
564
565    /// The room key bundle data itself.
566    pub bundle_data: RoomKeyBundleContent,
567}
568
569/// A user for which we are tracking the list of devices.
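///
/// A minimal sketch of constructing one by hand (the user ID is
/// illustrative):
///
/// ```
/// # use matrix_sdk_crypto::store::TrackedUser;
/// # use ruma::user_id;
/// let user = TrackedUser {
///     user_id: user_id!("@alice:localhost").to_owned(),
///     dirty: true,
/// };
/// assert!(user.dirty);
/// ```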
570#[derive(Clone, Debug, Serialize, Deserialize)]
571pub struct TrackedUser {
572    /// The user ID of the user.
573    pub user_id: OwnedUserId,
574    /// The outdated/dirty flag of the user; remembers whether the list of devices for
575    /// the user is considered to be out of date. If the list of devices is
576    /// out of date, a `/keys/query` request should be sent out for this
577    /// user.
578    pub dirty: bool,
579}
580
581impl Changes {
582    /// Are there any changes stored or is this an empty `Changes` struct?
583    pub fn is_empty(&self) -> bool {
584        self.private_identity.is_none()
585            && self.backup_version.is_none()
586            && self.backup_decryption_key.is_none()
587            && self.dehydrated_device_pickle_key.is_none()
588            && self.sessions.is_empty()
589            && self.message_hashes.is_empty()
590            && self.inbound_group_sessions.is_empty()
591            && self.outbound_group_sessions.is_empty()
592            && self.key_requests.is_empty()
593            && self.identities.is_empty()
594            && self.devices.is_empty()
595            && self.withheld_session_info.is_empty()
596            && self.room_settings.is_empty()
597            && self.secrets.is_empty()
598            && self.next_batch_token.is_none()
599            && self.received_room_key_bundles.is_empty()
600    }
601}
602
603/// This struct is used to remember whether an identity has undergone a change
604/// or remains the same as the one we already know about.
605///
606/// When the homeserver informs us of a potential change in a user's identity or
607/// device during a `/sync` response, it triggers a `/keys/query` request from
608/// our side. In response to this query, the server provides a comprehensive
609/// snapshot of all the user's devices and identities.
610///
611/// Our responsibility is to discern whether a device or identity is new,
612/// changed, or unchanged.
613#[derive(Debug, Clone, Default)]
614#[allow(missing_docs)]
615pub struct IdentityChanges {
616    pub new: Vec<UserIdentityData>,
617    pub changed: Vec<UserIdentityData>,
618    pub unchanged: Vec<UserIdentityData>,
619}
620
621impl IdentityChanges {
622    fn is_empty(&self) -> bool {
623        self.new.is_empty() && self.changed.is_empty()
624    }
625
626    /// Convert the vectors contained in the [`IdentityChanges`] into
627    /// three maps from user id to user identity (new, changed, unchanged).
628    fn into_maps(
629        self,
630    ) -> (
631        BTreeMap<OwnedUserId, UserIdentityData>,
632        BTreeMap<OwnedUserId, UserIdentityData>,
633        BTreeMap<OwnedUserId, UserIdentityData>,
634    ) {
635        let new: BTreeMap<_, _> = self
636            .new
637            .into_iter()
638            .map(|identity| (identity.user_id().to_owned(), identity))
639            .collect();
640
641        let changed: BTreeMap<_, _> = self
642            .changed
643            .into_iter()
644            .map(|identity| (identity.user_id().to_owned(), identity))
645            .collect();
646
647        let unchanged: BTreeMap<_, _> = self
648            .unchanged
649            .into_iter()
650            .map(|identity| (identity.user_id().to_owned(), identity))
651            .collect();
652
653        (new, changed, unchanged)
654    }
655}
656
657#[derive(Debug, Clone, Default)]
658#[allow(missing_docs)]
659pub struct DeviceChanges {
660    pub new: Vec<DeviceData>,
661    pub changed: Vec<DeviceData>,
662    pub deleted: Vec<DeviceData>,
663}
664
665/// Convert the device vectors contained in the [`DeviceChanges`] into
666/// a [`DeviceUpdates`] struct.
667///
668/// The [`DeviceChanges`] will contain vectors of [`DeviceData`]s which
669/// we want to convert to [`Device`]s.
670fn collect_device_updates(
671    verification_machine: VerificationMachine,
672    own_identity: Option<OwnUserIdentityData>,
673    identities: IdentityChanges,
674    devices: DeviceChanges,
675) -> DeviceUpdates {
676    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
677    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
678
679    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
680
681    let map_device = |device: DeviceData| {
682        let device_owner_identity = new_identities
683            .get(device.user_id())
684            .or_else(|| changed_identities.get(device.user_id()))
685            .or_else(|| unchanged_identities.get(device.user_id()))
686            .cloned();
687
688        Device {
689            inner: device,
690            verification_machine: verification_machine.to_owned(),
691            own_identity: own_identity.to_owned(),
692            device_owner_identity,
693        }
694    };
695
696    for device in devices.new {
697        let device = map_device(device);
698
699        new.entry(device.user_id().to_owned())
700            .or_default()
701            .insert(device.device_id().to_owned(), device);
702    }
703
704    for device in devices.changed {
705        let device = map_device(device);
706
707        changed
708            .entry(device.user_id().to_owned())
709            .or_default()
710            .insert(device.device_id().to_owned(), device.to_owned());
711    }
712
713    DeviceUpdates { new, changed }
714}
715
716/// Updates about [`Device`]s which got received over the `/keys/query`
717/// endpoint.
718#[derive(Clone, Debug, Default)]
719pub struct DeviceUpdates {
720    /// The list of newly discovered devices.
721    ///
722    /// A device being in this list does not necessarily mean that the device
723    /// was just created; it just means that it's the first time we're
724    /// seeing this device.
725    pub new: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
726    /// The list of changed devices.
727    pub changed: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
728}
729
730/// Updates about [`UserIdentity`]s which got received over the `/keys/query`
731/// endpoint.
732#[derive(Clone, Debug, Default)]
733pub struct IdentityUpdates {
734    /// The list of newly discovered user identities.
735    ///
736    /// An identity being in this list does not necessarily mean that the
737    /// identity was just created; it just means that it's the first time
738    /// we're seeing this identity.
739    pub new: BTreeMap<OwnedUserId, UserIdentity>,
740    /// The list of changed identities.
741    pub changed: BTreeMap<OwnedUserId, UserIdentity>,
742    /// The list of unchanged identities.
743    pub unchanged: BTreeMap<OwnedUserId, UserIdentity>,
744}
745
746/// The private part of a backup key.
747///
748/// The private part of the key is not used on a regular basis. Rather, it is
749/// used only when we need to *recover* the backup.
750///
751/// Typically, this private key is itself encrypted and stored in server-side
752/// secret storage (SSSS), whence it can be retrieved when it is needed for a
753/// recovery operation. Alternatively, the key can be "gossiped" between devices
754/// via "secret sharing".
755#[derive(Clone, Zeroize, ZeroizeOnDrop, Deserialize, Serialize)]
756#[serde(transparent)]
757pub struct BackupDecryptionKey {
758    pub(crate) inner: Box<[u8; BackupDecryptionKey::KEY_SIZE]>,
759}
760
761impl BackupDecryptionKey {
762    /// The number of bytes the decryption key will hold.
763    pub const KEY_SIZE: usize = 32;
764
765    /// Create a new random decryption key.
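    ///
    /// A minimal sketch of generating a fresh key and exporting it so it can
    /// be shown to the user:
    ///
    /// ```
    /// # use matrix_sdk_crypto::store::BackupDecryptionKey;
    /// let key = BackupDecryptionKey::new().expect("random number generation should not fail");
    /// let base64 = key.to_base64();
    /// assert!(!base64.is_empty());
    /// ```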
766    pub fn new() -> Result<Self, rand::Error> {
767        let mut rng = rand::thread_rng();
768
769        let mut key = Box::new([0u8; Self::KEY_SIZE]);
770        rand::Fill::try_fill(key.as_mut_slice(), &mut rng)?;
771
772        Ok(Self { inner: key })
773    }
774
775    /// Export the [`BackupDecryptionKey`] as a base64 encoded string.
776    pub fn to_base64(&self) -> String {
777        base64_encode(self.inner.as_slice())
778    }
779}
780
781#[cfg(not(tarpaulin_include))]
782impl Debug for BackupDecryptionKey {
783    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
784        f.debug_tuple("BackupDecryptionKey").field(&"...").finish()
785    }
786}
787
788/// The pickle key used to safely store the dehydrated device pickle.
789///
790/// This input key material will be expanded using HKDF into an AES key, MAC
791/// key, and an initialization vector (IV).
792#[derive(Clone, Zeroize, ZeroizeOnDrop, Deserialize, Serialize)]
793#[serde(transparent)]
794pub struct DehydratedDeviceKey {
795    pub(crate) inner: Box<[u8; DehydratedDeviceKey::KEY_SIZE]>,
796}
797
798impl DehydratedDeviceKey {
799    /// The number of bytes the encryption key will hold.
800    pub const KEY_SIZE: usize = 32;
801
802    /// Generates a new random pickle key.
803    pub fn new() -> Result<Self, rand::Error> {
804        let mut rng = rand::thread_rng();
805
806        let mut key = Box::new([0u8; Self::KEY_SIZE]);
807        rand::Fill::try_fill(key.as_mut_slice(), &mut rng)?;
808
809        Ok(Self { inner: key })
810    }
811
812    /// Creates a new dehydration pickle key from the given slice.
813    ///
814    /// Fails if the slice length is not 32.
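    ///
    /// A minimal sketch, using all-zero placeholder bytes rather than real
    /// key material:
    ///
    /// ```
    /// # use matrix_sdk_crypto::store::DehydratedDeviceKey;
    /// let bytes = [0u8; 32];
    /// let key = DehydratedDeviceKey::from_slice(&bytes).expect("the slice is 32 bytes long");
    /// let _base64 = key.to_base64();
    /// ```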
815    pub fn from_slice(slice: &[u8]) -> Result<Self, DehydrationError> {
816        if slice.len() == 32 {
817            let mut key = Box::new([0u8; 32]);
818            key.copy_from_slice(slice);
819            Ok(DehydratedDeviceKey { inner: key })
820        } else {
821            Err(DehydrationError::PickleKeyLength(slice.len()))
822        }
823    }
824
825    /// Creates a dehydration pickle key from the given bytes.
826    pub fn from_bytes(raw_key: &[u8; 32]) -> Self {
827        let mut inner = Box::new([0u8; Self::KEY_SIZE]);
828        inner.copy_from_slice(raw_key);
829
830        Self { inner }
831    }
832
833    /// Export the [`DehydratedDeviceKey`] as a base64 encoded string.
834    pub fn to_base64(&self) -> String {
835        base64_encode(self.inner.as_slice())
836    }
837}
838
839impl From<&[u8; 32]> for DehydratedDeviceKey {
840    fn from(value: &[u8; 32]) -> Self {
841        DehydratedDeviceKey { inner: Box::new(*value) }
842    }
843}
844
845impl From<DehydratedDeviceKey> for Vec<u8> {
846    fn from(key: DehydratedDeviceKey) -> Self {
847        key.inner.to_vec()
848    }
849}
850
851#[cfg(not(tarpaulin_include))]
852impl Debug for DehydratedDeviceKey {
853    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
854        f.debug_tuple("DehydratedDeviceKey").field(&"...").finish()
855    }
856}
857
858impl DeviceChanges {
859    /// Merge the given `DeviceChanges` into this instance of `DeviceChanges`.
860    pub fn extend(&mut self, other: DeviceChanges) {
861        self.new.extend(other.new);
862        self.changed.extend(other.changed);
863        self.deleted.extend(other.deleted);
864    }
865
866    fn is_empty(&self) -> bool {
867        self.new.is_empty() && self.changed.is_empty() && self.deleted.is_empty()
868    }
869}
870
871/// Struct holding info about how many room keys the store has.
872#[derive(Debug, Clone, Default)]
873pub struct RoomKeyCounts {
874    /// The total number of room keys the store has.
875    pub total: usize,
876    /// The number of backed up room keys the store has.
877    pub backed_up: usize,
878}
879
880/// Stored versions of the backup keys.
881#[derive(Default, Clone, Debug)]
882pub struct BackupKeys {
883    /// The key used to decrypt backed up room keys.
884    pub decryption_key: Option<BackupDecryptionKey>,
885    /// The version that we are using for backups.
886    pub backup_version: Option<String>,
887}
888
889/// A struct containing private cross signing keys that can be backed up or
890/// uploaded to the secret store.
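///
/// A minimal sketch of constructing an export by hand; the string below is a
/// placeholder, not a real base64-encoded seed:
///
/// ```
/// # use matrix_sdk_crypto::store::CrossSigningKeyExport;
/// let export = CrossSigningKeyExport {
///     master_key: Some("placeholder unpadded base64 seed".to_owned()),
///     ..Default::default()
/// };
/// assert!(export.self_signing_key.is_none());
/// ```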
891#[derive(Default, Zeroize, ZeroizeOnDrop)]
892pub struct CrossSigningKeyExport {
893    /// The seed of the master key encoded as unpadded base64.
894    pub master_key: Option<String>,
895    /// The seed of the self signing key encoded as unpadded base64.
896    pub self_signing_key: Option<String>,
897    /// The seed of the user signing key encoded as unpadded base64.
898    pub user_signing_key: Option<String>,
899}
900
901#[cfg(not(tarpaulin_include))]
902impl Debug for CrossSigningKeyExport {
903    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
904        f.debug_struct("CrossSigningKeyExport")
905            .field("master_key", &self.master_key.is_some())
906            .field("self_signing_key", &self.self_signing_key.is_some())
907            .field("user_signing_key", &self.user_signing_key.is_some())
908            .finish_non_exhaustive()
909    }
910}
911
912/// Error describing what went wrong when importing private cross signing keys
913/// or the key backup key.
914#[derive(Debug, Error)]
915pub enum SecretImportError {
916    /// The key that we tried to import was invalid.
917    #[error(transparent)]
918    Key(#[from] vodozemac::KeyError),
919    /// The public key of the imported private key doesn't match the public
920    /// key that was uploaded to the server.
921    #[error(
922        "The public key of the imported private key doesn't match the \
923            public key that was uploaded to the server"
924    )]
925    MismatchedPublicKeys,
926    /// The new version of the identity couldn't be stored.
927    #[error(transparent)]
928    Store(#[from] CryptoStoreError),
929}
930
931/// Error describing what went wrong when exporting a [`SecretsBundle`].
932///
933/// The [`SecretsBundle`] can only be exported if we have all cross-signing
934/// private keys in the store.
935#[derive(Debug, Error)]
936pub enum SecretsBundleExportError {
937    /// The store itself had an error.
938    #[error(transparent)]
939    Store(#[from] CryptoStoreError),
940    /// We're missing one or more cross-signing keys.
941    #[error("The store is missing one or more cross-signing keys")]
942    MissingCrossSigningKey(KeyUsage),
943    /// We're missing all cross-signing keys.
944    #[error("The store doesn't contain any cross-signing keys")]
945    MissingCrossSigningKeys,
946    /// We have a backup key stored, but we don't know the version of the
947    /// backup.
948    #[error("The store contains a backup key, but no backup version")]
949    MissingBackupVersion,
950}
951
952/// Result type telling us if a `/keys/query` response was expected for a given
953/// user.
954#[derive(Clone, Copy, Debug, PartialEq, Eq)]
955pub(crate) enum UserKeyQueryResult {
956    WasPending,
957    WasNotPending,
958
959    /// A query was pending, but we gave up waiting
960    TimeoutExpired,
961}
962
963/// Room encryption settings which are modified by state events or user options
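///
/// A minimal sketch of overriding a single option while keeping the defaults
/// for everything else:
///
/// ```
/// # use matrix_sdk_crypto::store::RoomSettings;
/// let settings = RoomSettings {
///     only_allow_trusted_devices: true,
///     ..Default::default()
/// };
/// assert!(settings.only_allow_trusted_devices);
/// ```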
964#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
965pub struct RoomSettings {
966    /// The encryption algorithm that should be used in the room.
967    pub algorithm: EventEncryptionAlgorithm,
968
969    /// Should untrusted devices receive the room key, or should they be
970    /// excluded from the conversation.
971    pub only_allow_trusted_devices: bool,
972
973    /// The maximum time an encryption session should be used for, before it is
974    /// rotated.
975    pub session_rotation_period: Option<Duration>,
976
977    /// The maximum number of messages an encryption session should be used for,
978    /// before it is rotated.
979    pub session_rotation_period_messages: Option<usize>,
980}
981
982impl Default for RoomSettings {
983    fn default() -> Self {
984        Self {
985            algorithm: EventEncryptionAlgorithm::MegolmV1AesSha2,
986            only_allow_trusted_devices: false,
987            session_rotation_period: None,
988            session_rotation_period_messages: None,
989        }
990    }
991}
992
993/// Information on a room key that has been received or imported.
994#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
995pub struct RoomKeyInfo {
996    /// The [messaging algorithm] that this key is used for. Will be one of the
997    /// `m.megolm.*` algorithms.
998    ///
999    /// [messaging algorithm]: https://spec.matrix.org/v1.6/client-server-api/#messaging-algorithms
1000    pub algorithm: EventEncryptionAlgorithm,
1001
1002    /// The room where the key is used.
1003    pub room_id: OwnedRoomId,
1004
1005    /// The Curve25519 key of the device which initiated the session originally.
1006    pub sender_key: Curve25519PublicKey,
1007
1008    /// The ID of the session that the key is for.
1009    pub session_id: String,
1010}
1011
1012impl From<&InboundGroupSession> for RoomKeyInfo {
1013    fn from(group_session: &InboundGroupSession) -> Self {
1014        RoomKeyInfo {
1015            algorithm: group_session.algorithm().clone(),
1016            room_id: group_session.room_id().to_owned(),
1017            sender_key: group_session.sender_key(),
1018            session_id: group_session.session_id().to_owned(),
1019        }
1020    }
1021}
1022
1023/// Information on a room key that has been withheld
1024#[derive(Clone, Debug, Deserialize, Serialize)]
1025pub struct RoomKeyWithheldInfo {
1026    /// The room where the key is used.
1027    pub room_id: OwnedRoomId,
1028
1029    /// The ID of the session that the key is for.
1030    pub session_id: String,
1031
1032    /// The `m.room_key.withheld` event that notified us that the key is being
1033    /// withheld.
1034    pub withheld_event: RoomKeyWithheldEvent,
1035}
1036
1037impl Store {
1038    /// Create a new Store.
1039    pub(crate) fn new(
1040        account: StaticAccountData,
1041        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
1042        store: Arc<CryptoStoreWrapper>,
1043        verification_machine: VerificationMachine,
1044    ) -> Self {
1045        Self {
1046            inner: Arc::new(StoreInner {
1047                static_account: account,
1048                identity,
1049                store: store.clone(),
1050                verification_machine,
1051                cache: Arc::new(RwLock::new(StoreCache {
1052                    store,
1053                    tracked_users: Default::default(),
1054                    loaded_tracked_users: Default::default(),
1055                    account: Default::default(),
1056                })),
1057            }),
1058        }
1059    }
1060
1061    /// UserId associated with this store
1062    pub(crate) fn user_id(&self) -> &UserId {
1063        &self.inner.static_account.user_id
1064    }
1065
1066    /// DeviceId associated with this store
1067    pub(crate) fn device_id(&self) -> &DeviceId {
1068        self.inner.verification_machine.own_device_id()
1069    }
1070
1071    /// The static data for the account associated with this store.
1072    pub(crate) fn static_account(&self) -> &StaticAccountData {
1073        &self.inner.static_account
1074    }
1075
1076    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
1077        // TODO: (bnjbvr, #2624) If configured with a cross-process lock:
1078        // - try to take the lock,
1079        // - if acquired, look if another process touched the underlying storage,
1080        // - if yes, reload everything; if no, return current cache
1081        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
1082    }
1083
1084    pub(crate) async fn transaction(&self) -> StoreTransaction {
1085        StoreTransaction::new(self.clone()).await
1086    }
1087
1088    // Note: bnjbvr lost against borrowck here. Ideally, the `F` parameter would
1089    // take a `&StoreTransaction`, but callers didn't quite like that.
1090    pub(crate) async fn with_transaction<
1091        T,
1092        Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
1093        F: FnOnce(StoreTransaction) -> Fut,
1094    >(
1095        &self,
1096        func: F,
1097    ) -> Result<T, crate::OlmError> {
1098        let tr = self.transaction().await;
1099        let (tr, res) = func(tr).await?;
1100        tr.commit().await?;
1101        Ok(res)
1102    }
1103
1104    #[cfg(test)]
1105    /// Test helper to reset the cross-signing identity.
1106    pub(crate) async fn reset_cross_signing_identity(&self) {
1107        self.inner.identity.lock().await.reset();
1108    }
1109
1110    /// PrivateCrossSigningIdentity associated with this store
1111    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
1112        self.inner.identity.clone()
1113    }
1114
1115    /// Save the given Sessions to the store
1116    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
1117        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
1118
1119        self.save_changes(changes).await
1120    }
1121
1122    pub(crate) async fn get_sessions(
1123        &self,
1124        sender_key: &str,
1125    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
1126        self.inner.store.get_sessions(sender_key).await
1127    }
1128
1129    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
1130        self.inner.store.save_changes(changes).await
1131    }
1132
1133    /// Compare the given `InboundGroupSession` with an existing session we have
1134    /// in the store.
1135    ///
1136    /// This method returns `SessionOrdering::Better` if the given session is
1137    /// better than the one we already have or if we don't have such a
1138    /// session in the store.
1139    pub(crate) async fn compare_group_session(
1140        &self,
1141        session: &InboundGroupSession,
1142    ) -> Result<SessionOrdering> {
1143        let old_session = self
1144            .inner
1145            .store
1146            .get_inbound_group_session(session.room_id(), session.session_id())
1147            .await?;
1148
1149        Ok(if let Some(old_session) = old_session {
1150            session.compare(&old_session).await
1151        } else {
1152            SessionOrdering::Better
1153        })
1154    }
1155
1156    #[cfg(test)]
1157    /// Testing helper that allows saving only a set of devices.
1158    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
1159        let changes = Changes {
1160            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
1161            ..Default::default()
1162        };
1163
1164        self.save_changes(changes).await
1165    }
1166
1167    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
1168    pub(crate) async fn save_inbound_group_sessions(
1169        &self,
1170        sessions: &[InboundGroupSession],
1171    ) -> Result<()> {
1172        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
1173
1174        self.save_changes(changes).await
1175    }
1176
1177    /// Get the display name of our own device.
1178    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
1179        Ok(self
1180            .inner
1181            .store
1182            .get_device(self.user_id(), self.device_id())
1183            .await?
1184            .and_then(|d| d.display_name().map(|d| d.to_owned())))
1185    }
1186
1187    /// Get the device data for the given [`UserId`] and [`DeviceId`].
1188    ///
1189    /// *Note*: This method will include our own device which is always present
1190    /// in the store.
1191    pub(crate) async fn get_device_data(
1192        &self,
1193        user_id: &UserId,
1194        device_id: &DeviceId,
1195    ) -> Result<Option<DeviceData>> {
1196        self.inner.store.get_device(user_id, device_id).await
1197    }
1198
1199    /// Get the [`DeviceData`] for all the devices of the given [`UserId`].
1200    ///
1201    /// *Note*: This method will **not** include our own device.
1202    ///
1203    /// Use this method if you need a list of recipients for a given user, since
1204    /// we don't want to encrypt for our own device; otherwise take a look at
1205    /// the [`Store::get_device_data_for_user`] method.
1206    pub(crate) async fn get_device_data_for_user_filtered(
1207        &self,
1208        user_id: &UserId,
1209    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1210        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
1211            if user_id == self.user_id() {
1212                d.remove(self.device_id());
1213            }
1214            d
1215        })
1216    }
1217
1218    /// Get the [`DeviceData`] for all the devices a user has.
1219    ///
1220    /// *Note*: This method will include our own device which is always present
1221    /// in the store.
1222    ///
1223    /// Use this method if you need to operate on or update all devices of a
1224    /// user, otherwise take a look at the
1225    /// [`Store::get_device_data_for_user_filtered`] method.
1226    pub(crate) async fn get_device_data_for_user(
1227        &self,
1228        user_id: &UserId,
1229    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1230        self.inner.store.get_user_devices(user_id).await
1231    }
1232
1233    /// Get a [`Device`] for the given user with the given
1234    /// [`Curve25519PublicKey`] key.
1235    ///
1236    /// *Note*: This method will include our own device which is always present
1237    /// in the store.
1238    pub(crate) async fn get_device_from_curve_key(
1239        &self,
1240        user_id: &UserId,
1241        curve_key: Curve25519PublicKey,
1242    ) -> Result<Option<Device>> {
1243        self.get_user_devices(user_id)
1244            .await
1245            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
1246    }
1247
1248    /// Get all devices associated with the given [`UserId`].
1249    ///
1250    /// This method is more expensive than the
1251    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
1252    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
1253    /// device owner to be fetched from the store as well.
1254    ///
1255    /// *Note*: This method will include our own device which is always present
1256    /// in the store.
1257    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
1258        let devices = self.get_device_data_for_user(user_id).await?;
1259
1260        let own_identity = self
1261            .inner
1262            .store
1263            .get_user_identity(self.user_id())
1264            .await?
1265            .and_then(|i| i.own().cloned());
1266        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
1267
1268        Ok(UserDevices {
1269            inner: devices,
1270            verification_machine: self.inner.verification_machine.clone(),
1271            own_identity,
1272            device_owner_identity,
1273        })
1274    }
1275
1276    /// Get a [`Device`] for the given user with the given [`DeviceId`].
1277    ///
1278    /// This method is more expensive than the [`Store::get_device_data`] method
1279    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
1280    /// [`UserIdentityData`] of the device owner to be fetched from the
1281    /// store as well.
1282    ///
1283    /// *Note*: This method will include our own device which is always present
1284    /// in the store.
1285    pub(crate) async fn get_device(
1286        &self,
1287        user_id: &UserId,
1288        device_id: &DeviceId,
1289    ) -> Result<Option<Device>> {
1290        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
1291            Ok(Some(self.wrap_device_data(device_data).await?))
1292        } else {
1293            Ok(None)
1294        }
1295    }
1296
1297    /// Create a new device using the supplied [`DeviceData`]. Normally we would
1298    /// call [`Self::get_device`] to find an existing device inside this
1299    /// store. Only call this if you have some existing DeviceData and want
1300    /// to wrap it with the extra information provided by a [`Device`].
1301    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
1302        let own_identity = self
1303            .inner
1304            .store
1305            .get_user_identity(self.user_id())
1306            .await?
1307            .and_then(|i| i.own().cloned());
1308
1309        let device_owner_identity =
1310            self.inner.store.get_user_identity(device_data.user_id()).await?;
1311
1312        Ok(Device {
1313            inner: device_data,
1314            verification_machine: self.inner.verification_machine.clone(),
1315            own_identity,
1316            device_owner_identity,
1317        })
1318    }
1319
1320    /// Get the [`UserIdentity`] of the given `user_id`.
1321    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1322        let own_identity = self
1323            .inner
1324            .store
1325            .get_user_identity(self.user_id())
1326            .await?
1327            .and_then(as_variant!(UserIdentityData::Own));
1328
1329        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
1330            UserIdentity::new(
1331                self.clone(),
1332                i,
1333                self.inner.verification_machine.to_owned(),
1334                own_identity,
1335            )
1336        }))
1337    }
1338
1339    /// Try to export the secret with the given secret name.
1340    ///
1341    /// The exported secret will be encoded as unpadded base64. Returns `None`
1342    /// if the secret can't be found.
1343    ///
1344    /// # Arguments
1345    ///
1346    /// * `secret_name` - The name of the secret that should be exported.
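    ///
    /// # Examples
    ///
    /// A sketch of exporting the backup recovery key. It assumes `machine` is
    /// an [`OlmMachine`](crate::OlmMachine) that exposes this [`Store`]
    /// through a `store()` accessor and that a backup decryption key has
    /// already been saved:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use ruma::events::secret::request::SecretName;
    /// # async fn example(machine: OlmMachine) -> Result<(), Box<dyn std::error::Error>> {
    /// if let Some(secret) = machine.store().export_secret(&SecretName::RecoveryKey).await? {
    ///     println!("The backup recovery key is {secret}");
    /// }
    /// # Ok(()) }
    /// ```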
1347    pub async fn export_secret(
1348        &self,
1349        secret_name: &SecretName,
1350    ) -> Result<Option<String>, CryptoStoreError> {
1351        Ok(match secret_name {
1352            SecretName::CrossSigningMasterKey
1353            | SecretName::CrossSigningUserSigningKey
1354            | SecretName::CrossSigningSelfSigningKey => {
1355                self.inner.identity.lock().await.export_secret(secret_name).await
1356            }
1357            SecretName::RecoveryKey => {
1358                if let Some(key) = self.load_backup_keys().await?.decryption_key {
1359                    let exported = key.to_base64();
1360                    Some(exported)
1361                } else {
1362                    None
1363                }
1364            }
1365            name => {
1366                warn!(secret = ?name, "Unknown secret was requested");
1367                None
1368            }
1369        })
1370    }
1371
1372    /// Export all the private cross signing keys we have.
1373    ///
1374    /// The export will contain the seed for the Ed25519 keys as an unpadded
1375    /// base64 encoded string.
1376    ///
1377    /// This method returns `None` if we don't have any private cross signing
1378    /// keys.
1379    pub async fn export_cross_signing_keys(
1380        &self,
1381    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
1382        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
1383        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
1384        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
1385
1386        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
1387            None
1388        } else {
1389            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
1390        })
1391    }
1392
1393    /// Import our private cross signing keys.
1394    ///
1395    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
1396    /// base64 encoded string.
1397    pub async fn import_cross_signing_keys(
1398        &self,
1399        export: CrossSigningKeyExport,
1400    ) -> Result<CrossSigningStatus, SecretImportError> {
1401        if let Some(public_identity) =
1402            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1403        {
1404            let identity = self.inner.identity.lock().await;
1405
1406            identity
1407                .import_secrets(
1408                    public_identity.to_owned(),
1409                    export.master_key.as_deref(),
1410                    export.self_signing_key.as_deref(),
1411                    export.user_signing_key.as_deref(),
1412                )
1413                .await?;
1414
1415            let status = identity.status().await;
1416
1417            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
1418
1419            let mut changes =
1420                Changes { private_identity: Some(identity.clone()), ..Default::default() };
1421
1422            if diff.none_differ() {
1423                public_identity.mark_as_verified();
1424                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
1425            }
1426
1427            info!(?status, "Successfully imported the private cross-signing keys");
1428
1429            self.save_changes(changes).await?;
1430        } else {
1431            warn!(
1432                "No public identity found while importing cross-signing keys, \
1433                 a /keys/query needs to be done"
1434            );
1435        }
1436
1437        Ok(self.inner.identity.lock().await.status().await)
1438    }
1439
1440    /// Export all the secrets we have in the store into a [`SecretsBundle`].
1441    ///
1442    /// This method will export all the private cross-signing keys and, if
1443    /// available, the private part of a backup key and its accompanying
1444    /// version.
1445    ///
1446    /// The method will fail if we don't have all three private cross-signing
1447    /// keys available.
1448    ///
1449    /// **Warning**: Only export this and share it with a trusted recipient,
1450    /// i.e. if an existing device is sharing this with a new device.
1451    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1452        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1453            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1454        };
1455
1456        let Some(master_key) = cross_signing.master_key.clone() else {
1457            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1458        };
1459
1460        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1461            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1462        };
1463
1464        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1465            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1466        };
1467
1468        let backup_keys = self.load_backup_keys().await?;
1469
1470        let backup = if let Some(key) = backup_keys.decryption_key {
1471            if let Some(backup_version) = backup_keys.backup_version {
1472                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1473                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1474                ))
1475            } else {
1476                return Err(SecretsBundleExportError::MissingBackupVersion);
1477            }
1478        } else {
1479            None
1480        };
1481
1482        Ok(SecretsBundle {
1483            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1484            backup,
1485        })
1486    }
1487
1488    /// Import and persist secrets from a [`SecretsBundle`].
1489    ///
1490    /// This method will import all the private cross-signing keys and, if
1491    /// available, the private part of a backup key and its accompanying
1492    /// version into the store.
1493    ///
1494    /// **Warning**: Only import this from a trusted source, i.e. if an existing
1495    /// device is sharing this with a new device. The imported cross-signing
1496    /// keys will create an [`OwnUserIdentity`] and mark it as verified.
1497    ///
1498    /// The backup key will be persisted in the store and can be enabled using
1499    /// the [`BackupMachine`].
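    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch of a full round trip; it assumes an
    /// `existing` machine that has bootstrapped cross-signing and a
    /// `new_machine` that should receive the secrets:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # async fn example(existing: &OlmMachine, new_machine: &OlmMachine) {
    /// // On the existing device, export the secrets.
    /// let bundle = existing.store().export_secrets_bundle().await.unwrap();
    ///
    /// // ... send the bundle to the new device over a secure channel, then
    /// // import it on the new device.
    /// new_machine.store().import_secrets_bundle(&bundle).await.unwrap();
    /// # }
    /// ```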
1500    pub async fn import_secrets_bundle(
1501        &self,
1502        bundle: &SecretsBundle,
1503    ) -> Result<(), SecretImportError> {
1504        let mut changes = Changes::default();
1505
1506        if let Some(backup_bundle) = &bundle.backup {
1507            match backup_bundle {
1508                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1509                    changes.backup_decryption_key = Some(bundle.key.clone());
1510                    changes.backup_version = Some(bundle.backup_version.clone());
1511                }
1512            }
1513        }
1514
1515        let identity = self.inner.identity.lock().await;
1516
1517        identity
1518            .import_secrets_unchecked(
1519                Some(&bundle.cross_signing.master_key),
1520                Some(&bundle.cross_signing.self_signing_key),
1521                Some(&bundle.cross_signing.user_signing_key),
1522            )
1523            .await?;
1524
1525        let public_identity = identity.to_public_identity().await.expect(
1526            "We should be able to create a new public identity since we just imported \
1527             all the private cross-signing keys",
1528        );
1529
1530        changes.private_identity = Some(identity.clone());
1531        changes.identities.new.push(UserIdentityData::Own(public_identity));
1532
1533        Ok(self.save_changes(changes).await?)
1534    }
1535
1536    /// Import the given `secret` named `secret_name` into the keystore.
1537    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1538        match &secret.secret_name {
1539            SecretName::CrossSigningMasterKey
1540            | SecretName::CrossSigningUserSigningKey
1541            | SecretName::CrossSigningSelfSigningKey => {
1542                if let Some(public_identity) =
1543                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1544                {
1545                    let identity = self.inner.identity.lock().await;
1546
1547                    identity
1548                        .import_secret(
1549                            public_identity,
1550                            &secret.secret_name,
1551                            &secret.event.content.secret,
1552                        )
1553                        .await?;
1554                    info!(
1555                        secret_name = ?secret.secret_name,
1556                        "Successfully imported a private cross signing key"
1557                    );
1558
1559                    let changes =
1560                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1561
1562                    self.save_changes(changes).await?;
1563                }
1564            }
1565            SecretName::RecoveryKey => {
1566                // We don't import the decryption key here since we'll want to
1567                // check whether the public key matches the latest backup version
1568                // on the server. We instead put the secret into a secret inbox,
1569                // where it will stay until it either gets overwritten or the
1570                // user accepts the secret.
1571            }
1572            name => {
1573                warn!(secret = ?name, "Tried to import an unknown secret");
1574            }
1575        }
1576
1577        Ok(())
1578    }
1579
1580    /// Check the global flag that controls whether messages should only be
1581    /// encrypted for trusted devices, or for everyone.
1582    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1583        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1584        Ok(value)
1585    }
1586
1587    /// Set the global flag controlling whether to encrypt messages for untrusted
1588    /// devices, or whether they should be excluded from the conversation.
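    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # async fn example(machine: &OlmMachine) {
    /// // Only share room keys with trusted (verified) devices from now on.
    /// machine.store().set_only_allow_trusted_devices(true).await.unwrap();
    /// assert!(machine.store().get_only_allow_trusted_devices().await.unwrap());
    /// # }
    /// ```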
1589    pub async fn set_only_allow_trusted_devices(
1590        &self,
1591        block_untrusted_devices: bool,
1592    ) -> Result<()> {
1593        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1594    }
1595
1596    /// Get a custom stored value associated with a key.
1597    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1598        let Some(value) = self.get_custom_value(key).await? else {
1599            return Ok(None);
1600        };
1601        let deserialized = self.deserialize_value(&value)?;
1602        Ok(Some(deserialized))
1603    }
1604
1605    /// Store a custom value associated with a key.
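    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch; the key name and the stored type are
    /// made up for the example:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # async fn example(machine: &OlmMachine) {
    /// machine.store().set_value("example_counter", &42u32).await.unwrap();
    ///
    /// let value: Option<u32> = machine.store().get_value("example_counter").await.unwrap();
    /// assert_eq!(value, Some(42));
    /// # }
    /// ```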
1606    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1607        let serialized = self.serialize_value(value)?;
1608        self.set_custom_value(key, serialized).await?;
1609        Ok(())
1610    }
1611
1612    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1613        let serialized =
1614            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1615        Ok(serialized)
1616    }
1617
1618    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1619        let deserialized =
1620            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1621        Ok(deserialized)
1622    }
1623
1624    /// Receive notifications of room keys being received as a [`Stream`].
1625    ///
1626    /// Each time a room key is updated in any way, an update will be sent to
1627    /// the stream. Updates that happen at the same time are batched into a
1628    /// [`Vec`].
1629    ///
1630    /// If the reader of the stream lags too far behind, an error will be sent
1631    /// to the reader.
1632    ///
1633    /// The stream will terminate once all references to the underlying
1634    /// `CryptoStoreWrapper` are dropped.
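    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch of listening for newly received room
    /// keys:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let room_keys_stream = machine.store().room_keys_received_stream();
    /// pin_mut!(room_keys_stream);
    ///
    /// while let Some(Ok(updates)) = room_keys_stream.next().await {
    ///     for info in updates {
    ///         println!("Received a room key for room {}", info.room_id);
    ///     }
    /// }
    /// # });
    /// ```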
1635    pub fn room_keys_received_stream(
1636        &self,
1637    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
1638        self.inner.store.room_keys_received_stream()
1639    }
1640
1641    /// Receive notifications of received `m.room_key.withheld` messages.
1642    ///
1643    /// Each time an `m.room_key.withheld` is received and stored, an update
1644    /// will be sent to the stream. Updates that happen at the same time are
1645    /// batched into a [`Vec`].
1646    ///
1647    /// If the reader of the stream lags too far behind, a warning will be
1648    /// logged and items will be dropped.
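    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let withheld_stream = machine.store().room_keys_withheld_received_stream();
    /// pin_mut!(withheld_stream);
    ///
    /// while let Some(updates) = withheld_stream.next().await {
    ///     println!("Received {} withheld room key notices", updates.len());
    /// }
    /// # });
    /// ```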
1649    pub fn room_keys_withheld_received_stream(
1650        &self,
1651    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
1652        self.inner.store.room_keys_withheld_received_stream()
1653    }
1654
1655    /// Returns a stream of user identity updates, allowing users to listen for
1656    /// notifications about new or changed user identities.
1657    ///
1658    /// The stream produced by this method emits updates whenever a new user
1659    /// identity is discovered or when an existing identity's information is
1660    /// changed. Users can subscribe to this stream and receive updates in
1661    /// real-time.
1662    ///
1663    /// Caution: the returned stream will never terminate, and it holds a
1664    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1665    /// resource leaks.
1666    ///
1667    /// # Examples
1668    ///
1669    /// ```no_run
1670    /// # use matrix_sdk_crypto::OlmMachine;
1671    /// # use ruma::{device_id, user_id};
1672    /// # use futures_util::{pin_mut, StreamExt};
1673    /// # let machine: OlmMachine = unimplemented!();
1674    /// # futures_executor::block_on(async {
1675    /// let identities_stream = machine.store().user_identities_stream();
1676    /// pin_mut!(identities_stream);
1677    ///
1678    /// for identity_updates in identities_stream.next().await {
1679    ///     for (_, identity) in identity_updates.new {
1680    ///         println!("A new identity has been added {}", identity.user_id());
1681    ///     }
1682    /// }
1683    /// # });
1684    /// ```
1685    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> {
1686        let verification_machine = self.inner.verification_machine.to_owned();
1687
1688        let this = self.clone();
1689        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1690            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1691
1692            let map_identity = |(user_id, identity)| {
1693                (
1694                    user_id,
1695                    UserIdentity::new(
1696                        this.clone(),
1697                        identity,
1698                        verification_machine.to_owned(),
1699                        own_identity.to_owned(),
1700                    ),
1701                )
1702            };
1703
1704            let new = new_identities.into_iter().map(map_identity).collect();
1705            let changed = changed_identities.into_iter().map(map_identity).collect();
1706            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1707
1708            IdentityUpdates { new, changed, unchanged }
1709        })
1710    }
1711
1712    /// Returns a stream of device updates, allowing users to listen for
1713    /// notifications about new or changed devices.
1714    ///
1715    /// The stream produced by this method emits updates whenever a new device
1716    /// is discovered or when an existing device's information is changed. Users
1717    /// can subscribe to this stream and receive updates in real-time.
1718    ///
1719    /// Caution: the returned stream will never terminate, and it holds a
1720    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1721    /// resource leaks.
1722    ///
1723    /// # Examples
1724    ///
1725    /// ```no_run
1726    /// # use matrix_sdk_crypto::OlmMachine;
1727    /// # use ruma::{device_id, user_id};
1728    /// # use futures_util::{pin_mut, StreamExt};
1729    /// # let machine: OlmMachine = unimplemented!();
1730    /// # futures_executor::block_on(async {
1731    /// let devices_stream = machine.store().devices_stream();
1732    /// pin_mut!(devices_stream);
1733    ///
1734    /// for device_updates in devices_stream.next().await {
1735    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1736    ///         for device in user_devices.values() {
1737    ///             println!("A new device has been added {}", device.device_id());
1738    ///         }
1739    ///     }
1740    /// }
1741    /// # });
1742    /// ```
1743    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> {
1744        let verification_machine = self.inner.verification_machine.to_owned();
1745
1746        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1747            collect_device_updates(
1748                verification_machine.to_owned(),
1749                own_identity,
1750                identities,
1751                devices,
1752            )
1753        })
1754    }
1755
1756    /// Returns a [`Stream`] of user identity and device updates
1757    ///
1758    /// The stream returned by this method returns the same data as
1759    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
1760    /// not include references to the `VerificationMachine`. It is therefore a
1761    /// lower-level view on that data.
1762    ///
1763    /// The stream will terminate once all references to the underlying
1764    /// `CryptoStoreWrapper` are dropped.
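    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let stream = machine.store().identities_stream_raw();
    /// pin_mut!(stream);
    ///
    /// while let Some((identity_updates, device_updates)) = stream.next().await {
    ///     println!("Identities changed: {}", identity_updates.changed.len());
    ///     println!("Devices changed: {}", device_updates.changed.len());
    /// }
    /// # });
    /// ```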
1765    pub fn identities_stream_raw(&self) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> {
1766        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1767    }
1768
1769    /// Creates a `CrossProcessStoreLock` for this store that will contain the
1770    /// given key and value when held.
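    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch; the key and value strings are arbitrary
    /// placeholders:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # fn example(machine: &OlmMachine) {
    /// let _lock = machine
    ///     .store()
    ///     .create_store_lock("cross_process_lock".to_owned(), "my-process-id".to_owned());
    /// # }
    /// ```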
1771    pub fn create_store_lock(
1772        &self,
1773        lock_key: String,
1774        lock_value: String,
1775    ) -> CrossProcessStoreLock<LockableCryptoStore> {
1776        self.inner.store.create_store_lock(lock_key, lock_value)
1777    }
1778
1779    /// Receive notifications of gossipped secrets being received and stored in
1780    /// the secret inbox as a [`Stream`].
1781    ///
1782    /// The gossipped secrets are received using the `m.secret.send` event type
1783    /// and are guaranteed to have been received over a 1-to-1 Olm
1784    /// [`Session`] from a verified [`Device`].
1785    ///
1786    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1787    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1788    ///
1789    /// After a suitable secret of a certain type has been found, it can be
1790    /// removed from the store using the
1791    /// [`CryptoStore::delete_secrets_from_inbox()`] method.
1792    ///
1793    /// The only secret this will currently broadcast is the
1794    /// `m.megolm_backup.v1`.
1795    ///
1796    /// If the reader of the stream lags too far behind, a warning will be
1797    /// logged and items will be dropped.
1798    ///
1799    /// # Examples
1800    ///
1801    /// ```no_run
1802    /// # use matrix_sdk_crypto::OlmMachine;
1803    /// # use ruma::{device_id, user_id};
1804    /// # use futures_util::{pin_mut, StreamExt};
1805    /// # let alice = user_id!("@alice:example.org").to_owned();
1806    /// # futures_executor::block_on(async {
1807    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1808    ///
1809    /// let secret_stream = machine.store().secrets_stream();
1810    /// pin_mut!(secret_stream);
1811    ///
1812    /// for secret in secret_stream.next().await {
1813    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1814    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name).await;
1815    /// }
1816    /// # });
1817    /// ```
1818    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
1819        self.inner.store.secrets_stream()
1820    }
1821
1822    /// Import the given room keys into the store.
1823    ///
1824    /// # Arguments
1825    ///
1826    /// * `exported_keys` - The keys to be imported.
1827    /// * `from_backup_version` - If the keys came from key backup, the key
1828    ///   backup version. This will cause the keys to be marked as already
1829    ///   backed up, and therefore not requiring another backup.
1830    /// * `progress_listener` - Callback which will be called after each key is
1831    ///   processed. Called with arguments `(processed, total)` where
1832    ///   `processed` is the number of keys processed so far, and `total` is the
1833    ///   total number of keys (i.e., `exported_keys.len()`).
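    ///
    /// # Examples
    ///
    /// A minimal, illustrative sketch of restoring keys that came from a key
    /// backup; the backup version string is a placeholder:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::{olm::ExportedRoomKey, OlmMachine};
    /// # async fn example(machine: &OlmMachine, keys: Vec<ExportedRoomKey>) {
    /// let result = machine
    ///     .store()
    ///     .import_room_keys(keys, Some("1"), |processed, total| {
    ///         println!("Imported {processed} of {total} room keys");
    ///     })
    ///     .await
    ///     .unwrap();
    ///
    /// println!("Successfully imported {} room keys", result.imported_count);
    /// # }
    /// ```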
1834    pub async fn import_room_keys(
1835        &self,
1836        exported_keys: Vec<ExportedRoomKey>,
1837        from_backup_version: Option<&str>,
1838        progress_listener: impl Fn(usize, usize),
1839    ) -> Result<RoomKeyImportResult> {
1840        let exported_keys: Vec<&ExportedRoomKey> = exported_keys.iter().collect();
1841        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1842    }
1843
1844    /// Import the given room keys into our store.
1845    ///
1846    /// # Arguments
1847    ///
1848    /// * `exported_keys` - A list of previously exported keys that should be
1850    ///   imported into our store. If we already have a better version of a key,
1851    ///   the key will *not* be imported.
1851    ///
1852    /// Returns a tuple of numbers that represent the number of sessions that
1853    /// were imported and the total number of sessions that were found in the
1854    /// key export.
1855    ///
1856    /// # Examples
1857    ///
1858    /// ```no_run
1859    /// # use std::io::Cursor;
1860    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1861    /// # use ruma::{device_id, user_id};
1862    /// # let alice = user_id!("@alice:example.org");
1863    /// # async {
1864    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1865    /// # let export = Cursor::new("".to_owned());
1866    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1867    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1868    /// # };
1869    /// ```
1870    pub async fn import_exported_room_keys(
1871        &self,
1872        exported_keys: Vec<ExportedRoomKey>,
1873        progress_listener: impl Fn(usize, usize),
1874    ) -> Result<RoomKeyImportResult> {
1875        self.import_room_keys(exported_keys, None, progress_listener).await
1876    }
1877
1878    async fn import_sessions_impl<T>(
1879        &self,
1880        room_keys: Vec<T>,
1881        from_backup_version: Option<&str>,
1882        progress_listener: impl Fn(usize, usize),
1883    ) -> Result<RoomKeyImportResult>
1884    where
1885        T: TryInto<InboundGroupSession> + RoomKeyExport + Copy,
1886        T::Error: Debug,
1887    {
1888        let mut sessions = Vec::new();
1889
1890        async fn new_session_better(
1891            session: &InboundGroupSession,
1892            old_session: Option<InboundGroupSession>,
1893        ) -> bool {
1894            if let Some(old_session) = &old_session {
1895                session.compare(old_session).await == SessionOrdering::Better
1896            } else {
1897                true
1898            }
1899        }
1900
1901        let total_count = room_keys.len();
1902        let mut keys = BTreeMap::new();
1903
1904        for (i, key) in room_keys.into_iter().enumerate() {
1905            match key.try_into() {
1906                Ok(session) => {
1907                    let old_session = self
1908                        .inner
1909                        .store
1910                        .get_inbound_group_session(session.room_id(), session.session_id())
1911                        .await?;
1912
1913                    // Only import the session if we didn't have this session or
1914                    // if it's a better version of the same session.
1915                    if new_session_better(&session, old_session).await {
1916                        if from_backup_version.is_some() {
1917                            session.mark_as_backed_up();
1918                        }
1919
1920                        keys.entry(session.room_id().to_owned())
1921                            .or_insert_with(BTreeMap::new)
1922                            .entry(session.sender_key().to_base64())
1923                            .or_insert_with(BTreeSet::new)
1924                            .insert(session.session_id().to_owned());
1925
1926                        sessions.push(session);
1927                    }
1928                }
1929                Err(e) => {
1930                    warn!(
1931                        sender_key = key.sender_key().to_base64(),
1932                        room_id = ?key.room_id(),
1933                        session_id = key.session_id(),
1934                        error = ?e,
1935                        "Couldn't import a room key from a file export."
1936                    );
1937                }
1938            }
1939
1940            progress_listener(i, total_count);
1941        }
1942
1943        let imported_count = sessions.len();
1944
1945        self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1946
1947        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1948
1949        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1950    }
1951
1952    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1953        self.inner.store.clone()
1954    }
1955
1956    /// Export the keys that match the given predicate.
1957    ///
1958    /// # Arguments
1959    ///
1960    /// * `predicate` - A closure that will be called for every known
1961    ///   `InboundGroupSession`, which represents a room key. If the closure
1962    ///   returns `true` the `InboundGroupSession` will be included in the
1963    ///   export, if the closure returns `false` it will not be included.
1964    ///
1965    /// # Examples
1966    ///
1967    /// ```no_run
1968    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1969    /// # use ruma::{device_id, user_id, room_id};
1970    /// # let alice = user_id!("@alice:example.org");
1971    /// # async {
1972    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1973    /// let room_id = room_id!("!test:localhost");
1974    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1975    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1976    /// # };
1977    /// ```
1978    pub async fn export_room_keys(
1979        &self,
1980        predicate: impl FnMut(&InboundGroupSession) -> bool,
1981    ) -> Result<Vec<ExportedRoomKey>> {
1982        let mut exported = Vec::new();
1983
1984        let mut sessions = self.get_inbound_group_sessions().await?;
1985        sessions.retain(predicate);
1986
1987        for session in sessions {
1988            let export = session.export().await;
1989            exported.push(export);
1990        }
1991
1992        Ok(exported)
1993    }
1994
1995    /// Export room keys matching a predicate, providing them as an async
1996    /// `Stream`.
1997    ///
1998    /// # Arguments
1999    ///
2000    /// * `predicate` - A closure that will be called for every known
2001    ///   `InboundGroupSession`, which represents a room key. If the closure
2002    ///   returns `true` the `InboundGroupSession` will be included in the
2003    ///   export, if the closure returns `false` it will not be included.
2004    ///
2005    /// # Examples
2006    ///
2007    /// ```no_run
2008    /// use std::pin::pin;
2009    ///
2010    /// use matrix_sdk_crypto::{olm::ExportedRoomKey, OlmMachine};
2011    /// use ruma::{device_id, room_id, user_id};
2012    /// use tokio_stream::StreamExt;
2013    /// # async {
2014    /// let alice = user_id!("@alice:example.org");
2015    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
2016    /// let room_id = room_id!("!test:localhost");
2017    /// let mut keys = pin!(machine
2018    ///     .store()
2019    ///     .export_room_keys_stream(|s| s.room_id() == room_id)
2020    ///     .await
2021    ///     .unwrap());
2022    /// while let Some(key) = keys.next().await {
2023    ///     println!("{}", key.room_id);
2024    /// }
2025    /// # };
2026    /// ```
2027    pub async fn export_room_keys_stream(
2028        &self,
2029        predicate: impl FnMut(&InboundGroupSession) -> bool,
2030    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
2031        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
2032        let sessions = self.get_inbound_group_sessions().await?;
2033        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
2034            .then(|session| async move { session.export().await }))
2035    }
2036
2037    /// Assemble a room key bundle for sharing encrypted history, as per
2038    /// [MSC4268].
2039    ///
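    /// # Examples
    ///
    /// A minimal, illustrative sketch; the room ID is a placeholder:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use ruma::room_id;
    /// # async fn example(machine: &OlmMachine) {
    /// let bundle = machine
    ///     .store()
    ///     .build_room_key_bundle(room_id!("!room:example.org"))
    ///     .await
    ///     .unwrap();
    ///
    /// println!(
    ///     "Sharing {} room keys, withholding {}",
    ///     bundle.room_keys.len(),
    ///     bundle.withheld.len()
    /// );
    /// # }
    /// ```
    ///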
2040    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2041    pub async fn build_room_key_bundle(
2042        &self,
2043        room_id: &RoomId,
2044    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
2045        // TODO: make this WAY more efficient. We should only fetch sessions for the
2046        //   correct room.
2047        let mut sessions = self.get_inbound_group_sessions().await?;
2048        sessions.retain(|session| session.room_id == room_id);
2049
2050        let mut bundle = RoomKeyBundle::default();
2051        for session in sessions {
2052            if session.shared_history() {
2053                bundle.room_keys.push(session.export().await.into());
2054            } else {
2055                bundle.withheld.push(RoomKeyWithheldContent::new(
2056                    session.algorithm().to_owned(),
2057                    WithheldCode::Unauthorised,
2058                    session.room_id().to_owned(),
2059                    session.session_id().to_owned(),
2060                    session.sender_key().to_owned(),
2061                    self.device_id().to_owned(),
2062                ));
2063            }
2064        }
2065
2066        Ok(bundle)
2067    }
2068
2069    /// Import the contents of a downloaded and decrypted [MSC4268] key bundle.
2070    ///
2071    /// # Arguments
2072    ///
2073    /// * `bundle` - The decrypted and deserialized bundle itself.
2074    /// * `room_id` - The room that we expect this bundle to correspond to.
2075    /// * `sender_user` - The user that sent us the to-device message pointing
2076    ///   to this data.
2077    /// * `sender_data` - Information on the sending device at the time we
2078    ///   received that message.
2079    ///
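    /// # Examples
    ///
    /// A minimal, illustrative sketch; it assumes the bundle was built by the
    /// sending device (e.g. via [`Store::build_room_key_bundle`]) and has
    /// already been downloaded and decrypted, and it uses
    /// `SenderData::unknown()` as a stand-in for the real sender information:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::{olm::SenderData, OlmMachine};
    /// # use ruma::room_id;
    /// # async fn example(sender: &OlmMachine, receiver: &OlmMachine) {
    /// let room_id = room_id!("!room:example.org");
    /// let bundle = sender.store().build_room_key_bundle(room_id).await.unwrap();
    ///
    /// receiver
    ///     .store()
    ///     .receive_room_key_bundle(
    ///         room_id,
    ///         sender.user_id(),
    ///         &SenderData::unknown(),
    ///         bundle,
    ///         |processed, total| println!("Imported {processed} of {total} room keys"),
    ///     )
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```
    ///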
2080    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2081    #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len()))]
2082    pub async fn receive_room_key_bundle(
2083        &self,
2084        room_id: &RoomId,
2085        sender_user: &UserId,
2086        sender_data: &SenderData,
2087        bundle: RoomKeyBundle,
2088        progress_listener: impl Fn(usize, usize),
2089    ) -> Result<(), CryptoStoreError> {
2090        let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
2091            if key.room_id != room_id {
2092                trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
2093                Either::Right(key)
2094            } else {
2095                Either::Left(key)
2096            }
2097        });
2098
2099        match (bad.is_empty(), good.is_empty()) {
2100            // Case 1: Completely empty bundle.
2101            (true, true) => {
2102                warn!("Received a completely empty room key bundle");
2103            }
2104
2105            // Case 2: A bundle for the wrong room.
2106            (false, true) => {
2107                let bad_keys: Vec<_> =
2108                    bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
2109
2110                warn!(
2111                    ?bad_keys,
2112                    "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
2113                );
2114            }
2115
2116            // Case 3: A bundle containing useful room keys.
2117            (_, false) => {
2118                // We have at least some good keys; if we also have some bad ones,
2119                // let's mention that here.
2120                if !bad.is_empty() {
2121                    warn!(
2122                        bad_key_count = bad.len(),
2123                        "The room key bundle contained some room keys \
2124                         that were meant for a different room"
2125                    );
2126                }
2127
2128                self.import_sessions_impl(good, None, progress_listener).await?;
2129            }
2130        }
2131
2132        Ok(())
2133    }
2134}
2135
2136impl Deref for Store {
2137    type Target = DynCryptoStore;
2138
2139    fn deref(&self) -> &Self::Target {
2140        self.inner.store.deref().deref()
2141    }
2142}
2143
2144/// A crypto store that implements primitives for cross-process locking.
2145#[derive(Clone, Debug)]
2146pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
2147
2148impl matrix_sdk_common::store_locks::BackingStore for LockableCryptoStore {
2149    type LockError = CryptoStoreError;
2150
2151    async fn try_lock(
2152        &self,
2153        lease_duration_ms: u32,
2154        key: &str,
2155        holder: &str,
2156    ) -> std::result::Result<bool, Self::LockError> {
2157        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
2158    }
2159}
2160
2161#[cfg(test)]
2162mod tests {
2163    use std::pin::pin;
2164
2165    use futures_util::StreamExt;
2166    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
2167    use matrix_sdk_test::async_test;
2168    use ruma::{device_id, room_id, user_id, RoomId};
2169    use vodozemac::megolm::SessionKey;
2170
2171    use crate::{
2172        machine::test_helpers::get_machine_pair,
2173        olm::{InboundGroupSession, SenderData},
2174        store::DehydratedDeviceKey,
2175        types::EventEncryptionAlgorithm,
2176        OlmMachine,
2177    };
2178
2179    #[async_test]
2180    async fn test_import_room_keys_notifies_stream() {
2181        use futures_util::FutureExt;
2182
2183        let (alice, bob, _) =
2184            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2185
2186        let room1_id = room_id!("!room1:localhost");
2187        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2188        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2189
2190        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2191        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2192
2193        let room_keys = room_keys_received_stream
2194            .next()
2195            .now_or_never()
2196            .flatten()
2197            .expect("We should have received an update of room key infos")
2198            .unwrap();
2199        assert_eq!(room_keys.len(), 1);
2200        assert_eq!(room_keys[0].room_id, "!room1:localhost");
2201    }
2202
2203    #[async_test]
2204    async fn test_export_room_keys_provides_selected_keys() {
2205        // Given an OlmMachine with room keys in it
2206        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2207        let room1_id = room_id!("!room1:localhost");
2208        let room2_id = room_id!("!room2:localhost");
2209        let room3_id = room_id!("!room3:localhost");
2210        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2211        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2212        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2213
2214        // When I export some of the keys
2215        let keys = alice
2216            .store()
2217            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2218            .await
2219            .unwrap();
2220
2221        // Then the requested keys were provided
2222        assert_eq!(keys.len(), 2);
2223        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2224        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2225        assert_eq!(keys[0].room_id, "!room2:localhost");
2226        assert_eq!(keys[1].room_id, "!room3:localhost");
2227        assert_eq!(keys[0].session_key.to_base64().len(), 220);
2228        assert_eq!(keys[1].session_key.to_base64().len(), 220);
2229    }
2230
2231    #[async_test]
2232    async fn test_export_room_keys_stream_can_provide_all_keys() {
2233        // Given an OlmMachine with room keys in it
2234        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2235        let room1_id = room_id!("!room1:localhost");
2236        let room2_id = room_id!("!room2:localhost");
2237        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2238        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2239
2240        // When I export the keys as a stream
2241        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2242
2243        // And collect them
2244        let mut collected = vec![];
2245        while let Some(key) = keys.next().await {
2246            collected.push(key);
2247        }
2248
2249        // Then all the keys were provided
2250        assert_eq!(collected.len(), 2);
2251        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2252        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2253        assert_eq!(collected[0].room_id, "!room1:localhost");
2254        assert_eq!(collected[1].room_id, "!room2:localhost");
2255        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2256        assert_eq!(collected[1].session_key.to_base64().len(), 220);
2257    }
2258
2259    #[async_test]
2260    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2261        // Given an OlmMachine with room keys in it
2262        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2263        let room1_id = room_id!("!room1:localhost");
2264        let room2_id = room_id!("!room2:localhost");
2265        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2266        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2267
2268        // When I export the keys as a stream
2269        let mut keys =
2270            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2271
2272        // And collect them
2273        let mut collected = vec![];
2274        while let Some(key) = keys.next().await {
2275            collected.push(key);
2276        }
2277
2278        // Then all the keys matching our predicate were provided, and no others
2279        assert_eq!(collected.len(), 1);
2280        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2281        assert_eq!(collected[0].room_id, "!room1:localhost");
2282        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2283    }
2284
2285    #[async_test]
2286    async fn test_export_secrets_bundle() {
2287        let user_id = user_id!("@alice:example.com");
2288        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2289
2290        let _ = first
2291            .bootstrap_cross_signing(false)
2292            .await
2293            .expect("We should be able to bootstrap cross-signing");
2294
2295        let bundle = first.store().export_secrets_bundle().await.expect(
2296            "We should be able to export the secrets bundle, now that we \
2297             have the cross-signing keys",
2298        );
2299
2300        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2301
2302        second
2303            .store()
2304            .import_secrets_bundle(&bundle)
2305            .await
2306            .expect("We should be able to import the secrets bundle");
2307
2308        let status = second.cross_signing_status().await;
2309        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2310
2311        assert!(identity.is_verified(), "The public identity should be marked as verified.");
2312
2313        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2314    }
2315
2316    #[async_test]
2317    async fn test_create_dehydrated_device_key() {
2318        let pickle_key = DehydratedDeviceKey::new()
2319            .expect("Should be able to create a random dehydrated device key");
2320
2321        let to_vec = pickle_key.inner.to_vec();
2322        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2323            .expect("Should be able to create a dehydrated device key from slice");
2324
2325        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2326    }
2327
2328    #[async_test]
2329    async fn test_create_dehydrated_errors() {
2330        let too_small = [0u8; 22];
2331        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2332
2333        assert!(pickle_key.is_err());
2334
2335        let too_big = [0u8; 40];
2336        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2337
2338        assert!(pickle_key.is_err());
2339    }
2340
2341    #[async_test]
2342    async fn test_build_room_key_bundle() {
2343        // Given: Alice has sent a number of room keys to Bob, including some in the
2344        // wrong room, and some that are not marked as shared...
2345        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2346        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2347
2348        let room1_id = room_id!("!room1:localhost");
2349        let room2_id = room_id!("!room2:localhost");
2350
2351        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2352
2353           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2354        */
2355        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2356        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2357        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2358        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2359
2360        let sessions = [
2361            create_inbound_group_session_with_visibility(
2362                &alice,
2363                room1_id,
2364                &SessionKey::from_base64(session_key1).unwrap(),
2365                true,
2366            ),
2367            create_inbound_group_session_with_visibility(
2368                &alice,
2369                room1_id,
2370                &SessionKey::from_base64(session_key2).unwrap(),
2371                true,
2372            ),
2373            create_inbound_group_session_with_visibility(
2374                &alice,
2375                room1_id,
2376                &SessionKey::from_base64(session_key3).unwrap(),
2377                false,
2378            ),
2379            create_inbound_group_session_with_visibility(
2380                &alice,
2381                room2_id,
2382                &SessionKey::from_base64(session_key4).unwrap(),
2383                true,
2384            ),
2385        ];
2386        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2387
2388        // When I build the bundle
2389        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2390
2391        // Then the bundle matches the snapshot.
2392
2393        // We sort the sessions in the bundle, so that the snapshot is stable.
2394        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2395
2396        // We also substitute alice's keys in the snapshot with placeholders
2397        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2398        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2399            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2400            "[alice curve key]"
2401        };
2402        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2403        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2404            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2405            "[alice ed25519 key]"
2406        };
2407
2408        insta::with_settings!({ sort_maps => true }, {
2409            assert_json_snapshot!(bundle, {
2410                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2411                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2412                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2413            });
2414        });
2415    }
2416
2417    /// Create an inbound Megolm session for the given room.
2418    ///
2419    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2420    /// fields of the resultant session.
2421    fn create_inbound_group_session_with_visibility(
2422        olm_machine: &OlmMachine,
2423        room_id: &RoomId,
2424        session_key: &SessionKey,
2425        shared_history: bool,
2426    ) -> InboundGroupSession {
2427        let identity_keys = &olm_machine.store().static_account().identity_keys;
2428        InboundGroupSession::new(
2429            identity_keys.curve25519,
2430            identity_keys.ed25519,
2431            room_id,
2432            session_key,
2433            SenderData::unknown(),
2434            EventEncryptionAlgorithm::MegolmV1AesSha2,
2435            None,
2436            shared_history,
2437        )
2438        .unwrap()
2439    }
2440}