matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_arch = "wasm32", allow(unused_imports))]
18
19use std::{
20    collections::{BTreeMap, HashSet},
21    io::{Cursor, Read, Write},
22    iter,
23    path::PathBuf,
24    sync::Arc,
25};
26
27use eyeball::{SharedObservable, Subscriber};
28use futures_core::Stream;
29use futures_util::{
30    future::try_join,
31    stream::{self, StreamExt},
32};
33use matrix_sdk_base::crypto::{
34    store::RoomKeyInfo,
35    types::requests::{
36        OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
37    },
38    CrossSigningBootstrapRequests, OlmMachine,
39};
40use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
41use ruma::{
42    api::client::{
43        keys::{
44            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
45            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
46        },
47        message::send_message_event,
48        to_device::send_event_to_device::v3::{
49            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
50        },
51        uiaa::{AuthData, UiaaInfo},
52    },
53    assign,
54    events::{
55        direct::DirectUserIdentifier,
56        room::{MediaSource, ThumbnailInfo},
57    },
58    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
59};
60use serde::Deserialize;
61use tokio::sync::{Mutex, RwLockReadGuard};
62use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
63use tracing::{debug, error, instrument, trace, warn};
64use url::Url;
65use vodozemac::Curve25519PublicKey;
66
67use self::{
68    backups::{types::BackupClientState, Backups},
69    futures::UploadEncryptedFile,
70    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
71    recovery::{Recovery, RecoveryState},
72    secret_storage::SecretStorage,
73    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
74    verification::{SasVerification, Verification, VerificationRequest},
75};
76use crate::{
77    attachment::Thumbnail,
78    client::{ClientInner, WeakClient},
79    error::HttpResult,
80    store_locks::CrossProcessStoreLockGuard,
81    Client, Error, HttpError, Result, Room, TransmissionProgress,
82};
83
84pub mod backups;
85pub mod futures;
86pub mod identities;
87pub mod recovery;
88pub mod secret_storage;
89pub(crate) mod tasks;
90pub mod verification;
91
92pub use matrix_sdk_base::crypto::{
93    olm::{
94        SessionCreationError as MegolmSessionCreationError,
95        SessionExportError as OlmSessionExportError,
96    },
97    vodozemac, CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError,
98    LocalTrust, MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
99    SessionCreationError, SignatureError, VERSION,
100};
101
102pub use crate::error::RoomKeyImportError;
103
/// All the data related to the encryption state.
///
/// Holds the encryption settings together with the background tasks and the
/// backup and recovery state.
pub(crate) struct EncryptionData {
    /// Background tasks related to encryption (key backup, initialization
    /// tasks, etc.).
    ///
    /// Kept behind a mutex so that the tasks can be installed through a
    /// shared reference.
    pub tasks: StdMutex<ClientTasks>,

    /// End-to-end encryption settings.
    pub encryption_settings: EncryptionSettings,

    /// All state related to key backup.
    pub backup_state: BackupClientState,

    /// All state related to secret storage recovery, wrapped in a
    /// [`SharedObservable`].
    pub recovery_state: SharedObservable<RecoveryState>,
}
119
120impl EncryptionData {
121    pub fn new(encryption_settings: EncryptionSettings) -> Self {
122        Self {
123            encryption_settings,
124
125            tasks: StdMutex::new(Default::default()),
126            backup_state: Default::default(),
127            recovery_state: Default::default(),
128        }
129    }
130
131    pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
132        let weak_client = WeakClient::from_inner(client);
133
134        let mut tasks = self.tasks.lock();
135        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
136
137        if self.encryption_settings.backup_download_strategy
138            == BackupDownloadStrategy::AfterDecryptionFailure
139        {
140            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
141        }
142    }
143
144    /// Initialize the background task which listens for changes in the
145    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
146    ///
147    /// This should happen after the usual tasks have been set up and after the
148    /// E2EE initialization tasks have been set up.
149    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
150        let mut guard = self.tasks.lock();
151
152        let future = Recovery::update_state_after_backup_state_change(client);
153        let join_handle = spawn(future);
154
155        guard.update_recovery_state_after_backup = Some(join_handle);
156    }
157}
158
/// Settings for end-to-end encryption features.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// Automatically bootstrap cross-signing for a user once they're logged
    /// in, in case it's not already done yet.
    ///
    /// This requires to login with a username and password, or that MSC3967 is
    /// enabled on the server, as of 2023-10-20.
    pub auto_enable_cross_signing: bool,

    /// Select a strategy to download room keys from the backup, by default room
    /// keys won't be downloaded from the backup automatically.
    ///
    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// Automatically create a backup version if no backup exists.
    pub auto_enable_backups: bool,
}
178
/// The strategy used to download room keys from the backup.
///
/// This is configured using
/// [`EncryptionSettings::backup_download_strategy`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] methods.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
205
/// The verification state of our own device.
///
/// This enum tells us if our own user identity trusts this device, in other
/// words it tells us if the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is unknown for now.
    Unknown,
    /// The device is considered to be verified, it has been signed by its user
    /// identity.
    Verified,
    /// The device is unverified.
    Unverified,
}
220
/// Wraps together a [`CrossProcessStoreLockGuard`] and a generation number.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The guard itself, it is never read and only kept as part of this
    /// struct.
    _guard: CrossProcessStoreLockGuard,
    /// The generation number returned by
    /// [`CrossProcessLockStoreGuardWithGeneration::generation()`].
    generation: u64,
}
227
228impl CrossProcessLockStoreGuardWithGeneration {
229    /// Return the Crypto Store generation associated with this store lock.
230    pub fn generation(&self) -> u64 {
231        self.generation
232    }
233}
234
/// A stateful struct remembering the cross-signing keys we need to upload.
///
/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
/// additional authentication, this struct will contain information on the type
/// of authentication the user needs to complete before the upload might be
/// continued.
///
/// More info can be found in the [spec].
///
/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// The client used to send the requests.
    client: Client,
    /// The request uploading the cross-signing keys, the authentication data
    /// gets filled in by [`CrossSigningResetHandle::auth()`].
    upload_request: UploadSigningKeysRequest,
    /// The request uploading the signatures, sent once the upload of the
    /// cross-signing keys went through.
    signatures_request: UploadSignaturesRequest,
    /// The type of authentication this reset process is using.
    auth_type: CrossSigningResetAuthType,
    /// Set by [`CrossSigningResetHandle::cancel()`] and checked by
    /// [`CrossSigningResetHandle::auth()`] after a failed upload attempt.
    is_cancelled: Mutex<bool>,
}
253
254impl CrossSigningResetHandle {
255    /// Set up a new `CrossSigningResetHandle`.
256    pub fn new(
257        client: Client,
258        upload_request: UploadSigningKeysRequest,
259        signatures_request: UploadSignaturesRequest,
260        auth_type: CrossSigningResetAuthType,
261    ) -> Self {
262        Self {
263            client,
264            upload_request,
265            signatures_request,
266            auth_type,
267            is_cancelled: Mutex::new(false),
268        }
269    }
270
271    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
272    /// is using.
273    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
274        &self.auth_type
275    }
276
277    /// Continue the cross-signing reset by either waiting for the
278    /// authentication to be done on the side of the OAuth 2.0 server or by
279    /// providing additional [`AuthData`] the homeserver requires.
280    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
281        let mut upload_request = self.upload_request.clone();
282        upload_request.auth = auth;
283
284        while let Err(e) = self.client.send(upload_request.clone()).await {
285            if *self.is_cancelled.lock().await {
286                return Ok(());
287            }
288
289            match e.as_uiaa_response() {
290                Some(uiaa_info) => {
291                    if uiaa_info.auth_error.is_some() {
292                        return Err(e.into());
293                    }
294                }
295                None => return Err(e.into()),
296            }
297        }
298
299        self.client.send(self.signatures_request.clone()).await?;
300
301        Ok(())
302    }
303
304    /// Cancel the ongoing identity reset process
305    pub async fn cancel(&self) {
306        *self.is_cancelled.lock().await = true;
307    }
308}
309
/// Information about the additional authentication that is required before the
/// cross-signing keys can be uploaded.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// The homeserver requires user-interactive authentication.
    Uiaa(UiaaInfo),
    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
    /// approve the upload of cross-signing keys.
    OAuth(OAuthCrossSigningResetInfo),
}
320
321impl CrossSigningResetAuthType {
322    fn new(error: &HttpError) -> Result<Option<Self>> {
323        if let Some(auth_info) = error.as_uiaa_response() {
324            if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
325                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
326            } else {
327                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
328            }
329        } else {
330            Ok(None)
331        }
332    }
333}
334
/// OAuth 2.0 specific information about the required authentication for the
/// upload of cross-signing keys.
///
/// This is the payload of [`CrossSigningResetAuthType::OAuth`].
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL where the user can approve the reset of the cross-signing keys.
    pub approval_url: Url,
}
342
343impl OAuthCrossSigningResetInfo {
344    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
345        let parameters =
346            serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(auth_info.params.get())?;
347
348        Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
349    }
350}
351
/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
/// response.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaParameters {
    /// The `org.matrix.cross_signing_reset` entry of the parameters, it
    /// contains the URL where the user can approve the reset of the
    /// cross-signing keys.
    #[serde(rename = "org.matrix.cross_signing_reset")]
    reset: OAuthCrossSigningResetUiaaResetParameter,
}
360
/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters`
/// dictionary.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaResetParameter {
    /// The URL where the user can approve the reset of the cross-signing keys.
    url: Url,
}
368
369impl Client {
    /// Get a read guard for the [`OlmMachine`] of the underlying base client.
    ///
    /// The guard contains `None` if there is no [`OlmMachine`].
    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.base_client().olm_machine().await
    }
373
374    pub(crate) async fn mark_request_as_sent(
375        &self,
376        request_id: &TransactionId,
377        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
378    ) -> Result<(), matrix_sdk_base::Error> {
379        Ok(self
380            .olm_machine()
381            .await
382            .as_ref()
383            .expect(
384                "We should have an olm machine once we try to mark E2EE related requests as sent",
385            )
386            .mark_request_as_sent(request_id, response)
387            .await?)
388    }
389
390    /// Query the server for users device keys.
391    ///
392    /// # Panics
393    ///
394    /// Panics if no key query needs to be done.
395    #[instrument(skip(self, device_keys))]
396    pub(crate) async fn keys_query(
397        &self,
398        request_id: &TransactionId,
399        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
400    ) -> Result<get_keys::v3::Response> {
401        let request = assign!(get_keys::v3::Request::new(), { device_keys });
402
403        let response = self.send(request).await?;
404        self.mark_request_as_sent(request_id, &response).await?;
405        self.encryption().update_state_after_keys_query(&response).await;
406
407        Ok(response)
408    }
409
    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
    /// encrypting and uploading a provided reader.
    ///
    /// This only creates the [`UploadEncryptedFile`] request, the example
    /// below shows how it is awaited.
    ///
    /// # Arguments
    ///
    /// * `content_type` - The content type of the file.
    /// * `reader` - The reader that should be encrypted and uploaded.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use url::Url;
    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
    /// use serde::{Deserialize, Serialize};
    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
    ///
    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
    /// struct CustomEventContent {
    ///     encrypted_file: EncryptedFile,
    /// }
    ///
    /// # async {
    /// # let homeserver = Url::parse("http://example.com")?;
    /// # let client = Client::new(homeserver).await?;
    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
    /// let encrypted_file = client.upload_encrypted_file(&mime::TEXT_PLAIN, &mut reader).await?;
    ///
    /// room.send(CustomEventContent { encrypted_file }).await?;
    /// # anyhow::Ok(()) };
    /// ```
    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
        &'a self,
        content_type: &'a mime::Mime,
        reader: &'a mut R,
    ) -> UploadEncryptedFile<'a, R> {
        UploadEncryptedFile::new(self, content_type, reader)
    }
450
451    /// Encrypt and upload the file and thumbnails, and return the source
452    /// information.
453    pub(crate) async fn upload_encrypted_media_and_thumbnail(
454        &self,
455        content_type: &mime::Mime,
456        data: &[u8],
457        thumbnail: Option<Thumbnail>,
458        send_progress: SharedObservable<TransmissionProgress>,
459    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
460        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
461
462        let upload_attachment = async {
463            let mut cursor = Cursor::new(data);
464            self.upload_encrypted_file(content_type, &mut cursor)
465                .with_send_progress_observable(send_progress)
466                .await
467        };
468
469        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
470
471        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
472    }
473
474    /// Uploads an encrypted thumbnail to the media repository, and returns
475    /// its source and extra information.
476    async fn upload_encrypted_thumbnail(
477        &self,
478        thumbnail: Option<Thumbnail>,
479        send_progress: SharedObservable<TransmissionProgress>,
480    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
481        let Some(thumbnail) = thumbnail else {
482            return Ok(None);
483        };
484
485        let (data, content_type, thumbnail_info) = thumbnail.into_parts();
486        let mut cursor = Cursor::new(data);
487
488        let file = self
489            .upload_encrypted_file(&content_type, &mut cursor)
490            .with_send_progress_observable(send_progress)
491            .await?;
492
493        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
494    }
495
496    /// Claim one-time keys creating new Olm sessions.
497    ///
498    /// # Arguments
499    ///
500    /// * `users` - The list of user/device pairs that we should claim keys for.
501    pub(crate) async fn claim_one_time_keys(
502        &self,
503        users: impl Iterator<Item = &UserId>,
504    ) -> Result<()> {
505        let _lock = self.locks().key_claim_lock.lock().await;
506
507        if let Some((request_id, request)) = self
508            .olm_machine()
509            .await
510            .as_ref()
511            .ok_or(Error::NoOlmMachine)?
512            .get_missing_sessions(users)
513            .await?
514        {
515            let response = self.send(request).await?;
516            self.mark_request_as_sent(&request_id, &response).await?;
517        }
518
519        Ok(())
520    }
521
522    /// Upload the E2E encryption keys.
523    ///
524    /// This uploads the long lived device keys as well as the required amount
525    /// of one-time keys.
526    ///
527    /// # Panics
528    ///
529    /// Panics if the client isn't logged in, or if no encryption keys need to
530    /// be uploaded.
531    #[instrument(skip(self, request))]
532    pub(crate) async fn keys_upload(
533        &self,
534        request_id: &TransactionId,
535        request: &upload_keys::v3::Request,
536    ) -> Result<upload_keys::v3::Response> {
537        debug!(
538            device_keys = request.device_keys.is_some(),
539            one_time_key_count = request.one_time_keys.len(),
540            "Uploading public encryption keys",
541        );
542
543        let response = self.send(request.clone()).await?;
544        self.mark_request_as_sent(request_id, &response).await?;
545
546        Ok(response)
547    }
548
549    pub(crate) async fn room_send_helper(
550        &self,
551        request: &RoomMessageRequest,
552    ) -> Result<send_message_event::v3::Response> {
553        let content = request.content.clone();
554        let txn_id = request.txn_id.clone();
555        let room_id = &request.room_id;
556
557        self.get_room(room_id)
558            .expect("Can't send a message to a room that isn't known to the store")
559            .send(content)
560            .with_transaction_id(txn_id)
561            .await
562    }
563
564    pub(crate) async fn send_to_device(
565        &self,
566        request: &ToDeviceRequest,
567    ) -> HttpResult<ToDeviceResponse> {
568        let request = RumaToDeviceRequest::new_raw(
569            request.event_type.clone(),
570            request.txn_id.clone(),
571            request.messages.clone(),
572        );
573
574        self.send(request).await
575    }
576
577    pub(crate) async fn send_verification_request(
578        &self,
579        request: OutgoingVerificationRequest,
580    ) -> Result<()> {
581        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
582
583        match request {
584            ToDevice(t) => {
585                self.send_to_device(&t).await?;
586            }
587            InRoom(r) => {
588                self.room_send_helper(&r).await?;
589            }
590        }
591
592        Ok(())
593    }
594
595    /// Get the existing DM room with the given user, if any.
596    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
597        let rooms = self.joined_rooms();
598
599        // Find the room we share with the `user_id` and only with `user_id`
600        let room = rooms.into_iter().find(|r| {
601            let targets = r.direct_targets();
602            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
603        });
604
605        trace!(?room, "Found room");
606        room
607    }
608
609    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
610        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
611
612        match r.request() {
613            AnyOutgoingRequest::KeysQuery(request) => {
614                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
615            }
616            AnyOutgoingRequest::KeysUpload(request) => {
617                self.keys_upload(r.request_id(), request).await?;
618            }
619            AnyOutgoingRequest::ToDeviceRequest(request) => {
620                let response = self.send_to_device(request).await?;
621                self.mark_request_as_sent(r.request_id(), &response).await?;
622            }
623            AnyOutgoingRequest::SignatureUpload(request) => {
624                let response = self.send(request.clone()).await?;
625                self.mark_request_as_sent(r.request_id(), &response).await?;
626            }
627            AnyOutgoingRequest::RoomMessage(request) => {
628                let response = self.room_send_helper(request).await?;
629                self.mark_request_as_sent(r.request_id(), &response).await?;
630            }
631            AnyOutgoingRequest::KeysClaim(request) => {
632                let response = self.send(request.clone()).await?;
633                self.mark_request_as_sent(r.request_id(), &response).await?;
634            }
635        }
636
637        Ok(())
638    }
639
640    #[instrument(skip_all)]
641    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
642        const MAX_CONCURRENT_REQUESTS: usize = 20;
643
644        // This is needed because sometimes we need to automatically
645        // claim some one-time keys to unwedge an existing Olm session.
646        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
647            warn!("Error while claiming one-time keys {:?}", e);
648        }
649
650        let outgoing_requests = stream::iter(
651            self.olm_machine()
652                .await
653                .as_ref()
654                .ok_or(Error::NoOlmMachine)?
655                .outgoing_requests()
656                .await?,
657        )
658        .map(|r| self.send_outgoing_request(r));
659
660        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
661
662        requests
663            .for_each(|r| async move {
664                match r {
665                    Ok(_) => (),
666                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
667                }
668            })
669            .await;
670
671        Ok(())
672    }
673}
674
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Get the olm machine, for testing purposes only.
    ///
    /// This is only compiled for tests or if the `testing` feature is
    /// enabled, and forwards to the crate-private `olm_machine()` method.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }
}
682
/// A high-level API to manage the client's encryption.
///
/// To get this, use [`Client::encryption()`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The underlying client.
    client: Client,
}
691
692impl Encryption {
    /// Create the encryption API for the given client.
    pub(crate) fn new(client: Client) -> Self {
        Self { client }
    }
696
    /// Returns the current encryption settings for this client.
    ///
    /// The settings are returned as a copy.
    pub(crate) fn settings(&self) -> EncryptionSettings {
        self.client.inner.e2ee.encryption_settings
    }
701
702    /// Get the public ed25519 key of our own device. This is usually what is
703    /// called the fingerprint of the device.
704    pub async fn ed25519_key(&self) -> Option<String> {
705        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
706    }
707
708    /// Get the public Curve25519 key of our own device.
709    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
710        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
711    }
712
713    /// Get the current device creation timestamp.
714    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
715        match self.get_own_device().await {
716            Ok(Some(device)) => device.first_time_seen_ts(),
717            // Should not happen, there should always be an own device
718            _ => MilliSecondsSinceUnixEpoch::now(),
719        }
720    }
721
722    #[cfg(not(target_arch = "wasm32"))]
723    pub(crate) async fn import_secrets_bundle(
724        &self,
725        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
726    ) -> Result<(), SecretImportError> {
727        let olm_machine = self.client.olm_machine().await;
728        let olm_machine =
729            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
730
731        olm_machine.store().import_secrets_bundle(bundle).await
732    }
733
734    /// Get the status of the private cross signing keys.
735    ///
736    /// This can be used to check which private cross signing keys we have
737    /// stored locally.
738    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
739        let olm = self.client.olm_machine().await;
740        let machine = olm.as_ref()?;
741        Some(machine.cross_signing_status().await)
742    }
743
744    /// Get all the tracked users we know about
745    ///
746    /// Tracked users are users for which we keep the device list of E2EE
747    /// capable devices up to date.
748    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
749        if let Some(machine) = self.client.olm_machine().await.as_ref() {
750            machine.tracked_users().await
751        } else {
752            Ok(HashSet::new())
753        }
754    }
755
    /// Get a [`Subscriber`] for the [`VerificationState`].
    ///
    /// The subscriber can be used to read the current state and to wait for
    /// updates, as shown in the example.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use matrix_sdk::{encryption, Client};
    /// use url::Url;
    ///
    /// # async {
    /// let homeserver = Url::parse("http://example.com")?;
    /// let client = Client::new(homeserver).await?;
    /// let mut subscriber = client.encryption().verification_state();
    ///
    /// let current_value = subscriber.get();
    ///
    /// println!("The current verification state is: {current_value:?}");
    ///
    /// if let Some(verification_state) = subscriber.next().await {
    ///     println!("Received verification state update {:?}", verification_state)
    /// }
    /// # anyhow::Ok(()) };
    /// ```
    pub fn verification_state(&self) -> Subscriber<VerificationState> {
        self.client.inner.verification_state.subscribe_reset()
    }
781
782    /// Get a verification object with the given flow id.
783    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
784        let olm = self.client.olm_machine().await;
785        let olm = olm.as_ref()?;
786        #[allow(clippy::bind_instead_of_map)]
787        olm.get_verification(user_id, flow_id).and_then(|v| match v {
788            matrix_sdk_base::crypto::Verification::SasV1(s) => {
789                Some(SasVerification { inner: s, client: self.client.clone() }.into())
790            }
791            #[cfg(feature = "qrcode")]
792            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
793                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
794            }
795            _ => None,
796        })
797    }
798
799    /// Get a `VerificationRequest` object for the given user with the given
800    /// flow id.
801    pub async fn get_verification_request(
802        &self,
803        user_id: &UserId,
804        flow_id: impl AsRef<str>,
805    ) -> Option<VerificationRequest> {
806        let olm = self.client.olm_machine().await;
807        let olm = olm.as_ref()?;
808
809        olm.get_verification_request(user_id, flow_id)
810            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
811    }
812
813    /// Get a specific device of a user.
814    ///
815    /// # Arguments
816    ///
817    /// * `user_id` - The unique id of the user that the device belongs to.
818    ///
819    /// * `device_id` - The unique id of the device.
820    ///
821    /// Returns a `Device` if one is found and the crypto store didn't throw an
822    /// error.
823    ///
824    /// This will always return None if the client hasn't been logged in.
825    ///
826    /// # Examples
827    ///
828    /// ```no_run
829    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
830    /// # use url::Url;
831    /// # async {
832    /// # let alice = user_id!("@alice:example.org");
833    /// # let homeserver = Url::parse("http://example.com")?;
834    /// # let client = Client::new(homeserver).await?;
835    /// if let Some(device) =
836    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
837    /// {
838    ///     println!("{:?}", device.is_verified());
839    ///
840    ///     if !device.is_verified() {
841    ///         let verification = device.request_verification().await?;
842    ///     }
843    /// }
844    /// # anyhow::Ok(()) };
845    /// ```
846    pub async fn get_device(
847        &self,
848        user_id: &UserId,
849        device_id: &DeviceId,
850    ) -> Result<Option<Device>, CryptoStoreError> {
851        let olm = self.client.olm_machine().await;
852        let Some(machine) = olm.as_ref() else { return Ok(None) };
853        let device = machine.get_device(user_id, device_id, None).await?;
854        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
855    }
856
857    /// A convenience method to retrieve your own device from the store.
858    ///
859    /// This is the same as calling [`Encryption::get_device()`] with your own
860    /// user and device ID.
861    ///
862    /// This will always return a device, unless you are not logged in.
863    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
864        let olm = self.client.olm_machine().await;
865        let Some(machine) = olm.as_ref() else { return Ok(None) };
866        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
867        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
868    }
869
    /// Get a map holding all the devices of a user.
871    ///
    /// This will return an `Error::NoOlmMachine` error if the client hasn't
    /// been logged in.
874    ///
875    /// # Arguments
876    ///
877    /// * `user_id` - The unique id of the user that the devices belong to.
878    ///
879    /// # Examples
880    ///
881    /// ```no_run
882    /// # use matrix_sdk::{Client, ruma::user_id};
883    /// # use url::Url;
884    /// # async {
885    /// # let alice = user_id!("@alice:example.org");
886    /// # let homeserver = Url::parse("http://example.com")?;
887    /// # let client = Client::new(homeserver).await?;
888    /// let devices = client.encryption().get_user_devices(alice).await?;
889    ///
890    /// for device in devices.devices() {
891    ///     println!("{device:?}");
892    /// }
893    /// # anyhow::Ok(()) };
894    /// ```
895    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
896        let devices = self
897            .client
898            .olm_machine()
899            .await
900            .as_ref()
901            .ok_or(Error::NoOlmMachine)?
902            .get_user_devices(user_id, None)
903            .await?;
904
905        Ok(UserDevices { inner: devices, client: self.client.clone() })
906    }
907
908    /// Get the E2EE identity of a user from the crypto store.
909    ///
910    /// Usually, we only have the E2EE identity of a user locally if the user
911    /// is tracked, meaning that we are both members of the same encrypted room.
912    ///
913    /// To get the E2EE identity of a user even if it is not available locally
914    /// use [`Encryption::request_user_identity()`].
915    ///
916    /// # Arguments
917    ///
918    /// * `user_id` - The unique id of the user that the identity belongs to.
919    ///
920    /// Returns a `UserIdentity` if one is found and the crypto store
921    /// didn't throw an error.
922    ///
923    /// This will always return None if the client hasn't been logged in.
924    ///
925    /// # Examples
926    ///
927    /// ```no_run
928    /// # use matrix_sdk::{Client, ruma::user_id};
929    /// # use url::Url;
930    /// # async {
931    /// # let alice = user_id!("@alice:example.org");
932    /// # let homeserver = Url::parse("http://example.com")?;
933    /// # let client = Client::new(homeserver).await?;
934    /// let user = client.encryption().get_user_identity(alice).await?;
935    ///
936    /// if let Some(user) = user {
937    ///     println!("{:?}", user.is_verified());
938    ///
939    ///     let verification = user.request_verification().await?;
940    /// }
941    /// # anyhow::Ok(()) };
942    /// ```
943    pub async fn get_user_identity(
944        &self,
945        user_id: &UserId,
946    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
947        let olm = self.client.olm_machine().await;
948        let Some(olm) = olm.as_ref() else { return Ok(None) };
949        let identity = olm.get_identity(user_id, None).await?;
950
951        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
952    }
953
954    /// Get the E2EE identity of a user from the homeserver.
955    ///
956    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
957    /// E2EE identity is not found, it should mean that the user did not set
958    /// up cross-signing.
959    ///
960    /// If you want the E2EE identity of a user without making a request to the
961    /// homeserver, use [`Encryption::get_user_identity()`] instead.
962    ///
963    /// # Arguments
964    ///
965    /// * `user_id` - The ID of the user that the identity belongs to.
966    ///
967    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
968    /// was an issue with the crypto store or with the request to the
969    /// homeserver.
970    ///
971    /// This will always return `None` if the client hasn't been logged in.
972    ///
973    /// # Examples
974    ///
975    /// ```no_run
976    /// # use matrix_sdk::{Client, ruma::user_id};
977    /// # use url::Url;
978    /// # async {
979    /// # let alice = user_id!("@alice:example.org");
980    /// # let homeserver = Url::parse("http://example.com")?;
981    /// # let client = Client::new(homeserver).await?;
982    /// let user = client.encryption().request_user_identity(alice).await?;
983    ///
984    /// if let Some(user) = user {
985    ///     println!("User is verified: {:?}", user.is_verified());
986    ///
987    ///     let verification = user.request_verification().await?;
988    /// }
989    /// # anyhow::Ok(()) };
990    /// ```
991    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
992        let olm = self.client.olm_machine().await;
993        let Some(olm) = olm.as_ref() else { return Ok(None) };
994
995        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
996        self.client.keys_query(&request_id, request.device_keys).await?;
997
998        let identity = olm.get_identity(user_id, None).await?;
999        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1000    }
1001
1002    /// Returns a stream of device updates, allowing users to listen for
1003    /// notifications about new or changed devices.
1004    ///
1005    /// The stream produced by this method emits updates whenever a new device
1006    /// is discovered or when an existing device's information is changed. Users
1007    /// can subscribe to this stream and receive updates in real-time.
1008    ///
1009    /// # Examples
1010    ///
1011    /// ```no_run
1012    /// # use matrix_sdk::Client;
1013    /// # use ruma::{device_id, user_id};
1014    /// # use futures_util::{pin_mut, StreamExt};
1015    /// # let client: Client = unimplemented!();
1016    /// # async {
1017    /// let devices_stream = client.encryption().devices_stream().await?;
1018    /// let user_id = client
1019    ///     .user_id()
1020    ///     .expect("We should know our user id after we have logged in");
1021    /// pin_mut!(devices_stream);
1022    ///
    /// while let Some(device_updates) = devices_stream.next().await {
1024    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1025    ///         for device in user_devices.values() {
1026    ///             println!("A new device has been added {}", device.device_id());
1027    ///         }
1028    ///     }
1029    /// }
1030    /// # anyhow::Ok(()) };
1031    /// ```
1032    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
1033        let olm = self.client.olm_machine().await;
1034        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1035        let client = self.client.to_owned();
1036
1037        Ok(olm
1038            .store()
1039            .devices_stream()
1040            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1041    }
1042
1043    /// Returns a stream of user identity updates, allowing users to listen for
1044    /// notifications about new or changed user identities.
1045    ///
1046    /// The stream produced by this method emits updates whenever a new user
    /// identity is discovered or when an existing identity's information is
1048    /// changed. Users can subscribe to this stream and receive updates in
1049    /// real-time.
1050    ///
1051    /// # Examples
1052    ///
1053    /// ```no_run
1054    /// # use matrix_sdk::Client;
1055    /// # use ruma::{device_id, user_id};
1056    /// # use futures_util::{pin_mut, StreamExt};
1057    /// # let client: Client = unimplemented!();
1058    /// # async {
1059    /// let identities_stream =
1060    ///     client.encryption().user_identities_stream().await?;
1061    /// pin_mut!(identities_stream);
1062    ///
    /// while let Some(identity_updates) = identities_stream.next().await {
1064    ///     for (_, identity) in identity_updates.new {
1065    ///         println!("A new identity has been added {}", identity.user_id());
1066    ///     }
1067    /// }
1068    /// # anyhow::Ok(()) };
1069    /// ```
1070    pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
1071        let olm = self.client.olm_machine().await;
1072        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1073        let client = self.client.to_owned();
1074
1075        Ok(olm
1076            .store()
1077            .user_identities_stream()
1078            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1079    }
1080
1081    /// Create and upload a new cross signing identity.
1082    ///
1083    /// # Arguments
1084    ///
1085    /// * `auth_data` - This request requires user interactive auth, the first
1086    ///   request needs to set this to `None` and will always fail with an
1087    ///   `UiaaResponse`. The response will contain information for the
1088    ///   interactive auth and the same request needs to be made but this time
1089    ///   with some `auth_data` provided.
1090    ///
1091    /// # Examples
1092    ///
1093    /// ```no_run
1094    /// # use std::collections::BTreeMap;
1095    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1096    /// # use url::Url;
1097    /// # use serde_json::json;
1098    /// # async {
1099    /// # let homeserver = Url::parse("http://example.com")?;
1100    /// # let client = Client::new(homeserver).await?;
1101    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1102    ///     if let Some(response) = e.as_uiaa_response() {
1103    ///         let mut password = uiaa::Password::new(
1104    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1105    ///             "wordpass".to_owned(),
1106    ///         );
1107    ///         password.session = response.session.clone();
1108    ///
1109    ///         client
1110    ///             .encryption()
1111    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1112    ///             .await
1113    ///             .expect("Couldn't bootstrap cross signing")
1114    ///     } else {
1115    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1116    ///     }
1117    /// }
    /// # anyhow::Ok(()) };
    /// ```
1119    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1120        let olm = self.client.olm_machine().await;
1121        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1122
1123        let CrossSigningBootstrapRequests {
1124            upload_signing_keys_req,
1125            upload_keys_req,
1126            upload_signatures_req,
1127        } = olm.bootstrap_cross_signing(false).await?;
1128
1129        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1130            auth: auth_data,
1131            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1132            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1133            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1134        });
1135
1136        if let Some(req) = upload_keys_req {
1137            self.client.send_outgoing_request(req).await?;
1138        }
1139        self.client.send(upload_signing_keys_req).await?;
1140        self.client.send(upload_signatures_req).await?;
1141
1142        Ok(())
1143    }
1144
1145    /// Reset the cross-signing keys.
1146    ///
1147    /// # Example
1148    ///
1149    /// ```no_run
1150    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1151    /// # use url::Url;
1152    /// # async {
1153    /// # let homeserver = Url::parse("http://example.com")?;
1154    /// # let client = Client::new(homeserver).await?;
1155    /// # let user_id = unimplemented!();
1156    /// let encryption = client.encryption();
1157    ///
1158    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1159    ///     match handle.auth_type() {
1160    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1161    ///             use matrix_sdk::ruma::api::client::uiaa;
1162    ///
1163    ///             let password = "1234".to_owned();
1164    ///             let mut password = uiaa::Password::new(user_id, password);
1165    ///             password.session = uiaa.session;
1166    ///
1167    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1168    ///         }
1169    ///         CrossSigningResetAuthType::OAuth(o) => {
1170    ///             println!(
1171    ///                 "To reset your end-to-end encryption cross-signing identity, \
1172    ///                 you first need to approve it at {}",
1173    ///                 o.approval_url
1174    ///             );
1175    ///             handle.auth(None).await?;
1176    ///         }
1177    ///     }
1178    /// }
1179    /// # anyhow::Ok(()) };
1180    /// ```
1181    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1182        let olm = self.client.olm_machine().await;
1183        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1184
1185        let CrossSigningBootstrapRequests {
1186            upload_keys_req,
1187            upload_signing_keys_req,
1188            upload_signatures_req,
1189        } = olm.bootstrap_cross_signing(true).await?;
1190
1191        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1192            auth: None,
1193            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1194            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1195            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1196        });
1197
1198        if let Some(req) = upload_keys_req {
1199            self.client.send_outgoing_request(req).await?;
1200        }
1201
1202        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1203            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1204                let client = self.client.clone();
1205
1206                Ok(Some(CrossSigningResetHandle::new(
1207                    client,
1208                    upload_signing_keys_req,
1209                    upload_signatures_req,
1210                    auth_type,
1211                )))
1212            } else {
1213                Err(error.into())
1214            }
1215        } else {
1216            self.client.send(upload_signatures_req).await?;
1217
1218            Ok(None)
1219        }
1220    }
1221
1222    /// Query the user's own device keys, if, and only if, we didn't have their
1223    /// identity in the first place.
1224    async fn ensure_initial_key_query(&self) -> Result<()> {
1225        let olm_machine = self.client.olm_machine().await;
1226        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1227
1228        let user_id = olm_machine.user_id();
1229
1230        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1231            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1232            self.client.keys_query(&request_id, request.device_keys).await?;
1233        }
1234
1235        Ok(())
1236    }
1237
1238    /// Create and upload a new cross signing identity, if that has not been
1239    /// done yet.
1240    ///
1241    /// This will only create a new cross-signing identity if the user had never
1242    /// done it before. If the user did it before, then this is a no-op.
1243    ///
1244    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1245    /// behavior of this function.
1246    ///
1247    /// # Arguments
1248    ///
1249    /// * `auth_data` - This request requires user interactive auth, the first
1250    ///   request needs to set this to `None` and will always fail with an
1251    ///   `UiaaResponse`. The response will contain information for the
1252    ///   interactive auth and the same request needs to be made but this time
1253    ///   with some `auth_data` provided.
1254    ///
1255    /// # Examples
1256    /// ```no_run
1257    /// # use std::collections::BTreeMap;
1258    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1259    /// # use url::Url;
1260    /// # use serde_json::json;
1261    /// # async {
1262    /// # let homeserver = Url::parse("http://example.com")?;
1263    /// # let client = Client::new(homeserver).await?;
1264    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1265    ///     if let Some(response) = e.as_uiaa_response() {
1266    ///         let mut password = uiaa::Password::new(
1267    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1268    ///             "wordpass".to_owned(),
1269    ///         );
1270    ///         password.session = response.session.clone();
1271    ///
1272    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1273    ///         // avoid checks.
1274    ///         client
1275    ///             .encryption()
1276    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1277    ///             .await
1278    ///             .expect("Couldn't bootstrap cross signing")
1279    ///     } else {
1280    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1281    ///     }
1282    /// }
    /// # anyhow::Ok(()) };
    /// ```
1284    pub async fn bootstrap_cross_signing_if_needed(
1285        &self,
1286        auth_data: Option<AuthData>,
1287    ) -> Result<()> {
1288        let olm_machine = self.client.olm_machine().await;
1289        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1290        let user_id = olm_machine.user_id();
1291
1292        self.ensure_initial_key_query().await?;
1293
1294        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1295            self.bootstrap_cross_signing(auth_data).await?;
1296        }
1297
1298        Ok(())
1299    }
1300
1301    /// Export E2EE keys that match the given predicate encrypting them with the
1302    /// given passphrase.
1303    ///
1304    /// # Arguments
1305    ///
1306    /// * `path` - The file path where the exported key file will be saved.
1307    ///
1308    /// * `passphrase` - The passphrase that will be used to encrypt the
1309    ///   exported room keys.
1310    ///
1311    /// * `predicate` - A closure that will be called for every known
1312    ///   `InboundGroupSession`, which represents a room key. If the closure
    ///   returns `true` the `InboundGroupSession` will be included in the
1314    ///   export, if the closure returns `false` it will not be included.
1315    ///
1316    /// # Panics
1317    ///
1318    /// This method will panic if it isn't run on a Tokio runtime.
1319    ///
1320    /// This method will panic if it can't get enough randomness from the OS to
1321    /// encrypt the exported keys securely.
1322    ///
1323    /// # Examples
1324    ///
1325    /// ```no_run
1326    /// # use std::{path::PathBuf, time::Duration};
1327    /// # use matrix_sdk::{
1328    /// #     Client, config::SyncSettings,
1329    /// #     ruma::room_id,
1330    /// # };
1331    /// # use url::Url;
1332    /// # async {
1333    /// # let homeserver = Url::parse("http://localhost:8080")?;
1334    /// # let mut client = Client::new(homeserver).await?;
1335    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1336    /// // Export all room keys.
1337    /// client
1338    ///     .encryption()
1339    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1340    ///     .await?;
1341    ///
1342    /// // Export only the room keys for a certain room.
1343    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1344    /// let room_id = room_id!("!test:localhost");
1345    ///
1346    /// client
1347    ///     .encryption()
1348    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1349    ///     .await?;
1350    /// # anyhow::Ok(()) };
1351    /// ```
1352    #[cfg(not(target_arch = "wasm32"))]
1353    pub async fn export_room_keys(
1354        &self,
1355        path: PathBuf,
1356        passphrase: &str,
1357        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1358    ) -> Result<()> {
1359        let olm = self.client.olm_machine().await;
1360        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1361
1362        let keys = olm.store().export_room_keys(predicate).await?;
1363        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1364
1365        let encrypt = move || -> Result<()> {
1366            let export: String =
1367                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1368            let mut file = std::fs::File::create(path)?;
1369            file.write_all(&export.into_bytes())?;
1370            Ok(())
1371        };
1372
1373        let task = tokio::task::spawn_blocking(encrypt);
1374        task.await.expect("Task join error")
1375    }
1376
1377    /// Import E2EE keys from the given file path.
1378    ///
1379    /// # Arguments
1380    ///
    /// * `path` - The file path where the exported key file can be found.
1382    ///
1383    /// * `passphrase` - The passphrase that should be used to decrypt the
1384    ///   exported room keys.
1385    ///
    /// Returns a [`RoomKeyImportResult`] holding the number of sessions that
    /// were imported and the total number of sessions that were found in the
    /// key export.
1389    ///
1390    /// # Panics
1391    ///
1392    /// This method will panic if it isn't run on a Tokio runtime.
1393    ///
1394    /// ```no_run
1395    /// # use std::{path::PathBuf, time::Duration};
1396    /// # use matrix_sdk::{
1397    /// #     Client, config::SyncSettings,
1398    /// #     ruma::room_id,
1399    /// # };
1400    /// # use url::Url;
1401    /// # async {
1402    /// # let homeserver = Url::parse("http://localhost:8080")?;
1403    /// # let mut client = Client::new(homeserver).await?;
1404    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1405    /// let result =
1406    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1407    ///
1408    /// println!(
1409    ///     "Imported {} room keys out of {}",
1410    ///     result.imported_count, result.total_count
1411    /// );
1412    /// # anyhow::Ok(()) };
1413    /// ```
1414    #[cfg(not(target_arch = "wasm32"))]
1415    pub async fn import_room_keys(
1416        &self,
1417        path: PathBuf,
1418        passphrase: &str,
1419    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1420        let olm = self.client.olm_machine().await;
1421        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1422        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1423
1424        let decrypt = move || {
1425            let file = std::fs::File::open(path)?;
1426            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1427        };
1428
1429        let task = tokio::task::spawn_blocking(decrypt);
1430        let import = task.await.expect("Task join error")?;
1431
1432        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1433
1434        self.backups().maybe_trigger_backup();
1435
1436        Ok(ret)
1437    }
1438
1439    /// Receive notifications of room keys being received as a [`Stream`].
1440    ///
1441    /// Each time a room key is updated in any way, an update will be sent to
1442    /// the stream. Updates that happen at the same time are batched into a
1443    /// [`Vec`].
1444    ///
1445    /// If the reader of the stream lags too far behind, an error is broadcast
1446    /// containing the number of skipped items.
1447    ///
1448    /// # Examples
1449    ///
1450    /// ```no_run
1451    /// # use matrix_sdk::Client;
1452    /// # use url::Url;
1453    /// # async {
1454    /// # let homeserver = Url::parse("http://example.com")?;
1455    /// # let client = Client::new(homeserver).await?;
1456    /// use futures_util::StreamExt;
1457    ///
1458    /// let Some(mut room_keys_stream) =
1459    ///     client.encryption().room_keys_received_stream().await
1460    /// else {
1461    ///     return Ok(());
1462    /// };
1463    ///
1464    /// while let Some(update) = room_keys_stream.next().await {
1465    ///     println!("Received room keys {update:?}");
1466    /// }
1467    /// # anyhow::Ok(()) };
1468    /// ```
1469    pub async fn room_keys_received_stream(
1470        &self,
1471    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
1472        let olm = self.client.olm_machine().await;
1473        let olm = olm.as_ref()?;
1474
1475        Some(olm.store().room_keys_received_stream())
1476    }
1477
1478    /// Get the secret storage manager of the client.
1479    pub fn secret_storage(&self) -> SecretStorage {
1480        SecretStorage { client: self.client.to_owned() }
1481    }
1482
1483    /// Get the backups manager of the client.
1484    pub fn backups(&self) -> Backups {
1485        Backups { client: self.client.to_owned() }
1486    }
1487
1488    /// Get the recovery manager of the client.
1489    pub fn recovery(&self) -> Recovery {
1490        Recovery { client: self.client.to_owned() }
1491    }
1492
1493    /// Enables the crypto-store cross-process lock.
1494    ///
1495    /// This may be required if there are multiple processes that may do writes
1496    /// to the same crypto store. In that case, it's necessary to create a
1497    /// lock, so that only one process writes to it, otherwise this may
1498    /// cause confusing issues because of stale data contained in in-memory
1499    /// caches.
1500    ///
1501    /// The provided `lock_value` must be a unique identifier for this process.
1502    /// Check [`Client::cross_process_store_locks_holder_name`] to
1503    /// get the global value.
1504    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1505        // If the lock has already been created, don't recreate it from scratch.
1506        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1507            let prev_holder = prev_lock.lock_holder();
1508            if prev_holder == lock_value {
1509                return Ok(());
1510            }
1511            warn!("Recreating cross-process store lock with a different holder value: prev was {prev_holder}, new is {lock_value}");
1512        }
1513
1514        let olm_machine = self.client.base_client().olm_machine().await;
1515        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1516
1517        let lock =
1518            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1519
1520        // Gently try to initialize the crypto store generation counter.
1521        //
1522        // If we don't get the lock immediately, then it is already acquired by another
1523        // process, and we'll get to reload next time we acquire the lock.
1524        {
1525            let guard = lock.try_lock_once().await?;
1526            if guard.is_some() {
1527                olm_machine
1528                    .initialize_crypto_store_generation(
1529                        &self.client.locks().crypto_store_generation,
1530                    )
1531                    .await?;
1532            }
1533        }
1534
1535        self.client
1536            .locks()
1537            .cross_process_crypto_store_lock
1538            .set(lock)
1539            .map_err(|_| Error::BadCryptoStoreState)?;
1540
1541        Ok(())
1542    }
1543
1544    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1545    /// time.
1546    ///
1547    /// Returns the current generation number.
1548    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1549        let olm_machine_guard = self.client.olm_machine().await;
1550        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1551            let (new_gen, generation_number) = olm_machine
1552                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1553                .await?;
1554            // If the crypto store generation has changed,
1555            if new_gen {
1556                // (get rid of the reference to the current crypto store first)
1557                drop(olm_machine_guard);
1558                // Recreate the OlmMachine.
1559                self.client.base_client().regenerate_olm(None).await?;
1560            }
1561            Ok(generation_number)
1562        } else {
1563            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1564            // been initialised by the time we get here. Ideally we'd panic, or return an
1565            // error, but for now I'm just adding some logging to check if it
1566            // happens, and returning the magic number 0.
1567            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1568            Ok(0)
1569        }
1570    }
1571
1572    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1573    /// spin-waits until the lock is available.
1574    ///
1575    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1576    /// first time.
1577    pub async fn spin_lock_store(
1578        &self,
1579        max_backoff: Option<u32>,
1580    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1581        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1582            let guard = lock.spin_lock(max_backoff).await?;
1583
1584            let generation = self.on_lock_newly_acquired().await?;
1585
1586            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1587        } else {
1588            Ok(None)
1589        }
1590    }
1591
1592    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1593    /// attempts to lock it once.
1594    ///
1595    /// Returns a guard to the lock, if it was obtained.
1596    pub async fn try_lock_store_once(
1597        &self,
1598    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1599        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1600            let maybe_guard = lock.try_lock_once().await?;
1601
1602            let Some(guard) = maybe_guard else {
1603                return Ok(None);
1604            };
1605
1606            let generation = self.on_lock_newly_acquired().await?;
1607
1608            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1609        } else {
1610            Ok(None)
1611        }
1612    }
1613
1614    /// Testing purposes only.
1615    #[cfg(any(test, feature = "testing"))]
1616    pub async fn uploaded_key_count(&self) -> Result<u64> {
1617        let olm_machine = self.client.olm_machine().await;
1618        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1619        Ok(olm_machine.uploaded_key_count().await?)
1620    }
1621
1622    /// Bootstrap encryption and enables event listeners for the E2EE support.
1623    ///
1624    /// Based on the `EncryptionSettings`, this call might:
1625    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1626    /// - Create a key backup if needed (POST `/room_keys/version`)
1627    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1628    ///
1629    /// As part of this process, and if needed, the current device keys would be
1630    /// uploaded to the server, new account data would be added, and cross
1631    /// signing keys and signatures might be uploaded.
1632    ///
1633    /// Should be called once we
1634    /// created a [`OlmMachine`], i.e. after logging in.
1635    ///
1636    /// # Arguments
1637    ///
1638    /// * `auth_data` - Some requests may require re-authentication. To prevent
1639    ///   the user from having to re-enter their password (or use other
1640    ///   methods), we can provide the authentication data here. This is
1641    ///   necessary for uploading cross-signing keys. However, please note that
1642    ///   there is a proposal (MSC3967) to remove this requirement, which would
1643    ///   allow for the initial upload of cross-signing keys without
1644    ///   authentication, rendering this parameter obsolete.
1645    pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1646        let mut tasks = self.client.inner.e2ee.tasks.lock();
1647
1648        let this = self.clone();
1649        tasks.setup_e2ee = Some(spawn(async move {
1650            // Update the current state first, so we don't have to wait for the result of
1651            // network requests
1652            this.update_verification_state().await;
1653
1654            if this.settings().auto_enable_cross_signing {
1655                if let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await {
1656                    error!("Couldn't bootstrap cross signing {e:?}");
1657                }
1658            }
1659
1660            if let Err(e) = this.backups().setup_and_resume().await {
1661                error!("Couldn't setup and resume backups {e:?}");
1662            }
1663            if let Err(e) = this.recovery().setup().await {
1664                error!("Couldn't setup and resume recovery {e:?}");
1665            }
1666        }));
1667    }
1668
1669    /// Waits for end-to-end encryption initialization tasks to finish, if any
1670    /// was running in the background.
1671    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1672        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1673
1674        if let Some(task) = task {
1675            if let Err(err) = task.await {
1676                warn!("Error when initializing backups: {err}");
1677            }
1678        }
1679    }
1680
1681    /// Upload the device keys and initial set of one-time keys to the server.
1682    ///
1683    /// This should only be called when the user logs in for the first time,
1684    /// the method will ensure that other devices see our own device as an
1685    /// end-to-end encryption enabled one.
1686    ///
1687    /// **Warning**: Do not use this method if we're already calling
1688    /// [`Client::send_outgoing_request()`]. This method is intended for
1689    /// explicitly uploading the device keys before starting a sync.
1690    #[cfg(not(target_arch = "wasm32"))]
1691    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1692        let olm = self.client.olm_machine().await;
1693        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1694
1695        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1696            self.client.keys_upload(&request_id, &request).await?;
1697
1698            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1699            self.client.keys_query(&request_id, request.device_keys).await?;
1700        }
1701
1702        Ok(())
1703    }
1704
1705    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1706        self.recovery().update_state_after_keys_query(response).await;
1707
1708        // Only update the verification_state if our own devices changed
1709        if let Some(user_id) = self.client.user_id() {
1710            let contains_own_device = response.device_keys.contains_key(user_id);
1711
1712            if contains_own_device {
1713                self.update_verification_state().await;
1714            }
1715        }
1716    }
1717
1718    async fn update_verification_state(&self) {
1719        match self.get_own_device().await {
1720            Ok(device) => {
1721                if let Some(device) = device {
1722                    let is_verified = device.is_cross_signed_by_owner();
1723
1724                    if is_verified {
1725                        self.client.inner.verification_state.set(VerificationState::Verified);
1726                    } else {
1727                        self.client.inner.verification_state.set(VerificationState::Unverified);
1728                    }
1729                } else {
1730                    warn!("Couldn't find out own device in the store.");
1731                    self.client.inner.verification_state.set(VerificationState::Unknown);
1732                }
1733            }
1734            Err(error) => {
1735                warn!("Failed retrieving own device: {error}");
1736                self.client.inner.verification_state.set(VerificationState::Unknown);
1737            }
1738        }
1739    }
1740}
1741
1742#[cfg(all(test, not(target_arch = "wasm32")))]
1743mod tests {
1744    use std::{
1745        ops::Not,
1746        sync::{
1747            atomic::{AtomicBool, Ordering},
1748            Arc,
1749        },
1750        time::Duration,
1751    };
1752
1753    use matrix_sdk_test::{
1754        async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
1755        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1756    };
1757    use ruma::{
1758        event_id,
1759        events::{reaction::ReactionEventContent, relation::Annotation},
1760        user_id,
1761    };
1762    use serde_json::json;
1763    use wiremock::{
1764        matchers::{header, method, path_regex},
1765        Mock, MockServer, Request, ResponseTemplate,
1766    };
1767
1768    use crate::{
1769        assert_next_matches_with_timeout,
1770        config::RequestConfig,
1771        encryption::{OAuthCrossSigningResetInfo, VerificationState},
1772        test_utils::{
1773            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
1774        },
1775        Client,
1776    };
1777
1778    #[async_test]
1779    async fn test_reaction_sending() {
1780        let server = MockServer::start().await;
1781        let client = logged_in_client(Some(server.uri())).await;
1782
1783        let event_id = event_id!("$2:example.org");
1784
1785        Mock::given(method("GET"))
1786            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
1787            .and(header("authorization", "Bearer 1234"))
1788            .respond_with(
1789                ResponseTemplate::new(200)
1790                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
1791            )
1792            .mount(&server)
1793            .await;
1794
1795        Mock::given(method("PUT"))
1796            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
1797            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1798                "event_id": event_id,
1799            })))
1800            .mount(&server)
1801            .await;
1802
1803        let response = SyncResponseBuilder::default()
1804            .add_joined_room(
1805                JoinedRoomBuilder::default()
1806                    .add_state_event(StateTestEvent::Member)
1807                    .add_state_event(StateTestEvent::PowerLevels)
1808                    .add_state_event(StateTestEvent::Encryption),
1809            )
1810            .build_sync_response();
1811
1812        client.base_client().receive_sync_response(response).await.unwrap();
1813
1814        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
1815        assert!(room
1816            .latest_encryption_state()
1817            .await
1818            .expect("Getting encryption state")
1819            .is_encrypted());
1820
1821        let event_id = event_id!("$1:example.org");
1822        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
1823        room.send(reaction).await.expect("Sending the reaction should not fail");
1824
1825        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
1826    }
1827
1828    #[async_test]
1829    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
1830        let server = MockServer::start().await;
1831        let client = logged_in_client(Some(server.uri())).await;
1832        // This is the user ID that is inside MemberAdditional.
1833        // Note the confusing username, so we can share
1834        // GlobalAccountDataTestEvent::Direct with the invited test.
1835        let user_id = user_id!("@invited:localhost");
1836
1837        // When we receive a sync response saying "invited" is invited to a DM
1838        let response = SyncResponseBuilder::default()
1839            .add_joined_room(
1840                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
1841            )
1842            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1843            .build_sync_response();
1844        client.base_client().receive_sync_response(response).await.unwrap();
1845
1846        // Then get_dm_room finds this room
1847        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1848        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1849    }
1850
1851    #[async_test]
1852    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
1853        let server = MockServer::start().await;
1854        let client = logged_in_client(Some(server.uri())).await;
1855        // This is the user ID that is inside MemberInvite
1856        let user_id = user_id!("@invited:localhost");
1857
1858        // When we receive a sync response saying "invited" is invited to a DM
1859        let response = SyncResponseBuilder::default()
1860            .add_joined_room(
1861                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
1862            )
1863            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1864            .build_sync_response();
1865        client.base_client().receive_sync_response(response).await.unwrap();
1866
1867        // Then get_dm_room finds this room
1868        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1869        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1870    }
1871
1872    #[async_test]
1873    async fn test_get_dm_room_still_finds_left_room() {
1874        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
1875        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
1876
1877        let server = MockServer::start().await;
1878        let client = logged_in_client(Some(server.uri())).await;
1879        // This is the user ID that is inside MemberAdditional.
1880        // Note the confusing username, so we can share
1881        // GlobalAccountDataTestEvent::Direct with the invited test.
1882        let user_id = user_id!("@invited:localhost");
1883
1884        // When we receive a sync response saying "invited" is invited to a DM
1885        let response = SyncResponseBuilder::default()
1886            .add_joined_room(
1887                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
1888            )
1889            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1890            .build_sync_response();
1891        client.base_client().receive_sync_response(response).await.unwrap();
1892
1893        // Then get_dm_room finds this room
1894        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1895        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1896    }
1897
    /// Checks that a client gets a new `OlmMachine` when it re-takes the
    /// cross-process store lock after another client held it, and keeps the
    /// same one when nobody else took the lock in between. Also checks that
    /// the backup key is still available from the new machine.
    #[cfg(feature = "sqlite")]
    #[async_test]
    async fn test_generation_counter_invalidates_olm_machine() {
        // Create two clients using the same sqlite database.

        use matrix_sdk_base::store::RoomLoadSettings;
        // NOTE(review): this is a fixed file name in the shared temp directory, so
        // the database presumably persists across test runs — confirm this is fine.
        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
        let session = mock_matrix_session();

        let client1 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client1
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

        // When the lock isn't enabled, any attempt at locking won't return a guard.
        let guard = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(guard.is_none());

        // Enable the lock on both clients, each with its own holder name.
        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

        // One client can take the lock.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
        let initial_olm_machine =
            client1.olm_machine().await.clone().expect("must have an olm machine");

        // Also enable backup to check that new machine has the same backup keys.
        let decryption_key = matrix_sdk_base::crypto::store::BackupDecryptionKey::new()
            .expect("Can't create new recovery key");
        let backup_key = decryption_key.megolm_v1_public_key();
        backup_key.set_version("1".to_owned());
        initial_olm_machine
            .backup_machine()
            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
            .await
            .expect("Should save");

        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

        assert!(client1.encryption().backups().are_enabled().await);

        // The other client can't take the lock while the first one holds it.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_none());

        // Now have the first client release the lock,
        drop(acquired1);
        // Presumably gives the release time to take effect before the next
        // attempt — TODO confirm why the pause is needed.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // And re-take it.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // In that case, the Olm Machine shouldn't change.
        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
        assert!(initial_olm_machine.same_as(&olm_machine));

        // Ok, release again.
        drop(acquired1);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Client2 can acquire the lock.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_some());

        // And then release it.
        drop(acquired2);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Client1 can acquire it again,
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // But now its olm machine has been invalidated and thus regenerated!
        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");

        assert!(!initial_olm_machine.same_as(&olm_machine));

        // The regenerated machine must still expose the decryption key saved
        // earlier, and backups must still be reported as enabled.
        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
        assert!(backup_key_new.decryption_key.is_some());
        assert_eq!(
            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
            backup_key.to_base64()
        );
        assert!(client1.encryption().backups().are_enabled().await);
    }
2004
    /// Checks that the `OlmMachine` is only replaced when it has to be:
    /// enabling the lock and re-taking it with no other holder in between keep
    /// the same machine, while the first acquisition after another client held
    /// the lock replaces it.
    #[cfg(feature = "sqlite")]
    #[async_test]
    async fn test_generation_counter_no_spurious_invalidation() {
        // Create two clients using the same sqlite database (the second one is
        // created further down).

        use matrix_sdk_base::store::RoomLoadSettings;
        // NOTE(review): this is a fixed file name in the shared temp directory, so
        // the database presumably persists across test runs — confirm this is fine.
        let sqlite_path =
            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
        let session = mock_matrix_session();

        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        // Keep the initial olm machine around to compare against later ones.
        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();

        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

        // Enabling the lock doesn't update the olm machine.
        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(initial_olm_machine.same_as(&after_enabling_lock));

        {
            // Simulate that another client held the lock before.
            let client2 = Client::builder()
                .homeserver_url("http://localhost:1234")
                .request_config(RequestConfig::new().disable_retry())
                .sqlite_store(sqlite_path, None)
                .build()
                .await
                .unwrap();
            client2
                .matrix_auth()
                .restore_session(session, RoomLoadSettings::default())
                .await
                .unwrap();

            client2
                .encryption()
                .enable_cross_process_store_lock("client2".to_owned())
                .await
                .unwrap();

            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
            assert!(guard.is_some());

            drop(guard);
            // Presumably gives the release time to take effect before the next
            // attempt — TODO confirm why the pause is needed.
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        {
            // The guard is dropped at the end of this scope.
            let acquired = client.encryption().try_lock_store_once().await.unwrap();
            assert!(acquired.is_some());
        }

        // Taking the lock the first time will update the olm machine.
        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));

        {
            let acquired = client.encryption().try_lock_store_once().await.unwrap();
            assert!(acquired.is_some());
        }

        // Re-taking the lock doesn't update the olm machine.
        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
    }
2082
2083    #[async_test]
2084    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2085        // Given a client and a server
2086        let client = no_retry_test_client(None).await;
2087        let server = MockServer::start().await;
2088
2089        // When we subscribe to its verification state
2090        let mut verification_state = client.encryption().verification_state();
2091
2092        // We can get its initial value, and it's Unknown
2093        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2094
2095        // We set up a mocked request to check this endpoint is not called before
2096        // reading the new state
2097        let keys_requested = Arc::new(AtomicBool::new(false));
2098        let inner_bool = keys_requested.clone();
2099
2100        Mock::given(method("GET"))
2101            .and(path_regex(
2102                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2103            ))
2104            .respond_with(move |_req: &Request| {
2105                inner_bool.fetch_or(true, Ordering::SeqCst);
2106                ResponseTemplate::new(200).set_body_json(json!({}))
2107            })
2108            .mount(&server)
2109            .await;
2110
2111        // When the session is initialised and the encryption tasks spawn
2112        set_client_session(&client).await;
2113
2114        // Then we can get an updated value without waiting for any network requests
2115        assert!(keys_requested.load(Ordering::SeqCst).not());
2116        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2117    }
2118
2119    #[test]
2120    fn test_oauth_reset_info_from_uiaa_info() {
2121        let auth_info = json!({
2122            "session": "dummy",
2123            "flows": [
2124                {
2125                    "stages": [
2126                        "org.matrix.cross_signing_reset"
2127                    ]
2128                }
2129            ],
2130            "params": {
2131                "org.matrix.cross_signing_reset": {
2132                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2133                }
2134            },
2135            "msg": "To reset..."
2136        });
2137
2138        let auth_info = serde_json::from_value(auth_info)
2139            .expect("We should be able to deserialize the UiaaInfo");
2140        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2141            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2142    }
2143}