Skip to main content

matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::{Path, PathBuf},
26    str::FromStr,
27    sync::Arc,
28    time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34    future::try_join,
35    stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40    StateStoreDataKey, StateStoreDataValue,
41    cross_process_lock::{AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError},
42    crypto::{
43        CrossSigningBootstrapRequests, OlmMachine,
44        store::{
45            LockableCryptoStore, SecretImportError,
46            types::{RoomKeyBundleInfo, RoomKeyInfo},
47        },
48        types::{
49            SecretsBundle, SignedKey,
50            requests::{
51                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
52            },
53        },
54    },
55    sleep::sleep,
56};
57use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
58use ruma::{
59    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
60    api::{
61        client::{
62            keys::{
63                get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
64                upload_signing_keys::v3::Request as UploadSigningKeysRequest,
65            },
66            message::send_message_event,
67            to_device::send_event_to_device::v3::{
68                Request as RumaToDeviceRequest, Response as ToDeviceResponse,
69            },
70            uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
71        },
72        error::{ErrorBody, StandardErrorBody},
73    },
74    assign,
75    events::room::{
76        MediaSource, ThumbnailInfo,
77        member::{MembershipChange, OriginalSyncRoomMemberEvent},
78    },
79};
80#[cfg(feature = "experimental-send-custom-to-device")]
81use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
82use serde::{Deserialize, de::Error as _};
83use tasks::BundleReceiverTask;
84use tokio::sync::{Mutex, RwLockReadGuard};
85use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
86use tracing::{Span, debug, error, instrument, warn};
87use url::Url;
88use vodozemac::Curve25519PublicKey;
89
90use self::{
91    backups::{Backups, types::BackupClientState},
92    futures::UploadEncryptedFile,
93    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
94    recovery::{Recovery, RecoveryState},
95    secret_storage::SecretStorage,
96    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
97    verification::{SasVerification, Verification, VerificationRequest},
98};
99use crate::{
100    Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
101    attachment::Thumbnail,
102    client::{ClientInner, WeakClient},
103    cross_process_lock::CrossProcessLockGuard,
104    error::HttpResult,
105};
106
107pub mod backups;
108pub mod futures;
109pub mod identities;
110pub mod recovery;
111pub mod secret_storage;
112pub(crate) mod tasks;
113pub mod verification;
114
115pub use matrix_sdk_base::crypto::{
116    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
117    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
118    SignatureError, VERSION,
119    olm::{
120        SessionCreationError as MegolmSessionCreationError,
121        SessionExportError as OlmSessionExportError,
122    },
123    vodozemac,
124};
125use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
126
127#[cfg(feature = "experimental-send-custom-to-device")]
128use crate::config::RequestConfig;
129pub use crate::error::RoomKeyImportError;
130
131/// Error type describing failures that can happen while exporting a
132/// [`SecretsBundle`] from a SQLite store.
133#[cfg(feature = "sqlite")]
134#[derive(Debug, thiserror::Error)]
135pub enum BundleExportError {
136    /// The SQLite store couldn't be opened.
137    #[error(transparent)]
138    OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
139    /// Data from the SQLite store couldn't be exported.
140    #[error(transparent)]
141    StoreError(#[from] CryptoStoreError),
142    /// The store doesn't contain a secrets bundle or it couldn't be read from
143    /// the store.
144    #[error(transparent)]
145    SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
146}
147
148/// Error type describing failures that can happen while importing a
149/// [`SecretsBundle`].
150#[derive(Debug, thiserror::Error)]
151pub enum BundleImportError {
152    /// The bundle couldn't be imported.
153    #[error(transparent)]
154    SecretImport(#[from] SecretImportError),
155    /// The cross-signed device keys couldn't been uploaded.
156    #[error(transparent)]
157    DeviceKeys(#[from] Error),
158}
159
160/// Attempt to export a [`SecretsBundle`] from a crypto store.
161///
162/// This method can be used to retrieve a [`SecretsBundle`] from an existing
163/// `matrix-sdk`-based client in order to import the [`SecretsBundle`] in
164/// another [`Client`] instance.
165///
166/// This can be useful for migration purposes or to allow existing client
167/// instances create new ones that will be fully verified.
168#[cfg(feature = "sqlite")]
169pub async fn export_secrets_bundle_from_store(
170    database_path: impl AsRef<Path>,
171    passphrase: Option<&str>,
172) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
173    use matrix_sdk_base::crypto::store::CryptoStore;
174
175    let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;
176    let account =
177        store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?;
178
179    if let Some(account) = account {
180        let machine = OlmMachine::with_store(&account.user_id, &account.device_id, store, None)
181            .await
182            .map_err(BundleExportError::StoreError)?;
183
184        let bundle = machine.store().export_secrets_bundle().await?;
185
186        Ok(Some((account.user_id.to_owned(), bundle)))
187    } else {
188        Ok(None)
189    }
190}
191
192/// All the data related to the encryption state.
193pub(crate) struct EncryptionData {
194    /// Background tasks related to encryption (key backup, initialization
195    /// tasks, etc.).
196    pub tasks: StdMutex<ClientTasks>,
197
198    /// End-to-end encryption settings.
199    pub encryption_settings: EncryptionSettings,
200
201    /// All state related to key backup.
202    pub backup_state: BackupClientState,
203
204    /// All state related to secret storage recovery.
205    pub recovery_state: SharedObservable<RecoveryState>,
206}
207
208impl EncryptionData {
209    pub fn new(encryption_settings: EncryptionSettings) -> Self {
210        Self {
211            encryption_settings,
212
213            tasks: StdMutex::new(Default::default()),
214            backup_state: Default::default(),
215            recovery_state: Default::default(),
216        }
217    }
218
219    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
220        let weak_client = WeakClient::from_inner(client);
221
222        let mut tasks = self.tasks.lock();
223        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
224
225        if self.encryption_settings.backup_download_strategy
226            == BackupDownloadStrategy::AfterDecryptionFailure
227        {
228            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
229        }
230    }
231
232    /// Initialize the background task which listens for changes in the
233    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
234    ///
235    /// This should happen after the usual tasks have been set up and after the
236    /// E2EE initialization tasks have been set up.
237    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
238        let mut guard = self.tasks.lock();
239
240        let future = Recovery::update_state_after_backup_state_change(client);
241        let join_handle = spawn(future);
242
243        guard.update_recovery_state_after_backup = Some(join_handle);
244    }
245}
246
247/// Settings for end-to-end encryption features.
248#[derive(Clone, Copy, Debug, Default)]
249pub struct EncryptionSettings {
250    /// Automatically bootstrap cross-signing for a user once they're logged, in
251    /// case it's not already done yet.
252    ///
253    /// This requires to login with a username and password, or that MSC3967 is
254    /// enabled on the server, as of 2023-10-20.
255    pub auto_enable_cross_signing: bool,
256
257    /// Select a strategy to download room keys from the backup, by default room
258    /// keys won't be downloaded from the backup automatically.
259    ///
260    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
261    pub backup_download_strategy: BackupDownloadStrategy,
262
263    /// Automatically create a backup version if no backup exists.
264    pub auto_enable_backups: bool,
265}
266
267/// Settings for end-to-end encryption features.
268#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
269#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
270pub enum BackupDownloadStrategy {
271    /// Automatically download all room keys from the backup when the backup
272    /// recovery key has been received. The backup recovery key can be received
273    /// in two ways:
274    ///
275    /// 1. Received as a `m.secret.send` to-device event, after a successful
276    ///    interactive verification.
277    /// 2. Imported from secret storage (4S) using the
278    ///    [`SecretStore::import_secrets()`] method.
279    ///
280    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
281    OneShot,
282
283    /// Attempt to download a single room key if an event fails to be decrypted.
284    AfterDecryptionFailure,
285
286    /// Don't download any room keys automatically. The user can manually
287    /// download room keys using the [`Backups::download_room_key()`] methods.
288    ///
289    /// This is the default option.
290    #[default]
291    Manual,
292}
293
294/// The verification state of our own device
295///
296/// This enum tells us if our own user identity trusts these devices, in other
297/// words it tells us if the user identity has signed the device.
298#[derive(Clone, Copy, Debug, Eq, PartialEq)]
299pub enum VerificationState {
300    /// The verification state is unknown for now.
301    Unknown,
302    /// The device is considered to be verified, it has been signed by its user
303    /// identity.
304    Verified,
305    /// The device is unverified.
306    Unverified,
307}
308
309/// A stateful struct remembering the cross-signing keys we need to upload.
310///
311/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
312/// additional authentication, this struct will contain information on the type
313/// of authentication the user needs to complete before the upload might be
314/// continued.
315///
316/// More info can be found in the [spec].
317///
318/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
319#[derive(Debug)]
320pub struct CrossSigningResetHandle {
321    client: Client,
322    upload_request: UploadSigningKeysRequest,
323    signatures_request: UploadSignaturesRequest,
324    auth_type: CrossSigningResetAuthType,
325    is_cancelled: Mutex<bool>,
326}
327
328impl CrossSigningResetHandle {
329    /// Set up a new `CrossSigningResetHandle`.
330    pub fn new(
331        client: Client,
332        upload_request: UploadSigningKeysRequest,
333        signatures_request: UploadSignaturesRequest,
334        auth_type: CrossSigningResetAuthType,
335    ) -> Self {
336        Self {
337            client,
338            upload_request,
339            signatures_request,
340            auth_type,
341            is_cancelled: Mutex::new(false),
342        }
343    }
344
345    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
346    /// is using.
347    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
348        &self.auth_type
349    }
350
351    /// Continue the cross-signing reset by either waiting for the
352    /// authentication to be done on the side of the OAuth 2.0 server or by
353    /// providing additional [`AuthData`] the homeserver requires.
354    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
355        // Poll to see whether the reset has been authorized twice per second.
356        const RETRY_EVERY: Duration = Duration::from_millis(500);
357
358        // Give up after two minutes of polling.
359        const TIMEOUT: Duration = Duration::from_mins(2);
360
361        tokio::time::timeout(TIMEOUT, async {
362            let mut upload_request = self.upload_request.clone();
363            upload_request.auth = auth;
364
365            debug!(
366                "Repeatedly PUTting to keys/device_signing/upload until it works \
367                or we hit a permanent failure."
368            );
369            while let Err(e) = self.client.send(upload_request.clone()).await {
370                if *self.is_cancelled.lock().await {
371                    return Ok(());
372                }
373
374                match e.as_uiaa_response() {
375                    Some(uiaa_info) => {
376                        // Return the error except if we are at the `m.oauth` stage where we want to
377                        // keep polling.
378                        if !matches!(self.auth_type, CrossSigningResetAuthType::OAuth(_))
379                            && uiaa_info.auth_error.is_some()
380                        {
381                            return Err(e.into());
382                        }
383                    }
384                    None => return Err(e.into()),
385                }
386
387                debug!(
388                    "PUT to keys/device_signing/upload failed with 401. Retrying after \
389                    a short delay."
390                );
391                sleep(RETRY_EVERY).await;
392            }
393
394            self.client.send(self.signatures_request.clone()).await?;
395
396            Ok(())
397        })
398        .await
399        .unwrap_or_else(|_| {
400            warn!("Timed out waiting for keys/device_signing/upload to succeed.");
401            Err(Error::Timeout)
402        })
403    }
404
405    /// Cancel the ongoing identity reset process
406    pub async fn cancel(&self) {
407        *self.is_cancelled.lock().await = true;
408    }
409}
410
411/// information about the additional authentication that is required before the
412/// cross-signing keys can be uploaded.
413#[derive(Debug, Clone)]
414pub enum CrossSigningResetAuthType {
415    /// The homeserver requires user-interactive authentication.
416    Uiaa(UiaaInfo),
417    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
418    /// approve the upload of cross-signing keys.
419    OAuth(OAuthCrossSigningResetInfo),
420}
421
422impl CrossSigningResetAuthType {
423    fn new(error: &HttpError) -> Result<Option<Self>> {
424        if let Some(auth_info) = error.as_uiaa_response() {
425            if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
426                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
427            } else {
428                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
429            }
430        } else {
431            Ok(None)
432        }
433    }
434}
435
436/// OAuth 2.0 specific information about the required authentication for the
437/// upload of cross-signing keys.
438#[derive(Debug, Clone, Deserialize)]
439pub struct OAuthCrossSigningResetInfo {
440    /// The URL where the user can approve the reset of the cross-signing keys.
441    pub approval_url: Url,
442
443    /// Session key to use to complete the authentication.
444    pub session: Option<String>,
445}
446
447impl OAuthCrossSigningResetInfo {
448    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
449        let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
450            return Ok(None);
451        };
452
453        Ok(Some(OAuthCrossSigningResetInfo {
454            approval_url: parameters.url.as_str().try_into()?,
455            session: auth_info.session.clone(),
456        }))
457    }
458}
459
460/// A struct that helps to parse the custom error message Synapse posts if a
461/// duplicate one-time key is uploaded.
462#[derive(Clone, Debug)]
463pub struct DuplicateOneTimeKeyErrorMessage {
464    /// The previously uploaded one-time key.
465    pub old_key: Curve25519PublicKey,
466    /// The one-time key we're attempting to upload right now.
467    pub new_key: Curve25519PublicKey,
468}
469
470impl FromStr for DuplicateOneTimeKeyErrorMessage {
471    type Err = serde_json::Error;
472
473    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
474        // First we split the string into two parts, the part containing the old key and
475        // the part containing the new key. The parts are conveniently separated
476        // by a `;` character.
477        let mut split = s.split_terminator(';');
478
479        let old_key = split
480            .next()
481            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
482        let new_key = split
483            .next()
484            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
485
486        // Now we remove the lengthy prefix from the part containing the old key, we
487        // should be left with just the JSON of the signed key.
488        let old_key_index = old_key
489            .find("Old key:")
490            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
491
492        let old_key = old_key[old_key_index..]
493            .trim()
494            .strip_prefix("Old key:")
495            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
496
497        // The part containing the new key is much simpler, we just remove a static
498        // prefix.
499        let new_key = new_key
500            .trim()
501            .strip_prefix("new key:")
502            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
503
504        // The JSON containing the new key is for some reason quoted using single
505        // quotes, so let's replace them with normal double quotes.
506        let new_key = new_key.replace("'", "\"");
507
508        // Let's deserialize now.
509        let old_key: SignedKey = serde_json::from_str(old_key)?;
510        let new_key: SignedKey = serde_json::from_str(&new_key)?;
511
512        // Pick out the Curve keys, we don't care about the rest that much.
513        let old_key = old_key.key();
514        let new_key = new_key.key();
515
516        Ok(Self { old_key, new_key })
517    }
518}
519
520impl Client {
521    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
522        self.base_client().olm_machine().await
523    }
524
525    pub(crate) async fn mark_request_as_sent(
526        &self,
527        request_id: &TransactionId,
528        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
529    ) -> Result<(), matrix_sdk_base::Error> {
530        Ok(self
531            .olm_machine()
532            .await
533            .as_ref()
534            .expect(
535                "We should have an olm machine once we try to mark E2EE related requests as sent",
536            )
537            .mark_request_as_sent(request_id, response)
538            .await?)
539    }
540
541    /// Query the server for users device keys.
542    ///
543    /// # Panics
544    ///
545    /// Panics if no key query needs to be done.
546    #[instrument(skip(self, device_keys))]
547    pub(crate) async fn keys_query(
548        &self,
549        request_id: &TransactionId,
550        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
551    ) -> Result<get_keys::v3::Response> {
552        let request = assign!(get_keys::v3::Request::new(), { device_keys });
553
554        let response = self.send(request).await?;
555        self.mark_request_as_sent(request_id, &response).await?;
556        self.encryption().update_state_after_keys_query(&response).await;
557
558        Ok(response)
559    }
560
561    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
562    /// encrypting and uploading a provided reader.
563    ///
564    /// # Arguments
565    ///
566    /// * `content_type` - The content type of the file.
567    /// * `reader` - The reader that should be encrypted and uploaded.
568    ///
569    /// # Examples
570    ///
571    /// ```no_run
572    /// # use matrix_sdk::Client;
573    /// # use url::Url;
574    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
575    /// use serde::{Deserialize, Serialize};
576    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
577    ///
578    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
579    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
580    /// struct CustomEventContent {
581    ///     encrypted_file: EncryptedFile,
582    /// }
583    ///
584    /// # async {
585    /// # let homeserver = Url::parse("http://example.com")?;
586    /// # let client = Client::new(homeserver).await?;
587    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
588    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
589    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
590    ///
591    /// room.send(CustomEventContent { encrypted_file }).await?;
592    /// # anyhow::Ok(()) };
593    /// ```
594    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
595        &'a self,
596        reader: &'a mut R,
597    ) -> UploadEncryptedFile<'a, R> {
598        UploadEncryptedFile::new(self, reader)
599    }
600
601    /// Encrypt and upload the file and thumbnails, and return the source
602    /// information.
603    pub(crate) async fn upload_encrypted_media_and_thumbnail(
604        &self,
605        data: &[u8],
606        thumbnail: Option<Thumbnail>,
607        send_progress: SharedObservable<TransmissionProgress>,
608    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
609        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
610
611        let upload_attachment = async {
612            let mut cursor = Cursor::new(data);
613            self.upload_encrypted_file(&mut cursor)
614                .with_send_progress_observable(send_progress)
615                .await
616        };
617
618        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
619
620        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
621    }
622
623    /// Uploads an encrypted thumbnail to the media repository, and returns
624    /// its source and extra information.
625    async fn upload_encrypted_thumbnail(
626        &self,
627        thumbnail: Option<Thumbnail>,
628        send_progress: SharedObservable<TransmissionProgress>,
629    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
630        let Some(thumbnail) = thumbnail else {
631            return Ok(None);
632        };
633
634        let (data, _, thumbnail_info) = thumbnail.into_parts();
635        let mut cursor = Cursor::new(data);
636
637        let file = self
638            .upload_encrypted_file(&mut cursor)
639            .with_send_progress_observable(send_progress)
640            .await?;
641
642        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
643    }
644
645    /// Claim one-time keys creating new Olm sessions.
646    ///
647    /// # Arguments
648    ///
649    /// * `users` - The list of user/device pairs that we should claim keys for.
650    pub(crate) async fn claim_one_time_keys(
651        &self,
652        users: impl Iterator<Item = &UserId>,
653    ) -> Result<()> {
654        let _lock = self.locks().key_claim_lock.lock().await;
655
656        if let Some((request_id, request)) = self
657            .olm_machine()
658            .await
659            .as_ref()
660            .ok_or(Error::NoOlmMachine)?
661            .get_missing_sessions(users)
662            .await?
663        {
664            let response = self.send(request).await?;
665            self.mark_request_as_sent(&request_id, &response).await?;
666        }
667
668        Ok(())
669    }
670
671    /// Upload the E2E encryption keys.
672    ///
673    /// This uploads the long lived device keys as well as the required amount
674    /// of one-time keys.
675    ///
676    /// # Panics
677    ///
678    /// Panics if the client isn't logged in, or if no encryption keys need to
679    /// be uploaded.
680    #[instrument(skip(self, request))]
681    pub(crate) async fn keys_upload(
682        &self,
683        request_id: &TransactionId,
684        request: &upload_keys::v3::Request,
685    ) -> Result<upload_keys::v3::Response> {
686        debug!(
687            device_keys = request.device_keys.is_some(),
688            one_time_key_count = request.one_time_keys.len(),
689            "Uploading public encryption keys",
690        );
691
692        let response = self.send(request.clone()).await?;
693        self.mark_request_as_sent(request_id, &response).await?;
694
695        Ok(response)
696    }
697
698    pub(crate) async fn room_send_helper(
699        &self,
700        request: &RoomMessageRequest,
701    ) -> Result<send_message_event::v3::Response> {
702        let content = request.content.clone();
703        let txn_id = request.txn_id.clone();
704        let room_id = &request.room_id;
705
706        self.get_room(room_id)
707            .expect("Can't send a message to a room that isn't known to the store")
708            .send(*content)
709            .with_transaction_id(txn_id)
710            .await
711            .map(|result| result.response)
712    }
713
714    pub(crate) async fn send_to_device(
715        &self,
716        request: &ToDeviceRequest,
717    ) -> HttpResult<ToDeviceResponse> {
718        let request = RumaToDeviceRequest::new_raw(
719            request.event_type.clone(),
720            request.txn_id.clone(),
721            request.messages.clone(),
722        );
723
724        self.send(request).await
725    }
726
727    pub(crate) async fn send_verification_request(
728        &self,
729        request: OutgoingVerificationRequest,
730    ) -> Result<()> {
731        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
732
733        match request {
734            ToDevice(t) => {
735                self.send_to_device(&t).await?;
736            }
737            InRoom(r) => {
738                self.room_send_helper(&r).await?;
739            }
740        }
741
742        Ok(())
743    }
744
745    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
746        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
747
748        match r.request() {
749            AnyOutgoingRequest::KeysQuery(request) => {
750                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
751            }
752            AnyOutgoingRequest::KeysUpload(request) => {
753                let response = self.keys_upload(r.request_id(), request).await;
754
755                if let Err(e) = &response {
756                    match e.as_ruma_api_error() {
757                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
758                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
759                            {
760                                // This is one of the nastiest errors we can have. The server
761                                // telling us that we already have a one-time key uploaded means
762                                // that we forgot about some of our one-time keys. This will lead to
763                                // UTDs.
764                                {
765                                    let already_reported = self
766                                        .state_store()
767                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
768                                        .await?
769                                        .is_some();
770
771                                    if message.starts_with("One time key") && !already_reported {
772                                        let error_message =
773                                            DuplicateOneTimeKeyErrorMessage::from_str(message);
774
775                                        if let Ok(message) = &error_message {
776                                            error!(
777                                                sentry = true,
778                                                old_key = %message.old_key,
779                                                new_key = %message.new_key,
780                                                "Duplicate one-time keys have been uploaded"
781                                            );
782                                        } else {
783                                            error!(
784                                                sentry = true,
785                                                "Duplicate one-time keys have been uploaded"
786                                            );
787                                        }
788
789                                        self.state_store()
790                                            .set_kv_data(
791                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
792                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
793                                            )
794                                            .await?;
795
796                                        if let Err(e) = self
797                                            .inner
798                                            .duplicate_key_upload_error_sender
799                                            .send(error_message.ok())
800                                        {
801                                            error!(
802                                                "Failed to dispatch duplicate key upload error notification: {}",
803                                                e
804                                            );
805                                        }
806                                    }
807                                }
808                            }
809                        }
810                        _ => {}
811                    }
812
813                    response?;
814                }
815            }
816            AnyOutgoingRequest::ToDeviceRequest(request) => {
817                let response = self.send_to_device(request).await?;
818                self.mark_request_as_sent(r.request_id(), &response).await?;
819            }
820            AnyOutgoingRequest::SignatureUpload(request) => {
821                let response = self.send(request.clone()).await?;
822                self.mark_request_as_sent(r.request_id(), &response).await?;
823            }
824            AnyOutgoingRequest::RoomMessage(request) => {
825                let response = self.room_send_helper(request).await?;
826                self.mark_request_as_sent(r.request_id(), &response).await?;
827            }
828            AnyOutgoingRequest::KeysClaim(request) => {
829                let response = self.send(request.clone()).await?;
830                self.mark_request_as_sent(r.request_id(), &response).await?;
831            }
832        }
833
834        Ok(())
835    }
836
837    #[instrument(skip_all)]
838    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
839        const MAX_CONCURRENT_REQUESTS: usize = 20;
840
841        // This is needed because sometimes we need to automatically
842        // claim some one-time keys to unwedge an existing Olm session.
843        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
844            warn!("Error while claiming one-time keys {:?}", e);
845        }
846
847        let outgoing_requests = stream::iter(
848            self.olm_machine()
849                .await
850                .as_ref()
851                .ok_or(Error::NoOlmMachine)?
852                .outgoing_requests()
853                .await?,
854        )
855        .map(|r| self.send_outgoing_request(r));
856
857        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
858
859        requests
860            .for_each(|r| async move {
861                match r {
862                    Ok(_) => (),
863                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
864                }
865            })
866            .await;
867
868        Ok(())
869    }
870}
871
872#[cfg(any(feature = "testing", test))]
873impl Client {
874    /// Get the olm machine, for testing purposes only.
875    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
876        self.olm_machine().await
877    }
878
879    /// Aborts the client's bundle receiver task, for testing purposes only.
880    pub fn abort_bundle_receiver_task(&self) {
881        let tasks = self.inner.e2ee.tasks.lock();
882        if let Some(task) = tasks.receive_historic_room_key_bundles.as_ref() {
883            task.abort()
884        }
885    }
886}
887
888/// A high-level API to manage the client's encryption.
889///
890/// To get this, use [`Client::encryption()`].
891#[derive(Debug, Clone)]
892pub struct Encryption {
893    /// The underlying client.
894    client: Client,
895}
896
897impl Encryption {
898    pub(crate) fn new(client: Client) -> Self {
899        Self { client }
900    }
901
902    /// Returns the current encryption settings for this client.
903    pub(crate) fn settings(&self) -> EncryptionSettings {
904        self.client.inner.e2ee.encryption_settings
905    }
906
907    /// Get the public ed25519 key of our own device. This is usually what is
908    /// called the fingerprint of the device.
909    pub async fn ed25519_key(&self) -> Option<String> {
910        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
911    }
912
913    /// Get the public Curve25519 key of our own device.
914    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
915        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
916    }
917
918    /// Get the current device creation timestamp.
919    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
920        match self.get_own_device().await {
921            Ok(Some(device)) => device.first_time_seen_ts(),
922            // Should not happen, there should always be an own device
923            _ => MilliSecondsSinceUnixEpoch::now(),
924        }
925    }
926
927    /// This method will import all the private cross-signing keys and, if
928    /// available, the private part of a backup key and its accompanying
929    /// version into the store.
930    ///
931    /// Importing all the secrets will mark the device as verified and enable
932    /// backups if a backup key was available in the bundle.
933    ///
934    /// **Warning**: Only import this from a trusted source, i.e. if an existing
935    /// device is sharing this with a new device.
936    ///
937    /// **Warning*: Only call this method right after logging in and before the
938    /// initial sync has been started.
939    pub async fn import_secrets_bundle(
940        &self,
941        bundle: &SecretsBundle,
942    ) -> Result<(), BundleImportError> {
943        self.import_secrets_bundle_impl(bundle).await?;
944
945        // Upload the device keys, this will ensure that other devices see us as a fully
946        // verified device as soon as this method returns.
947        self.ensure_device_keys_upload().await?;
948        self.wait_for_e2ee_initialization_tasks().await;
949
950        // If our initialization tasks completed before we imported the secrets bundle,
951        // backups might not have been enabled.
952        //
953        // In this case attempt to enable them again.
954        if !self.backups().are_enabled().await {
955            self.backups().maybe_resume_backups().await?;
956        }
957
958        Ok(())
959    }
960
961    pub(crate) async fn import_secrets_bundle_impl(
962        &self,
963        bundle: &SecretsBundle,
964    ) -> Result<(), SecretImportError> {
965        let olm_machine = self.client.olm_machine().await;
966        let olm_machine =
967            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
968
969        olm_machine.store().import_secrets_bundle(bundle).await
970    }
971
972    /// Get the status of the private cross signing keys.
973    ///
974    /// This can be used to check which private cross signing keys we have
975    /// stored locally.
976    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
977        let olm = self.client.olm_machine().await;
978        let machine = olm.as_ref()?;
979        Some(machine.cross_signing_status().await)
980    }
981
982    /// Does the user have other devices that the current device can verify
983    /// against?
984    ///
985    /// The device must be signed by the user's cross-signing key, must have an
986    /// identity, and must not be a dehydrated device.
987    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
988        let olm_machine = self.client.olm_machine().await;
989        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
990        let user_id = olm_machine.user_id();
991
992        self.ensure_initial_key_query().await?;
993
994        let devices = self.get_user_devices(user_id).await?;
995
996        let ret = devices.devices().any(|device| {
997            device.is_cross_signed_by_owner()
998                && device.curve25519_key().is_some()
999                && !device.is_dehydrated()
1000        });
1001
1002        Ok(ret)
1003    }
1004
1005    /// Get all the tracked users we know about
1006    ///
1007    /// Tracked users are users for which we keep the device list of E2EE
1008    /// capable devices up to date.
1009    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1010        if let Some(machine) = self.client.olm_machine().await.as_ref() {
1011            machine.tracked_users().await
1012        } else {
1013            Ok(HashSet::new())
1014        }
1015    }
1016
1017    /// Get a [`Subscriber`] for the [`VerificationState`].
1018    ///
1019    /// # Examples
1020    ///
1021    /// ```no_run
1022    /// use matrix_sdk::{Client, encryption};
1023    /// use url::Url;
1024    ///
1025    /// # async {
1026    /// let homeserver = Url::parse("http://example.com")?;
1027    /// let client = Client::new(homeserver).await?;
1028    /// let mut subscriber = client.encryption().verification_state();
1029    ///
1030    /// let current_value = subscriber.get();
1031    ///
1032    /// println!("The current verification state is: {current_value:?}");
1033    ///
1034    /// if let Some(verification_state) = subscriber.next().await {
1035    ///     println!("Received verification state update {:?}", verification_state)
1036    /// }
1037    /// # anyhow::Ok(()) };
1038    /// ```
1039    pub fn verification_state(&self) -> Subscriber<VerificationState> {
1040        self.client.inner.verification_state.subscribe_reset()
1041    }
1042
1043    /// Get a verification object with the given flow id.
1044    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1045        let olm = self.client.olm_machine().await;
1046        let olm = olm.as_ref()?;
1047        #[allow(clippy::bind_instead_of_map)]
1048        olm.get_verification(user_id, flow_id).and_then(|v| match v {
1049            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
1050                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
1051            }
1052            #[cfg(feature = "qrcode")]
1053            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
1054                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
1055            }
1056            _ => None,
1057        })
1058    }
1059
1060    /// Get a `VerificationRequest` object for the given user with the given
1061    /// flow id.
1062    pub async fn get_verification_request(
1063        &self,
1064        user_id: &UserId,
1065        flow_id: impl AsRef<str>,
1066    ) -> Option<VerificationRequest> {
1067        let olm = self.client.olm_machine().await;
1068        let olm = olm.as_ref()?;
1069
1070        olm.get_verification_request(user_id, flow_id)
1071            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1072    }
1073
1074    /// Get a specific device of a user.
1075    ///
1076    /// # Arguments
1077    ///
1078    /// * `user_id` - The unique id of the user that the device belongs to.
1079    ///
1080    /// * `device_id` - The unique id of the device.
1081    ///
1082    /// Returns a `Device` if one is found and the crypto store didn't throw an
1083    /// error.
1084    ///
1085    /// This will always return None if the client hasn't been logged in.
1086    ///
1087    /// # Examples
1088    ///
1089    /// ```no_run
1090    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
1091    /// # use url::Url;
1092    /// # async {
1093    /// # let alice = user_id!("@alice:example.org");
1094    /// # let homeserver = Url::parse("http://example.com")?;
1095    /// # let client = Client::new(homeserver).await?;
1096    /// if let Some(device) =
1097    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
1098    /// {
1099    ///     println!("{:?}", device.is_verified());
1100    ///
1101    ///     if !device.is_verified() {
1102    ///         let verification = device.request_verification().await?;
1103    ///     }
1104    /// }
1105    /// # anyhow::Ok(()) };
1106    /// ```
1107    pub async fn get_device(
1108        &self,
1109        user_id: &UserId,
1110        device_id: &DeviceId,
1111    ) -> Result<Option<Device>, CryptoStoreError> {
1112        let olm = self.client.olm_machine().await;
1113        let Some(machine) = olm.as_ref() else { return Ok(None) };
1114        let device = machine.get_device(user_id, device_id, None).await?;
1115        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1116    }
1117
1118    /// A convenience method to retrieve your own device from the store.
1119    ///
1120    /// This is the same as calling [`Encryption::get_device()`] with your own
1121    /// user and device ID.
1122    ///
1123    /// This will always return a device, unless you are not logged in.
1124    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1125        let olm = self.client.olm_machine().await;
1126        let Some(machine) = olm.as_ref() else { return Ok(None) };
1127        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1128        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1129    }
1130
1131    /// Get a map holding all the devices of an user.
1132    ///
1133    /// This will always return an empty map if the client hasn't been logged
1134    /// in.
1135    ///
1136    /// # Arguments
1137    ///
1138    /// * `user_id` - The unique id of the user that the devices belong to.
1139    ///
1140    /// # Examples
1141    ///
1142    /// ```no_run
1143    /// # use matrix_sdk::{Client, ruma::user_id};
1144    /// # use url::Url;
1145    /// # async {
1146    /// # let alice = user_id!("@alice:example.org");
1147    /// # let homeserver = Url::parse("http://example.com")?;
1148    /// # let client = Client::new(homeserver).await?;
1149    /// let devices = client.encryption().get_user_devices(alice).await?;
1150    ///
1151    /// for device in devices.devices() {
1152    ///     println!("{device:?}");
1153    /// }
1154    /// # anyhow::Ok(()) };
1155    /// ```
1156    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1157        let devices = self
1158            .client
1159            .olm_machine()
1160            .await
1161            .as_ref()
1162            .ok_or(Error::NoOlmMachine)?
1163            .get_user_devices(user_id, None)
1164            .await?;
1165
1166        Ok(UserDevices { inner: devices, client: self.client.clone() })
1167    }
1168
1169    /// Get the E2EE identity of a user from the crypto store.
1170    ///
1171    /// Usually, we only have the E2EE identity of a user locally if the user
1172    /// is tracked, meaning that we are both members of the same encrypted room.
1173    ///
1174    /// To get the E2EE identity of a user even if it is not available locally
1175    /// use [`Encryption::request_user_identity()`].
1176    ///
1177    /// # Arguments
1178    ///
1179    /// * `user_id` - The unique id of the user that the identity belongs to.
1180    ///
1181    /// Returns a `UserIdentity` if one is found and the crypto store
1182    /// didn't throw an error.
1183    ///
1184    /// This will always return None if the client hasn't been logged in.
1185    ///
1186    /// # Examples
1187    ///
1188    /// ```no_run
1189    /// # use matrix_sdk::{Client, ruma::user_id};
1190    /// # use url::Url;
1191    /// # async {
1192    /// # let alice = user_id!("@alice:example.org");
1193    /// # let homeserver = Url::parse("http://example.com")?;
1194    /// # let client = Client::new(homeserver).await?;
1195    /// let user = client.encryption().get_user_identity(alice).await?;
1196    ///
1197    /// if let Some(user) = user {
1198    ///     println!("{:?}", user.is_verified());
1199    ///
1200    ///     let verification = user.request_verification().await?;
1201    /// }
1202    /// # anyhow::Ok(()) };
1203    /// ```
1204    pub async fn get_user_identity(
1205        &self,
1206        user_id: &UserId,
1207    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1208        let olm = self.client.olm_machine().await;
1209        let Some(olm) = olm.as_ref() else { return Ok(None) };
1210        let identity = olm.get_identity(user_id, None).await?;
1211
1212        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1213    }
1214
1215    /// Get the E2EE identity of a user from the homeserver.
1216    ///
1217    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1218    /// E2EE identity is not found, it should mean that the user did not set
1219    /// up cross-signing.
1220    ///
1221    /// If you want the E2EE identity of a user without making a request to the
1222    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1223    ///
1224    /// # Arguments
1225    ///
1226    /// * `user_id` - The ID of the user that the identity belongs to.
1227    ///
1228    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1229    /// was an issue with the crypto store or with the request to the
1230    /// homeserver.
1231    ///
1232    /// This will always return `None` if the client hasn't been logged in.
1233    ///
1234    /// # Examples
1235    ///
1236    /// ```no_run
1237    /// # use matrix_sdk::{Client, ruma::user_id};
1238    /// # use url::Url;
1239    /// # async {
1240    /// # let alice = user_id!("@alice:example.org");
1241    /// # let homeserver = Url::parse("http://example.com")?;
1242    /// # let client = Client::new(homeserver).await?;
1243    /// let user = client.encryption().request_user_identity(alice).await?;
1244    ///
1245    /// if let Some(user) = user {
1246    ///     println!("User is verified: {:?}", user.is_verified());
1247    ///
1248    ///     let verification = user.request_verification().await?;
1249    /// }
1250    /// # anyhow::Ok(()) };
1251    /// ```
1252    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1253        let olm = self.client.olm_machine().await;
1254        let Some(olm) = olm.as_ref() else { return Ok(None) };
1255
1256        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1257        self.client.keys_query(&request_id, request.device_keys).await?;
1258
1259        let identity = olm.get_identity(user_id, None).await?;
1260        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1261    }
1262
1263    /// Returns a stream of device updates, allowing users to listen for
1264    /// notifications about new or changed devices.
1265    ///
1266    /// The stream produced by this method emits updates whenever a new device
1267    /// is discovered or when an existing device's information is changed. Users
1268    /// can subscribe to this stream and receive updates in real-time.
1269    ///
1270    /// # Examples
1271    ///
1272    /// ```no_run
1273    /// # use matrix_sdk::Client;
1274    /// # use ruma::{device_id, user_id};
1275    /// # use futures_util::{pin_mut, StreamExt};
1276    /// # let client: Client = unimplemented!();
1277    /// # async {
1278    /// let devices_stream = client.encryption().devices_stream().await?;
1279    /// let user_id = client
1280    ///     .user_id()
1281    ///     .expect("We should know our user id after we have logged in");
1282    /// pin_mut!(devices_stream);
1283    ///
1284    /// for device_updates in devices_stream.next().await {
1285    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1286    ///         for device in user_devices.values() {
1287    ///             println!("A new device has been added {}", device.device_id());
1288    ///         }
1289    ///     }
1290    /// }
1291    /// # anyhow::Ok(()) };
1292    /// ```
1293    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1294        let olm = self.client.olm_machine().await;
1295        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1296        let client = self.client.to_owned();
1297
1298        Ok(olm
1299            .store()
1300            .devices_stream()
1301            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1302    }
1303
1304    /// Returns a stream of user identity updates, allowing users to listen for
1305    /// notifications about new or changed user identities.
1306    ///
1307    /// The stream produced by this method emits updates whenever a new user
1308    /// identity is discovered or when an existing identities information is
1309    /// changed. Users can subscribe to this stream and receive updates in
1310    /// real-time.
1311    ///
1312    /// # Examples
1313    ///
1314    /// ```no_run
1315    /// # use matrix_sdk::Client;
1316    /// # use ruma::{device_id, user_id};
1317    /// # use futures_util::{pin_mut, StreamExt};
1318    /// # let client: Client = unimplemented!();
1319    /// # async {
1320    /// let identities_stream =
1321    ///     client.encryption().user_identities_stream().await?;
1322    /// pin_mut!(identities_stream);
1323    ///
1324    /// for identity_updates in identities_stream.next().await {
1325    ///     for (_, identity) in identity_updates.new {
1326    ///         println!("A new identity has been added {}", identity.user_id());
1327    ///     }
1328    /// }
1329    /// # anyhow::Ok(()) };
1330    /// ```
1331    pub async fn user_identities_stream(
1332        &self,
1333    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1334        let olm = self.client.olm_machine().await;
1335        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1336        let client = self.client.to_owned();
1337
1338        Ok(olm
1339            .store()
1340            .user_identities_stream()
1341            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1342    }
1343
1344    /// Create and upload a new cross signing identity.
1345    ///
1346    /// # Arguments
1347    ///
1348    /// * `auth_data` - This request requires user interactive auth, the first
1349    ///   request needs to set this to `None` and will always fail with an
1350    ///   `UiaaResponse`. The response will contain information for the
1351    ///   interactive auth and the same request needs to be made but this time
1352    ///   with some `auth_data` provided.
1353    ///
1354    /// # Examples
1355    ///
1356    /// ```no_run
1357    /// # use std::collections::BTreeMap;
1358    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1359    /// # use url::Url;
1360    /// # use serde_json::json;
1361    /// # async {
1362    /// # let homeserver = Url::parse("http://example.com")?;
1363    /// # let client = Client::new(homeserver).await?;
1364    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1365    ///     if let Some(response) = e.as_uiaa_response() {
1366    ///         let mut password = uiaa::Password::new(
1367    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1368    ///             "wordpass".to_owned(),
1369    ///         );
1370    ///         password.session = response.session.clone();
1371    ///
1372    ///         client
1373    ///             .encryption()
1374    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1375    ///             .await
1376    ///             .expect("Couldn't bootstrap cross signing")
1377    ///     } else {
1378    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1379    ///     }
1380    /// }
1381    /// # anyhow::Ok(()) };
1382    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1383        let olm = self.client.olm_machine().await;
1384        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1385
1386        let CrossSigningBootstrapRequests {
1387            upload_signing_keys_req,
1388            upload_keys_req,
1389            upload_signatures_req,
1390        } = olm.bootstrap_cross_signing(false).await?;
1391
1392        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1393            auth: auth_data,
1394            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1395            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1396            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1397        });
1398
1399        if let Some(req) = upload_keys_req {
1400            self.client.send_outgoing_request(req).await?;
1401        }
1402        self.client.send(upload_signing_keys_req).await?;
1403        self.client.send(upload_signatures_req).await?;
1404
1405        Ok(())
1406    }
1407
1408    /// Reset the cross-signing keys.
1409    ///
1410    /// # Example
1411    ///
1412    /// ```no_run
1413    /// use matrix_sdk::{ruma::api::client::uiaa, encryption::CrossSigningResetAuthType};
1414    ///
1415    /// # async {
1416    /// # let homeserver = url::Url::parse("http://example.com")?;
1417    /// # let client = matrix_sdk::Client::new(homeserver).await?;
1418    /// # let user_id = unimplemented!();
1419    /// let encryption = client.encryption();
1420    ///
1421    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1422    ///     match handle.auth_type() {
1423    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1424    ///             use matrix_sdk::ruma::api::client::uiaa;
1425    ///
1426    ///             let password = "1234".to_owned();
1427    ///             let mut password = uiaa::Password::new(user_id, password);
1428    ///             password.session = uiaa.session;
1429    ///
1430    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1431    ///         }
1432    ///         CrossSigningResetAuthType::OAuth(o) => {
1433    ///             println!(
1434    ///                 "To reset your end-to-end encryption cross-signing identity, \
1435    ///                 you first need to approve it at {}",
1436    ///                 o.approval_url
1437    ///             );
1438    ///
1439    ///             let mut oauth = uiaa::OAuth::new();
1440    ///             oauth.session = o.session;
1441    ///
1442    ///             handle.auth(Some(uiaa::AuthData::OAuth(oauth))).await?;
1443    ///         }
1444    ///     }
1445    /// }
1446    /// # anyhow::Ok(()) };
1447    /// ```
1448    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1449        let olm = self.client.olm_machine().await;
1450        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1451
1452        let CrossSigningBootstrapRequests {
1453            upload_keys_req,
1454            upload_signing_keys_req,
1455            upload_signatures_req,
1456        } = olm.bootstrap_cross_signing(true).await?;
1457
1458        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1459            auth: None,
1460            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1461            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1462            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1463        });
1464
1465        if let Some(req) = upload_keys_req {
1466            self.client.send_outgoing_request(req).await?;
1467        }
1468
1469        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1470            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1471                let client = self.client.clone();
1472
1473                Ok(Some(CrossSigningResetHandle::new(
1474                    client,
1475                    upload_signing_keys_req,
1476                    upload_signatures_req,
1477                    auth_type,
1478                )))
1479            } else {
1480                Err(error.into())
1481            }
1482        } else {
1483            self.client.send(upload_signatures_req).await?;
1484
1485            Ok(None)
1486        }
1487    }
1488
1489    /// Query the user's own device keys, if, and only if, we didn't have their
1490    /// identity in the first place.
1491    async fn ensure_initial_key_query(&self) -> Result<()> {
1492        let olm_machine = self.client.olm_machine().await;
1493        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1494
1495        let user_id = olm_machine.user_id();
1496
1497        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1498            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1499            self.client.keys_query(&request_id, request.device_keys).await?;
1500        }
1501
1502        Ok(())
1503    }
1504
1505    /// Create and upload a new cross signing identity, if that has not been
1506    /// done yet.
1507    ///
1508    /// This will only create a new cross-signing identity if the user had never
1509    /// done it before. If the user did it before, then this is a no-op.
1510    ///
1511    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1512    /// behavior of this function.
1513    ///
1514    /// # Arguments
1515    ///
1516    /// * `auth_data` - This request requires user interactive auth, the first
1517    ///   request needs to set this to `None` and will always fail with an
1518    ///   `UiaaResponse`. The response will contain information for the
1519    ///   interactive auth and the same request needs to be made but this time
1520    ///   with some `auth_data` provided.
1521    ///
1522    /// # Examples
1523    /// ```no_run
1524    /// # use std::collections::BTreeMap;
1525    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1526    /// # use url::Url;
1527    /// # use serde_json::json;
1528    /// # async {
1529    /// # let homeserver = Url::parse("http://example.com")?;
1530    /// # let client = Client::new(homeserver).await?;
1531    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1532    ///     if let Some(response) = e.as_uiaa_response() {
1533    ///         let mut password = uiaa::Password::new(
1534    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1535    ///             "wordpass".to_owned(),
1536    ///         );
1537    ///         password.session = response.session.clone();
1538    ///
1539    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1540    ///         // avoid checks.
1541    ///         client
1542    ///             .encryption()
1543    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1544    ///             .await
1545    ///             .expect("Couldn't bootstrap cross signing")
1546    ///     } else {
1547    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1548    ///     }
1549    /// }
1550    /// # anyhow::Ok(()) };
1551    pub async fn bootstrap_cross_signing_if_needed(
1552        &self,
1553        auth_data: Option<AuthData>,
1554    ) -> Result<()> {
1555        let olm_machine = self.client.olm_machine().await;
1556        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1557        let user_id = olm_machine.user_id();
1558
1559        self.ensure_initial_key_query().await?;
1560
1561        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1562            self.bootstrap_cross_signing(auth_data).await?;
1563        }
1564
1565        Ok(())
1566    }
1567
1568    /// Export E2EE keys that match the given predicate encrypting them with the
1569    /// given passphrase.
1570    ///
1571    /// # Arguments
1572    ///
1573    /// * `path` - The file path where the exported key file will be saved.
1574    ///
1575    /// * `passphrase` - The passphrase that will be used to encrypt the
1576    ///   exported room keys.
1577    ///
1578    /// * `predicate` - A closure that will be called for every known
1579    ///   `InboundGroupSession`, which represents a room key. If the closure
1580    ///   returns `true` the `InboundGroupSessoin` will be included in the
1581    ///   export, if the closure returns `false` it will not be included.
1582    ///
1583    /// # Panics
1584    ///
1585    /// This method will panic if it isn't run on a Tokio runtime.
1586    ///
1587    /// This method will panic if it can't get enough randomness from the OS to
1588    /// encrypt the exported keys securely.
1589    ///
1590    /// # Examples
1591    ///
1592    /// ```no_run
1593    /// # use std::{path::PathBuf, time::Duration};
1594    /// # use matrix_sdk::{
1595    /// #     Client, config::SyncSettings,
1596    /// #     ruma::room_id,
1597    /// # };
1598    /// # use url::Url;
1599    /// # async {
1600    /// # let homeserver = Url::parse("http://localhost:8080")?;
1601    /// # let mut client = Client::new(homeserver).await?;
1602    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1603    /// // Export all room keys.
1604    /// client
1605    ///     .encryption()
1606    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1607    ///     .await?;
1608    ///
1609    /// // Export only the room keys for a certain room.
1610    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1611    /// let room_id = room_id!("!test:localhost");
1612    ///
1613    /// client
1614    ///     .encryption()
1615    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1616    ///     .await?;
1617    /// # anyhow::Ok(()) };
1618    /// ```
1619    #[cfg(not(target_family = "wasm"))]
1620    pub async fn export_room_keys(
1621        &self,
1622        path: PathBuf,
1623        passphrase: &str,
1624        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1625    ) -> Result<()> {
1626        let olm = self.client.olm_machine().await;
1627        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1628
1629        let keys = olm.store().export_room_keys(predicate).await?;
1630        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1631
1632        let encrypt = move || -> Result<()> {
1633            let export: String =
1634                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1635            let mut file = std::fs::File::create(path)?;
1636            file.write_all(&export.into_bytes())?;
1637            Ok(())
1638        };
1639
1640        let task = tokio::task::spawn_blocking(encrypt);
1641        task.await.expect("Task join error")
1642    }
1643
1644    /// Import E2EE keys from the given file path.
1645    ///
1646    /// # Arguments
1647    ///
1648    /// * `path` - The file path where the exported key file will can be found.
1649    ///
1650    /// * `passphrase` - The passphrase that should be used to decrypt the
1651    ///   exported room keys.
1652    ///
1653    /// Returns a tuple of numbers that represent the number of sessions that
1654    /// were imported and the total number of sessions that were found in the
1655    /// key export.
1656    ///
1657    /// # Panics
1658    ///
1659    /// This method will panic if it isn't run on a Tokio runtime.
1660    ///
1661    /// ```no_run
1662    /// # use std::{path::PathBuf, time::Duration};
1663    /// # use matrix_sdk::{
1664    /// #     Client, config::SyncSettings,
1665    /// #     ruma::room_id,
1666    /// # };
1667    /// # use url::Url;
1668    /// # async {
1669    /// # let homeserver = Url::parse("http://localhost:8080")?;
1670    /// # let mut client = Client::new(homeserver).await?;
1671    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1672    /// let result =
1673    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1674    ///
1675    /// println!(
1676    ///     "Imported {} room keys out of {}",
1677    ///     result.imported_count, result.total_count
1678    /// );
1679    /// # anyhow::Ok(()) };
1680    /// ```
1681    #[cfg(not(target_family = "wasm"))]
1682    pub async fn import_room_keys(
1683        &self,
1684        path: PathBuf,
1685        passphrase: &str,
1686    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1687        let olm = self.client.olm_machine().await;
1688        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1689        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1690
1691        let decrypt = move || {
1692            let file = std::fs::File::open(path)?;
1693            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1694        };
1695
1696        let task = tokio::task::spawn_blocking(decrypt);
1697        let import = task.await.expect("Task join error")?;
1698
1699        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1700
1701        self.backups().maybe_trigger_backup();
1702
1703        Ok(ret)
1704    }
1705
1706    /// Receive notifications of room keys being received as a [`Stream`].
1707    ///
1708    /// Each time a room key is updated in any way, an update will be sent to
1709    /// the stream. Updates that happen at the same time are batched into a
1710    /// [`Vec`].
1711    ///
1712    /// If the reader of the stream lags too far behind, an error is broadcast
1713    /// containing the number of skipped items.
1714    ///
1715    /// # Examples
1716    ///
1717    /// ```no_run
1718    /// # use matrix_sdk::Client;
1719    /// # use url::Url;
1720    /// # async {
1721    /// # let homeserver = Url::parse("http://example.com")?;
1722    /// # let client = Client::new(homeserver).await?;
1723    /// use futures_util::StreamExt;
1724    ///
1725    /// let Some(mut room_keys_stream) =
1726    ///     client.encryption().room_keys_received_stream().await
1727    /// else {
1728    ///     return Ok(());
1729    /// };
1730    ///
1731    /// while let Some(update) = room_keys_stream.next().await {
1732    ///     println!("Received room keys {update:?}");
1733    /// }
1734    /// # anyhow::Ok(()) };
1735    /// ```
1736    pub async fn room_keys_received_stream(
1737        &self,
1738    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1739    {
1740        let olm = self.client.olm_machine().await;
1741        let olm = olm.as_ref()?;
1742
1743        Some(olm.store().room_keys_received_stream())
1744    }
1745
1746    /// Receive notifications of historic room key bundles as a [`Stream`].
1747    ///
1748    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1749    ///
1750    /// Each time a historic room key bundle was received, an update will be
1751    /// sent to the stream. This stream is useful for informative purposes
1752    /// exclusively, historic room key bundles are handled by the SDK
1753    /// automatically.
1754    ///
1755    /// # Examples
1756    ///
1757    /// ```no_run
1758    /// # use matrix_sdk::Client;
1759    /// # use url::Url;
1760    /// # async {
1761    /// # let homeserver = Url::parse("http://example.com")?;
1762    /// # let client = Client::new(homeserver).await?;
1763    /// use futures_util::StreamExt;
1764    ///
1765    /// let Some(mut bundle_stream) =
1766    ///     client.encryption().historic_room_key_stream().await
1767    /// else {
1768    ///     return Ok(());
1769    /// };
1770    ///
1771    /// while let Some(bundle_info) = bundle_stream.next().await {
1772    ///     println!("Received a historic room key bundle {bundle_info:?}");
1773    /// }
1774    /// # anyhow::Ok(()) };
1775    /// ```
1776    pub async fn historic_room_key_stream(
1777        &self,
1778    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1779        let olm = self.client.olm_machine().await;
1780        let olm = olm.as_ref()?;
1781
1782        Some(olm.store().historic_room_key_stream())
1783    }
1784
1785    /// Get the secret storage manager of the client.
1786    pub fn secret_storage(&self) -> SecretStorage {
1787        SecretStorage { client: self.client.to_owned() }
1788    }
1789
1790    /// Get the backups manager of the client.
1791    pub fn backups(&self) -> Backups {
1792        Backups { client: self.client.to_owned() }
1793    }
1794
1795    /// Get the recovery manager of the client.
1796    pub fn recovery(&self) -> Recovery {
1797        Recovery { client: self.client.to_owned() }
1798    }
1799
1800    /// Enables the crypto-store cross-process lock.
1801    ///
1802    /// This may be required if there are multiple processes that may do writes
1803    /// to the same crypto store. In that case, it's necessary to create a
1804    /// lock, so that only one process writes to it, otherwise this may
1805    /// cause confusing issues because of stale data contained in in-memory
1806    /// caches.
1807    ///
1808    /// The provided `lock_value` must be a unique identifier for this process.
1809    /// Use [`Client::cross_process_lock_config`] to get the global value, if
1810    /// multi-process is enabled.
1811    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1812        // If the lock has already been created, don't recreate it from scratch.
1813        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1814            let prev_holder = prev_lock.lock_holder();
1815            if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1816                return Ok(());
1817            }
1818            warn!(
1819                "Recreating cross-process store lock with a different holder value: \
1820                 prev was {prev_holder:?}, new is {lock_value}"
1821            );
1822        }
1823
1824        let olm_machine = self.client.base_client().olm_machine().await;
1825        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1826
1827        let lock = olm_machine.store().create_store_lock(
1828            "cross_process_lock".to_owned(),
1829            CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1830        );
1831
1832        // Gently try to initialize the crypto store generation counter.
1833        //
1834        // If we don't get the lock immediately, then it is already acquired by another
1835        // process, and we'll get to reload next time we acquire the lock.
1836        {
1837            let lock_result = lock.try_lock_once().await?;
1838
1839            if lock_result.is_ok() {
1840                olm_machine
1841                    .initialize_crypto_store_generation(
1842                        &self.client.locks().crypto_store_generation,
1843                    )
1844                    .await?;
1845            }
1846        }
1847
1848        self.client
1849            .locks()
1850            .cross_process_crypto_store_lock
1851            .set(lock)
1852            .map_err(|_| Error::BadCryptoStoreState)?;
1853
1854        Ok(())
1855    }
1856
1857    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1858    /// time.
1859    ///
1860    /// Returns the current generation number.
1861    #[instrument(skip(self), fields(olm_machine_new_generation, olm_machine_generation))]
1862    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1863        let olm_machine_guard = self.client.olm_machine().await;
1864        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1865            let (new_gen, generation_number) = olm_machine
1866                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1867                .await?;
1868
1869            Span::current()
1870                .record("olm_machine_new_generation", new_gen)
1871                .record("olm_machine_generation", generation_number);
1872            debug!("OlmMachine generation maintained in CryptoStore");
1873
1874            // If the crypto store generation has changed,
1875            if new_gen {
1876                // (get rid of the reference to the current crypto store first)
1877                drop(olm_machine_guard);
1878                // Recreate the OlmMachine.
1879                self.client.base_client().regenerate_olm(None).await?;
1880            }
1881            Ok(generation_number)
1882        } else {
1883            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1884            // been initialised by the time we get here. Ideally we'd panic, or return an
1885            // error, but for now I'm just adding some logging to check if it
1886            // happens, and returning the magic number 0.
1887            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1888            Ok(0)
1889        }
1890    }
1891
1892    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1893    /// spin-waits until the lock is available.
1894    ///
1895    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1896    /// first time.
1897    ///
1898    /// Returns a guard to the lock, if it was obtained.
1899    pub async fn spin_lock_store(
1900        &self,
1901        max_backoff: Option<u32>,
1902    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1903        self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1904    }
1905
1906    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1907    /// attempts to lock it once.
1908    ///
1909    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1910    /// first time.
1911    ///
1912    /// Returns a guard to the lock, if it was obtained.
1913    pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1914        match self.lock_store(CrossProcessLock::try_lock_once).await {
1915            Err(Error::CrossProcessLockError(e))
1916                if matches!(*e, CrossProcessLockError::Unobtained(_)) =>
1917            {
1918                Ok(None)
1919            }
1920            other => other,
1921        }
1922    }
1923
1924    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1925    /// locks the store with the given function, `acquire`.
1926    ///
1927    /// Reloads the `OlmMachine` after obtaining the lock, if the lock is dirty.
1928    ///
1929    /// Returns a guard to the lock if it was obtained.
1930    pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1931        &self,
1932        acquire: F,
1933    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1934        let wrap_err = |e: CryptoStoreError| {
1935            Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1936        };
1937        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1938            let guard = acquire(lock).await.map_err(wrap_err)??;
1939            let _ = self.on_lock_newly_acquired().await?;
1940            Ok(Some(guard.into_guard()))
1941        } else {
1942            Ok(None)
1943        }
1944    }
1945
1946    /// Testing purposes only.
1947    #[cfg(any(test, feature = "testing"))]
1948    pub async fn uploaded_key_count(&self) -> Result<u64> {
1949        let olm_machine = self.client.olm_machine().await;
1950        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1951        Ok(olm_machine.uploaded_key_count().await?)
1952    }
1953
1954    /// Bootstrap encryption and enables event listeners for the E2EE support.
1955    ///
1956    /// Based on the `EncryptionSettings`, this call might:
1957    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1958    /// - Create a key backup if needed (POST `/room_keys/version`)
1959    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1960    ///
1961    /// As part of this process, and if needed, the current device keys would be
1962    /// uploaded to the server, new account data would be added, and cross
1963    /// signing keys and signatures might be uploaded.
1964    ///
1965    /// Should be called once we
1966    /// created a [`OlmMachine`], i.e. after logging in.
1967    ///
1968    /// # Arguments
1969    ///
1970    /// * `auth_data` - Some requests may require re-authentication. To prevent
1971    ///   the user from having to re-enter their password (or use other
1972    ///   methods), we can provide the authentication data here. This is
1973    ///   necessary for uploading cross-signing keys. However, please note that
1974    ///   there is a proposal (MSC3967) to remove this requirement, which would
1975    ///   allow for the initial upload of cross-signing keys without
1976    ///   authentication, rendering this parameter obsolete.
1977    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1978        // It's fine to be async here as we're only getting the lock protecting the
1979        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1980        // in we won't delay the login or restoration of the Client.
1981        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1982            Some(BundleReceiverTask::new(&self.client).await)
1983        } else {
1984            None
1985        };
1986
1987        let mut tasks = self.client.inner.e2ee.tasks.lock();
1988
1989        let this = self.clone();
1990
1991        tasks.setup_e2ee = Some(spawn(async move {
1992            // Update the current state first, so we don't have to wait for the result of
1993            // network requests
1994            this.update_verification_state().await;
1995
1996            if this.settings().auto_enable_cross_signing
1997                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1998            {
1999                error!("Couldn't bootstrap cross signing {e:?}");
2000            }
2001
2002            if let Err(e) = this.backups().setup_and_resume().await {
2003                error!("Couldn't setup and resume backups {e:?}");
2004            }
2005            if let Err(e) = this.recovery().setup().await {
2006                error!("Couldn't setup and resume recovery {e:?}");
2007            }
2008        }));
2009
2010        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
2011
2012        self.setup_room_membership_session_discard_handler();
2013    }
2014
2015    /// Waits for end-to-end encryption initialization tasks to finish, if any
2016    /// was running in the background.
2017    pub async fn wait_for_e2ee_initialization_tasks(&self) {
2018        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
2019
2020        if let Some(task) = task
2021            && let Err(err) = task.await
2022        {
2023            warn!("Error when initializing backups: {err}");
2024        }
2025    }
2026
2027    /// Upload the device keys and initial set of one-time keys to the server.
2028    ///
2029    /// This should only be called when the user logs in for the first time,
2030    /// the method will ensure that other devices see our own device as an
2031    /// end-to-end encryption enabled one.
2032    ///
2033    /// **Warning**: Do not use this method if we're already calling
2034    /// [`Client::send_outgoing_request()`]. This method is intended for
2035    /// explicitly uploading the device keys before starting a sync.
2036    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
2037        let olm = self.client.olm_machine().await;
2038        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
2039
2040        if let Some((request_id, request)) = olm.upload_device_keys().await? {
2041            self.client.keys_upload(&request_id, &request).await?;
2042
2043            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
2044            self.client.keys_query(&request_id, request.device_keys).await?;
2045        }
2046
2047        Ok(())
2048    }
2049
2050    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
2051        self.recovery().update_state_after_keys_query(response).await;
2052
2053        // Only update the verification_state if our own devices changed
2054        if let Some(user_id) = self.client.user_id() {
2055            let contains_own_device = response.device_keys.contains_key(user_id);
2056
2057            if contains_own_device {
2058                self.update_verification_state().await;
2059            }
2060        }
2061    }
2062
2063    async fn update_verification_state(&self) {
2064        match self.get_own_device().await {
2065            Ok(device) => {
2066                if let Some(device) = device {
2067                    let is_verified = device.is_cross_signed_by_owner();
2068
2069                    if is_verified {
2070                        self.client.inner.verification_state.set(VerificationState::Verified);
2071                    } else {
2072                        self.client.inner.verification_state.set(VerificationState::Unverified);
2073                    }
2074                } else {
2075                    warn!("Couldn't find out own device in the store.");
2076                    self.client.inner.verification_state.set(VerificationState::Unknown);
2077                }
2078            }
2079            Err(error) => {
2080                warn!("Failed retrieving own device: {error}");
2081                self.client.inner.verification_state.set(VerificationState::Unknown);
2082            }
2083        }
2084    }
2085
2086    /// Sets up a handler to rotate room keys when a user leaves a room.
2087    ///
2088    /// Previously, it was sufficient to check if we need to rotate the room key
2089    /// prior to sending a message. However, the history sharing feature
2090    /// ([MSC4268]) breaks this logic:
2091    ///
2092    /// 1. Alice sends a message M1 in room X;
2093    /// 2. Bob invites Charlie, who joins and immediately leaves the room;
2094    /// 3. Alice sends another message M2 in room X.
2095    ///
2096    /// Under the old logic, Alice would not rotate her key after Charlie
2097    /// leaves, resulting in M2 being encrypted with the same session as M1.
2098    /// This would allow Charlie to decrypt M2 if he ever gains access to
2099    /// the event.
2100    ///
2101    /// This handler listens for changes to the room membership, and discards
2102    /// the current room key if the event is a `leave` event.
2103    ///
2104    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2105    fn setup_room_membership_session_discard_handler(&self) {
2106        let client = WeakClient::from_client(&self.client);
2107        self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2108            let Some(client) = client.get() else {
2109                // The main client has been dropped.
2110                return;
2111            };
2112            let Some(user_id) = client.user_id() else {
2113                // We aren't logged in, so this shouldn't ever happen.
2114                return;
2115            };
2116            let olm = client.olm_machine().await;
2117            let Some(olm) = olm.as_ref() else {
2118                warn!("Cannot discard session - Olm machine is not available");
2119                return;
2120            };
2121
2122            if matches!(
2123                ev.membership_change(),
2124                MembershipChange::Joined |
2125                MembershipChange::Invited |
2126                MembershipChange::KnockAccepted |
2127                MembershipChange::InvitationAccepted |
2128                MembershipChange::ProfileChanged { .. }
2129            ) || ev.sender == user_id {
2130                // We can ignore events that did not remove us, and those that we sent.
2131                return;
2132            }
2133
2134            debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2135
2136            // Attempt to discard the current room key. This won't do anything if we don't have one,
2137            // but that's fine since we will create a new room key whenever we try to send a message.
2138            if let Err(e) = olm.discard_room_key(room.room_id()).await {
2139                warn!(
2140                    room_id = ?room.room_id(),
2141                    "Error discarding room key after member leave: {e:?}"
2142                );
2143            }
2144        });
2145    }
2146
2147    /// Encrypts then send the given content via the `/sendToDevice` end-point
2148    /// using Olm encryption.
2149    ///
2150    /// If there are a lot of recipient devices multiple `/sendToDevice`
2151    /// requests might be sent out.
2152    ///
2153    /// # Returns
2154    /// A list of failures. The list of devices that couldn't get the messages.
2155    #[cfg(feature = "experimental-send-custom-to-device")]
2156    pub async fn encrypt_and_send_raw_to_device(
2157        &self,
2158        recipient_devices: Vec<&Device>,
2159        event_type: &str,
2160        content: Raw<AnyToDeviceEventContent>,
2161        share_strategy: CollectStrategy,
2162    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
2163        let users = recipient_devices.iter().map(|device| device.user_id());
2164
2165        // Will claim one-time-key for users that needs it
2166        // TODO: For later optimisation: This will establish missing olm sessions with
2167        // all this users devices, but we just want for some devices.
2168        self.client.claim_one_time_keys(users).await?;
2169
2170        let olm = self.client.olm_machine().await;
2171        let olm = olm.as_ref().expect("Olm machine wasn't started");
2172
2173        let (requests, withhelds) = olm
2174            .encrypt_content_for_devices(
2175                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
2176                event_type,
2177                &content
2178                    .deserialize_as::<serde_json::Value>()
2179                    .expect("Deserialize as Value will always work"),
2180                share_strategy,
2181            )
2182            .await?;
2183
2184        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
2185
2186        // Push the withhelds in the failures
2187        withhelds.iter().for_each(|(d, _)| {
2188            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
2189        });
2190
2191        // TODO: parallelize that? it's already grouping 250 devices per chunk.
2192        for request in requests {
2193            let ruma_request = RumaToDeviceRequest::new_raw(
2194                request.event_type.clone(),
2195                request.txn_id.clone(),
2196                request.messages.clone(),
2197            );
2198
2199            let send_result = self
2200                .client
2201                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
2202                .await;
2203
2204            // If the sending failed we need to collect the failures to report them
2205            if send_result.is_err() {
2206                // Mark the sending as failed
2207                for (user_id, device_map) in request.messages {
2208                    for device_id in device_map.keys() {
2209                        match device_id {
2210                            DeviceIdOrAllDevices::DeviceId(device_id) => {
2211                                failures.push((user_id.clone(), device_id.to_owned()));
2212                            }
2213                            DeviceIdOrAllDevices::AllDevices => {
2214                                // Cannot happen in this case
2215                            }
2216                        }
2217                    }
2218                }
2219            }
2220        }
2221
2222        Ok(failures)
2223    }
2224}
2225
2226#[cfg(all(test, not(target_family = "wasm")))]
2227mod tests {
2228    use std::{
2229        ops::Not,
2230        str::FromStr,
2231        sync::{
2232            Arc,
2233            atomic::{AtomicBool, Ordering},
2234        },
2235        time::Duration,
2236    };
2237
2238    use matrix_sdk_test::{
2239        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2240        event_factory::EventFactory,
2241    };
2242    use ruma::{
2243        event_id,
2244        events::{reaction::ReactionEventContent, relation::Annotation},
2245        user_id,
2246    };
2247    use serde_json::json;
2248    use wiremock::{
2249        Mock, MockServer, Request, ResponseTemplate,
2250        matchers::{header, method, path_regex},
2251    };
2252
2253    use crate::{
2254        Client, assert_next_matches_with_timeout,
2255        config::RequestConfig,
2256        encryption::{
2257            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2258        },
2259        test_utils::{
2260            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2261        },
2262    };
2263
2264    #[async_test]
2265    async fn test_reaction_sending() {
2266        let server = MockServer::start().await;
2267        let client = logged_in_client(Some(server.uri())).await;
2268
2269        let event_id = event_id!("$2:example.org");
2270
2271        Mock::given(method("GET"))
2272            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2273            .and(header("authorization", "Bearer 1234"))
2274            .respond_with(
2275                ResponseTemplate::new(200)
2276                    .set_body_json(EventFactory::new().room_encryption().into_content()),
2277            )
2278            .mount(&server)
2279            .await;
2280
2281        Mock::given(method("PUT"))
2282            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2283            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2284                "event_id": event_id,
2285            })))
2286            .mount(&server)
2287            .await;
2288
2289        let f = EventFactory::new().sender(user_id!("@example:localhost"));
2290        let response = SyncResponseBuilder::default()
2291            .add_joined_room(
2292                JoinedRoomBuilder::default()
2293                    .add_state_event(
2294                        f.member(user_id!("@example:localhost")).display_name("example"),
2295                    )
2296                    .add_state_event(f.default_power_levels())
2297                    .add_state_event(f.room_encryption()),
2298            )
2299            .build_sync_response();
2300
2301        client.base_client().receive_sync_response(response).await.unwrap();
2302
2303        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2304        assert!(
2305            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2306        );
2307
2308        let event_id = event_id!("$1:example.org");
2309        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2310        room.send(reaction).await.expect("Sending the reaction should not fail");
2311
2312        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2313    }
2314
2315    #[cfg(feature = "sqlite")]
2316    #[async_test]
2317    async fn test_generation_counter_invalidates_olm_machine() {
2318        // Create two clients using the same sqlite database.
2319
2320        use matrix_sdk_base::store::RoomLoadSettings;
2321        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2322        let session = mock_matrix_session();
2323
2324        let client1 = Client::builder()
2325            .homeserver_url("http://localhost:1234")
2326            .request_config(RequestConfig::new().disable_retry())
2327            .sqlite_store(&sqlite_path, None)
2328            .build()
2329            .await
2330            .unwrap();
2331        client1
2332            .matrix_auth()
2333            .restore_session(session.clone(), RoomLoadSettings::default())
2334            .await
2335            .unwrap();
2336
2337        let client2 = Client::builder()
2338            .homeserver_url("http://localhost:1234")
2339            .request_config(RequestConfig::new().disable_retry())
2340            .sqlite_store(sqlite_path, None)
2341            .build()
2342            .await
2343            .unwrap();
2344        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2345
2346        // When the lock isn't enabled, any attempt at locking won't return a guard.
2347        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2348        assert!(guard.is_none());
2349
2350        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2351        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2352
2353        // One client can take the lock.
2354        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2355        assert!(acquired1.is_some());
2356
2357        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2358        let initial_olm_machine =
2359            client1.olm_machine().await.clone().expect("must have an olm machine");
2360
2361        // Also enable backup to check that new machine has the same backup keys.
2362        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new();
2363        let backup_key = decryption_key.megolm_v1_public_key();
2364        backup_key.set_version("1".to_owned());
2365        initial_olm_machine
2366            .backup_machine()
2367            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2368            .await
2369            .expect("Should save");
2370
2371        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2372
2373        assert!(client1.encryption().backups().are_enabled().await);
2374
2375        // The other client can't take the lock too.
2376        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2377        assert!(acquired2.is_none());
2378
2379        // Now have the first client release the lock,
2380        drop(acquired1);
2381        tokio::time::sleep(Duration::from_millis(100)).await;
2382
2383        // And re-take it.
2384        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2385        assert!(acquired1.is_some());
2386
2387        // In that case, the Olm Machine shouldn't change.
2388        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2389        assert!(initial_olm_machine.same_as(&olm_machine));
2390
2391        // Ok, release again.
2392        drop(acquired1);
2393        tokio::time::sleep(Duration::from_millis(100)).await;
2394
2395        // Client2 can acquire the lock.
2396        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2397        assert!(acquired2.is_some());
2398
2399        // And then release it.
2400        drop(acquired2);
2401        tokio::time::sleep(Duration::from_millis(100)).await;
2402
2403        // Client1 can acquire it again,
2404        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2405        assert!(acquired1.is_some());
2406
2407        // But now its olm machine has been invalidated and thus regenerated!
2408        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2409
2410        assert!(!initial_olm_machine.same_as(&olm_machine));
2411
2412        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2413        assert!(backup_key_new.decryption_key.is_some());
2414        assert_eq!(
2415            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2416            backup_key.to_base64()
2417        );
2418        assert!(client1.encryption().backups().are_enabled().await);
2419    }
2420
2421    #[cfg(feature = "sqlite")]
2422    #[async_test]
2423    async fn test_generation_counter_no_spurious_invalidation() {
2424        // Create two clients using the same sqlite database.
2425
2426        use matrix_sdk_base::store::RoomLoadSettings;
2427        let sqlite_path =
2428            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2429        let session = mock_matrix_session();
2430
2431        let client = Client::builder()
2432            .homeserver_url("http://localhost:1234")
2433            .request_config(RequestConfig::new().disable_retry())
2434            .sqlite_store(&sqlite_path, None)
2435            .build()
2436            .await
2437            .unwrap();
2438        client
2439            .matrix_auth()
2440            .restore_session(session.clone(), RoomLoadSettings::default())
2441            .await
2442            .unwrap();
2443
2444        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2445
2446        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2447
2448        // Enabling the lock doesn't update the olm machine.
2449        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2450        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2451
2452        {
2453            // Simulate that another client hold the lock before.
2454            let client2 = Client::builder()
2455                .homeserver_url("http://localhost:1234")
2456                .request_config(RequestConfig::new().disable_retry())
2457                .sqlite_store(sqlite_path, None)
2458                .build()
2459                .await
2460                .unwrap();
2461            client2
2462                .matrix_auth()
2463                .restore_session(session, RoomLoadSettings::default())
2464                .await
2465                .unwrap();
2466
2467            client2
2468                .encryption()
2469                .enable_cross_process_store_lock("client2".to_owned())
2470                .await
2471                .unwrap();
2472
2473            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2474            assert!(guard.is_some());
2475
2476            drop(guard);
2477            tokio::time::sleep(Duration::from_millis(100)).await;
2478        }
2479
2480        {
2481            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2482            assert!(acquired.is_some());
2483        }
2484
2485        // Taking the lock the first time will update the olm machine.
2486        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2487        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2488
2489        {
2490            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2491            assert!(acquired.is_some());
2492        }
2493
2494        // Re-taking the lock doesn't update the olm machine.
2495        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2496        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2497    }
2498
2499    #[async_test]
2500    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2501        // Given a client and a server
2502        let client = no_retry_test_client(None).await;
2503        let server = MockServer::start().await;
2504
2505        // When we subscribe to its verification state
2506        let mut verification_state = client.encryption().verification_state();
2507
2508        // We can get its initial value, and it's Unknown
2509        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2510
2511        // We set up a mocked request to check this endpoint is not called before
2512        // reading the new state
2513        let keys_requested = Arc::new(AtomicBool::new(false));
2514        let inner_bool = keys_requested.clone();
2515
2516        Mock::given(method("GET"))
2517            .and(path_regex(
2518                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2519            ))
2520            .respond_with(move |_req: &Request| {
2521                inner_bool.fetch_or(true, Ordering::SeqCst);
2522                ResponseTemplate::new(200).set_body_json(json!({}))
2523            })
2524            .mount(&server)
2525            .await;
2526
2527        // When the session is initialised and the encryption tasks spawn
2528        set_client_session(&client).await;
2529
2530        // Then we can get an updated value without waiting for any network requests
2531        assert!(keys_requested.load(Ordering::SeqCst).not());
2532        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2533    }
2534
2535    #[test]
2536    fn test_oauth_reset_info_from_uiaa_info() {
2537        let auth_info = json!({
2538            "session": "dummy",
2539            "flows": [
2540                {
2541                    "stages": [
2542                        "org.matrix.cross_signing_reset"
2543                    ]
2544                }
2545            ],
2546            "params": {
2547                "org.matrix.cross_signing_reset": {
2548                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2549                }
2550            },
2551            "msg": "To reset..."
2552        });
2553
2554        let auth_info = serde_json::from_value(auth_info)
2555            .expect("We should be able to deserialize the UiaaInfo");
2556        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2557            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2558    }
2559
2560    #[test]
2561    fn test_duplicate_one_time_key_error_parsing() {
2562        let message = concat!(
2563            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2564            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2565            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2566            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2567            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2568            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2569            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2570        );
2571        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2572            .expect("We should be able to parse the error message");
2573
2574        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2575        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2576
2577        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2578            .expect_err("We shouldn't be able to parse an incomplete error message");
2579    }
2580
2581    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2582    // response to a /keys/query request using the given device keys and a
2583    // pre-defined set of cross-signing keys.
2584    fn devices_to_verify_against_keys_query_response(
2585        devices: Vec<serde_json::Value>,
2586    ) -> serde_json::Value {
2587        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2588            .into_iter()
2589            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2590            .collect();
2591        json!({
2592            "device_keys": {
2593                "@example:localhost": device_keys,
2594            },
2595            "master_keys": {
2596                "@example:localhost": {
2597                    "keys": {
2598                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2599                    },
2600                    "usage": ["master"],
2601                    "user_id": "@example:localhost",
2602                },
2603            },
2604            "self_signing_keys": {
2605                "@example:localhost": {
2606                    "keys": {
2607                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2608                    },
2609                    "usage": ["self_signing"],
2610                    "user_id": "@example:localhost",
2611                    "signatures": {
2612                        "@example:localhost": {
2613                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2614                        },
2615                    },
2616                },
2617            },
2618            "user_signing_keys": {
2619                "@example:localhost": {
2620                    "keys": {
2621                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2622                    },
2623                    "usage": ["user_signing"],
2624                    "user_id": "@example:localhost",
2625                    "signatures": {
2626                        "@example:localhost": {
2627                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2628                        },
2629                    },
2630                },
2631            }
2632        })
2633    }
2634
2635    // The following three tests test that we can detect whether the user has
2636    // other devices that they can verify against under different conditions.
2637    #[async_test]
2638    /// Test that we detect that can't verify against another device if we have
2639    /// no devices.
2640    async fn test_devices_to_verify_against_no_devices() {
2641        let server = MockServer::start().await;
2642        let client = logged_in_client(Some(server.uri())).await;
2643
2644        Mock::given(method("POST"))
2645            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2646            .respond_with(
2647                ResponseTemplate::new(200)
2648                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2649            )
2650            .mount(&server)
2651            .await;
2652
2653        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2654    }
2655
2656    #[async_test]
2657    /// Test that we detect that we can verify against another cross-signed
2658    /// regular device.
2659    async fn test_devices_to_verify_against_cross_signed() {
2660        let server = MockServer::start().await;
2661        let client = logged_in_client(Some(server.uri())).await;
2662
2663        Mock::given(method("POST"))
2664            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2665            .respond_with(ResponseTemplate::new(200).set_body_json(
2666                devices_to_verify_against_keys_query_response(vec![
2667                    json!({
2668                        "algorithms": [
2669                            "m.olm.v1.curve25519-aes-sha2",
2670                            "m.megolm.v1.aes-sha2",
2671                        ],
2672                        "user_id": "@example:localhost",
2673                        "device_id": "SIGNEDDEVICE",
2674                        "keys": {
2675                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2676                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2677                        },
2678                        "signatures": {
2679                            "@example:localhost": {
2680                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2681                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2682                            },
2683                        },
2684                    })
2685                ])
2686            ))
2687            .mount(&server)
2688            .await;
2689
2690        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2691    }
2692
2693    #[async_test]
2694    /// Test that we detect that we can't verify against a dehydrated or
2695    /// unsigned device.
2696    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2697        let server = MockServer::start().await;
2698        let client = logged_in_client(Some(server.uri())).await;
2699        let user_id = client.user_id().unwrap();
2700        let olm_machine = client.olm_machine().await;
2701        let olm_machine = olm_machine.as_ref().unwrap();
2702
2703        Mock::given(method("POST"))
2704            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2705            .respond_with(ResponseTemplate::new(200).set_body_json(
2706                devices_to_verify_against_keys_query_response(vec![
2707                    json!({
2708                        "algorithms": [
2709                            "m.olm.v1.curve25519-aes-sha2",
2710                            "m.megolm.v1.aes-sha2",
2711                        ],
2712                        "user_id": "@example:localhost",
2713                        "device_id": "DEHYDRATEDDEVICE",
2714                        "keys": {
2715                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2716                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2717                        },
2718                        "dehydrated": true,
2719                        "signatures": {
2720                            "@example:localhost": {
2721                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2722                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2723                            },
2724                        },
2725                    }),
2726                    json!({
2727                        "algorithms": [
2728                            "m.olm.v1.curve25519-aes-sha2",
2729                            "m.megolm.v1.aes-sha2",
2730                        ],
2731                        "user_id": "@example:localhost",
2732                        "device_id": "UNSIGNEDDEVICE",
2733                        "keys": {
2734                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2735                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2736                        },
2737                        "signatures": {
2738                            "@example:localhost": {
2739                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2740                            },
2741                        },
2742                    }),
2743                ])
2744            ))
2745            .mount(&server)
2746            .await;
2747
2748        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2749        client.keys_query(&request_id, request.device_keys).await.unwrap();
2750
2751        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2752    }
2753}