1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::{Path, PathBuf},
26 str::FromStr,
27 sync::Arc,
28 time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34 future::try_join,
35 stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40 StateStoreDataKey, StateStoreDataValue,
41 cross_process_lock::{AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError},
42 crypto::{
43 CrossSigningBootstrapRequests, OlmMachine,
44 store::{
45 LockableCryptoStore, SecretImportError,
46 types::{RoomKeyBundleInfo, RoomKeyInfo},
47 },
48 types::{
49 SecretsBundle, SignedKey,
50 requests::{
51 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
52 },
53 },
54 },
55 sleep::sleep,
56};
57use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
58use ruma::{
59 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
60 api::{
61 client::{
62 keys::{
63 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
64 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
65 },
66 message::send_message_event,
67 to_device::send_event_to_device::v3::{
68 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
69 },
70 uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
71 },
72 error::{ErrorBody, StandardErrorBody},
73 },
74 assign,
75 events::room::{
76 MediaSource, ThumbnailInfo,
77 member::{MembershipChange, OriginalSyncRoomMemberEvent},
78 },
79};
80#[cfg(feature = "experimental-send-custom-to-device")]
81use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
82use serde::{Deserialize, de::Error as _};
83use tasks::BundleReceiverTask;
84use tokio::sync::{Mutex, RwLockReadGuard};
85use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
86use tracing::{Span, debug, error, instrument, warn};
87use url::Url;
88use vodozemac::Curve25519PublicKey;
89
90use self::{
91 backups::{Backups, types::BackupClientState},
92 futures::UploadEncryptedFile,
93 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
94 recovery::{Recovery, RecoveryState},
95 secret_storage::SecretStorage,
96 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
97 verification::{SasVerification, Verification, VerificationRequest},
98};
99use crate::{
100 Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
101 attachment::Thumbnail,
102 client::{ClientInner, WeakClient},
103 cross_process_lock::CrossProcessLockGuard,
104 error::HttpResult,
105};
106
107pub mod backups;
108pub mod futures;
109pub mod identities;
110pub mod recovery;
111pub mod secret_storage;
112pub(crate) mod tasks;
113pub mod verification;
114
115pub use matrix_sdk_base::crypto::{
116 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
117 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
118 SignatureError, VERSION,
119 olm::{
120 SessionCreationError as MegolmSessionCreationError,
121 SessionExportError as OlmSessionExportError,
122 },
123 vodozemac,
124};
125use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
126
127#[cfg(feature = "experimental-send-custom-to-device")]
128use crate::config::RequestConfig;
129pub use crate::error::RoomKeyImportError;
130
131#[cfg(feature = "sqlite")]
134#[derive(Debug, thiserror::Error)]
135pub enum BundleExportError {
136 #[error(transparent)]
138 OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
139 #[error(transparent)]
141 StoreError(#[from] CryptoStoreError),
142 #[error(transparent)]
145 SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
146}
147
148#[derive(Debug, thiserror::Error)]
151pub enum BundleImportError {
152 #[error(transparent)]
154 SecretImport(#[from] SecretImportError),
155 #[error(transparent)]
157 DeviceKeys(#[from] Error),
158}
159
160#[cfg(feature = "sqlite")]
169pub async fn export_secrets_bundle_from_store(
170 database_path: impl AsRef<Path>,
171 passphrase: Option<&str>,
172) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
173 use matrix_sdk_base::crypto::store::CryptoStore;
174
175 let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;
176 let account =
177 store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?;
178
179 if let Some(account) = account {
180 let machine = OlmMachine::with_store(&account.user_id, &account.device_id, store, None)
181 .await
182 .map_err(BundleExportError::StoreError)?;
183
184 let bundle = machine.store().export_secrets_bundle().await?;
185
186 Ok(Some((account.user_id.to_owned(), bundle)))
187 } else {
188 Ok(None)
189 }
190}
191
192pub(crate) struct EncryptionData {
194 pub tasks: StdMutex<ClientTasks>,
197
198 pub encryption_settings: EncryptionSettings,
200
201 pub backup_state: BackupClientState,
203
204 pub recovery_state: SharedObservable<RecoveryState>,
206}
207
208impl EncryptionData {
209 pub fn new(encryption_settings: EncryptionSettings) -> Self {
210 Self {
211 encryption_settings,
212
213 tasks: StdMutex::new(Default::default()),
214 backup_state: Default::default(),
215 recovery_state: Default::default(),
216 }
217 }
218
219 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
220 let weak_client = WeakClient::from_inner(client);
221
222 let mut tasks = self.tasks.lock();
223 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
224
225 if self.encryption_settings.backup_download_strategy
226 == BackupDownloadStrategy::AfterDecryptionFailure
227 {
228 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
229 }
230 }
231
232 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
238 let mut guard = self.tasks.lock();
239
240 let future = Recovery::update_state_after_backup_state_change(client);
241 let join_handle = spawn(future);
242
243 guard.update_recovery_state_after_backup = Some(join_handle);
244 }
245}
246
247#[derive(Clone, Copy, Debug, Default)]
249pub struct EncryptionSettings {
250 pub auto_enable_cross_signing: bool,
256
257 pub backup_download_strategy: BackupDownloadStrategy,
262
263 pub auto_enable_backups: bool,
265}
266
267#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
269#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
270pub enum BackupDownloadStrategy {
271 OneShot,
282
283 AfterDecryptionFailure,
285
286 #[default]
291 Manual,
292}
293
294#[derive(Clone, Copy, Debug, Eq, PartialEq)]
299pub enum VerificationState {
300 Unknown,
302 Verified,
305 Unverified,
307}
308
309#[derive(Debug)]
320pub struct CrossSigningResetHandle {
321 client: Client,
322 upload_request: UploadSigningKeysRequest,
323 signatures_request: UploadSignaturesRequest,
324 auth_type: CrossSigningResetAuthType,
325 is_cancelled: Mutex<bool>,
326}
327
328impl CrossSigningResetHandle {
329 pub fn new(
331 client: Client,
332 upload_request: UploadSigningKeysRequest,
333 signatures_request: UploadSignaturesRequest,
334 auth_type: CrossSigningResetAuthType,
335 ) -> Self {
336 Self {
337 client,
338 upload_request,
339 signatures_request,
340 auth_type,
341 is_cancelled: Mutex::new(false),
342 }
343 }
344
345 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
348 &self.auth_type
349 }
350
351 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
355 const RETRY_EVERY: Duration = Duration::from_millis(500);
357
358 const TIMEOUT: Duration = Duration::from_mins(2);
360
361 tokio::time::timeout(TIMEOUT, async {
362 let mut upload_request = self.upload_request.clone();
363 upload_request.auth = auth;
364
365 debug!(
366 "Repeatedly PUTting to keys/device_signing/upload until it works \
367 or we hit a permanent failure."
368 );
369 while let Err(e) = self.client.send(upload_request.clone()).await {
370 if *self.is_cancelled.lock().await {
371 return Ok(());
372 }
373
374 match e.as_uiaa_response() {
375 Some(uiaa_info) => {
376 if !matches!(self.auth_type, CrossSigningResetAuthType::OAuth(_))
379 && uiaa_info.auth_error.is_some()
380 {
381 return Err(e.into());
382 }
383 }
384 None => return Err(e.into()),
385 }
386
387 debug!(
388 "PUT to keys/device_signing/upload failed with 401. Retrying after \
389 a short delay."
390 );
391 sleep(RETRY_EVERY).await;
392 }
393
394 self.client.send(self.signatures_request.clone()).await?;
395
396 Ok(())
397 })
398 .await
399 .unwrap_or_else(|_| {
400 warn!("Timed out waiting for keys/device_signing/upload to succeed.");
401 Err(Error::Timeout)
402 })
403 }
404
405 pub async fn cancel(&self) {
407 *self.is_cancelled.lock().await = true;
408 }
409}
410
411#[derive(Debug, Clone)]
414pub enum CrossSigningResetAuthType {
415 Uiaa(UiaaInfo),
417 OAuth(OAuthCrossSigningResetInfo),
420}
421
422impl CrossSigningResetAuthType {
423 fn new(error: &HttpError) -> Result<Option<Self>> {
424 if let Some(auth_info) = error.as_uiaa_response() {
425 if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
426 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
427 } else {
428 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
429 }
430 } else {
431 Ok(None)
432 }
433 }
434}
435
436#[derive(Debug, Clone, Deserialize)]
439pub struct OAuthCrossSigningResetInfo {
440 pub approval_url: Url,
442
443 pub session: Option<String>,
445}
446
447impl OAuthCrossSigningResetInfo {
448 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
449 let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
450 return Ok(None);
451 };
452
453 Ok(Some(OAuthCrossSigningResetInfo {
454 approval_url: parameters.url.as_str().try_into()?,
455 session: auth_info.session.clone(),
456 }))
457 }
458}
459
460#[derive(Clone, Debug)]
463pub struct DuplicateOneTimeKeyErrorMessage {
464 pub old_key: Curve25519PublicKey,
466 pub new_key: Curve25519PublicKey,
468}
469
470impl FromStr for DuplicateOneTimeKeyErrorMessage {
471 type Err = serde_json::Error;
472
473 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
474 let mut split = s.split_terminator(';');
478
479 let old_key = split
480 .next()
481 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
482 let new_key = split
483 .next()
484 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
485
486 let old_key_index = old_key
489 .find("Old key:")
490 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
491
492 let old_key = old_key[old_key_index..]
493 .trim()
494 .strip_prefix("Old key:")
495 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
496
497 let new_key = new_key
500 .trim()
501 .strip_prefix("new key:")
502 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
503
504 let new_key = new_key.replace("'", "\"");
507
508 let old_key: SignedKey = serde_json::from_str(old_key)?;
510 let new_key: SignedKey = serde_json::from_str(&new_key)?;
511
512 let old_key = old_key.key();
514 let new_key = new_key.key();
515
516 Ok(Self { old_key, new_key })
517 }
518}
519
520impl Client {
521 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
522 self.base_client().olm_machine().await
523 }
524
525 pub(crate) async fn mark_request_as_sent(
526 &self,
527 request_id: &TransactionId,
528 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
529 ) -> Result<(), matrix_sdk_base::Error> {
530 Ok(self
531 .olm_machine()
532 .await
533 .as_ref()
534 .expect(
535 "We should have an olm machine once we try to mark E2EE related requests as sent",
536 )
537 .mark_request_as_sent(request_id, response)
538 .await?)
539 }
540
541 #[instrument(skip(self, device_keys))]
547 pub(crate) async fn keys_query(
548 &self,
549 request_id: &TransactionId,
550 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
551 ) -> Result<get_keys::v3::Response> {
552 let request = assign!(get_keys::v3::Request::new(), { device_keys });
553
554 let response = self.send(request).await?;
555 self.mark_request_as_sent(request_id, &response).await?;
556 self.encryption().update_state_after_keys_query(&response).await;
557
558 Ok(response)
559 }
560
561 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
595 &'a self,
596 reader: &'a mut R,
597 ) -> UploadEncryptedFile<'a, R> {
598 UploadEncryptedFile::new(self, reader)
599 }
600
601 pub(crate) async fn upload_encrypted_media_and_thumbnail(
604 &self,
605 data: &[u8],
606 thumbnail: Option<Thumbnail>,
607 send_progress: SharedObservable<TransmissionProgress>,
608 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
609 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
610
611 let upload_attachment = async {
612 let mut cursor = Cursor::new(data);
613 self.upload_encrypted_file(&mut cursor)
614 .with_send_progress_observable(send_progress)
615 .await
616 };
617
618 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
619
620 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
621 }
622
623 async fn upload_encrypted_thumbnail(
626 &self,
627 thumbnail: Option<Thumbnail>,
628 send_progress: SharedObservable<TransmissionProgress>,
629 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
630 let Some(thumbnail) = thumbnail else {
631 return Ok(None);
632 };
633
634 let (data, _, thumbnail_info) = thumbnail.into_parts();
635 let mut cursor = Cursor::new(data);
636
637 let file = self
638 .upload_encrypted_file(&mut cursor)
639 .with_send_progress_observable(send_progress)
640 .await?;
641
642 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
643 }
644
645 pub(crate) async fn claim_one_time_keys(
651 &self,
652 users: impl Iterator<Item = &UserId>,
653 ) -> Result<()> {
654 let _lock = self.locks().key_claim_lock.lock().await;
655
656 if let Some((request_id, request)) = self
657 .olm_machine()
658 .await
659 .as_ref()
660 .ok_or(Error::NoOlmMachine)?
661 .get_missing_sessions(users)
662 .await?
663 {
664 let response = self.send(request).await?;
665 self.mark_request_as_sent(&request_id, &response).await?;
666 }
667
668 Ok(())
669 }
670
671 #[instrument(skip(self, request))]
681 pub(crate) async fn keys_upload(
682 &self,
683 request_id: &TransactionId,
684 request: &upload_keys::v3::Request,
685 ) -> Result<upload_keys::v3::Response> {
686 debug!(
687 device_keys = request.device_keys.is_some(),
688 one_time_key_count = request.one_time_keys.len(),
689 "Uploading public encryption keys",
690 );
691
692 let response = self.send(request.clone()).await?;
693 self.mark_request_as_sent(request_id, &response).await?;
694
695 Ok(response)
696 }
697
698 pub(crate) async fn room_send_helper(
699 &self,
700 request: &RoomMessageRequest,
701 ) -> Result<send_message_event::v3::Response> {
702 let content = request.content.clone();
703 let txn_id = request.txn_id.clone();
704 let room_id = &request.room_id;
705
706 self.get_room(room_id)
707 .expect("Can't send a message to a room that isn't known to the store")
708 .send(*content)
709 .with_transaction_id(txn_id)
710 .await
711 .map(|result| result.response)
712 }
713
714 pub(crate) async fn send_to_device(
715 &self,
716 request: &ToDeviceRequest,
717 ) -> HttpResult<ToDeviceResponse> {
718 let request = RumaToDeviceRequest::new_raw(
719 request.event_type.clone(),
720 request.txn_id.clone(),
721 request.messages.clone(),
722 );
723
724 self.send(request).await
725 }
726
727 pub(crate) async fn send_verification_request(
728 &self,
729 request: OutgoingVerificationRequest,
730 ) -> Result<()> {
731 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
732
733 match request {
734 ToDevice(t) => {
735 self.send_to_device(&t).await?;
736 }
737 InRoom(r) => {
738 self.room_send_helper(&r).await?;
739 }
740 }
741
742 Ok(())
743 }
744
745 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
746 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
747
748 match r.request() {
749 AnyOutgoingRequest::KeysQuery(request) => {
750 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
751 }
752 AnyOutgoingRequest::KeysUpload(request) => {
753 let response = self.keys_upload(r.request_id(), request).await;
754
755 if let Err(e) = &response {
756 match e.as_ruma_api_error() {
757 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
758 if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
759 {
760 {
765 let already_reported = self
766 .state_store()
767 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
768 .await?
769 .is_some();
770
771 if message.starts_with("One time key") && !already_reported {
772 let error_message =
773 DuplicateOneTimeKeyErrorMessage::from_str(message);
774
775 if let Ok(message) = &error_message {
776 error!(
777 sentry = true,
778 old_key = %message.old_key,
779 new_key = %message.new_key,
780 "Duplicate one-time keys have been uploaded"
781 );
782 } else {
783 error!(
784 sentry = true,
785 "Duplicate one-time keys have been uploaded"
786 );
787 }
788
789 self.state_store()
790 .set_kv_data(
791 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
792 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
793 )
794 .await?;
795
796 if let Err(e) = self
797 .inner
798 .duplicate_key_upload_error_sender
799 .send(error_message.ok())
800 {
801 error!(
802 "Failed to dispatch duplicate key upload error notification: {}",
803 e
804 );
805 }
806 }
807 }
808 }
809 }
810 _ => {}
811 }
812
813 response?;
814 }
815 }
816 AnyOutgoingRequest::ToDeviceRequest(request) => {
817 let response = self.send_to_device(request).await?;
818 self.mark_request_as_sent(r.request_id(), &response).await?;
819 }
820 AnyOutgoingRequest::SignatureUpload(request) => {
821 let response = self.send(request.clone()).await?;
822 self.mark_request_as_sent(r.request_id(), &response).await?;
823 }
824 AnyOutgoingRequest::RoomMessage(request) => {
825 let response = self.room_send_helper(request).await?;
826 self.mark_request_as_sent(r.request_id(), &response).await?;
827 }
828 AnyOutgoingRequest::KeysClaim(request) => {
829 let response = self.send(request.clone()).await?;
830 self.mark_request_as_sent(r.request_id(), &response).await?;
831 }
832 }
833
834 Ok(())
835 }
836
837 #[instrument(skip_all)]
838 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
839 const MAX_CONCURRENT_REQUESTS: usize = 20;
840
841 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
844 warn!("Error while claiming one-time keys {:?}", e);
845 }
846
847 let outgoing_requests = stream::iter(
848 self.olm_machine()
849 .await
850 .as_ref()
851 .ok_or(Error::NoOlmMachine)?
852 .outgoing_requests()
853 .await?,
854 )
855 .map(|r| self.send_outgoing_request(r));
856
857 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
858
859 requests
860 .for_each(|r| async move {
861 match r {
862 Ok(_) => (),
863 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
864 }
865 })
866 .await;
867
868 Ok(())
869 }
870}
871
872#[cfg(any(feature = "testing", test))]
873impl Client {
874 pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
876 self.olm_machine().await
877 }
878
879 pub fn abort_bundle_receiver_task(&self) {
881 let tasks = self.inner.e2ee.tasks.lock();
882 if let Some(task) = tasks.receive_historic_room_key_bundles.as_ref() {
883 task.abort()
884 }
885 }
886}
887
888#[derive(Debug, Clone)]
892pub struct Encryption {
893 client: Client,
895}
896
897impl Encryption {
898 pub(crate) fn new(client: Client) -> Self {
899 Self { client }
900 }
901
902 pub(crate) fn settings(&self) -> EncryptionSettings {
904 self.client.inner.e2ee.encryption_settings
905 }
906
907 pub async fn ed25519_key(&self) -> Option<String> {
910 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
911 }
912
913 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
915 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
916 }
917
918 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
920 match self.get_own_device().await {
921 Ok(Some(device)) => device.first_time_seen_ts(),
922 _ => MilliSecondsSinceUnixEpoch::now(),
924 }
925 }
926
927 pub async fn import_secrets_bundle(
940 &self,
941 bundle: &SecretsBundle,
942 ) -> Result<(), BundleImportError> {
943 self.import_secrets_bundle_impl(bundle).await?;
944
945 self.ensure_device_keys_upload().await?;
948 self.wait_for_e2ee_initialization_tasks().await;
949
950 if !self.backups().are_enabled().await {
955 self.backups().maybe_resume_backups().await?;
956 }
957
958 Ok(())
959 }
960
961 pub(crate) async fn import_secrets_bundle_impl(
962 &self,
963 bundle: &SecretsBundle,
964 ) -> Result<(), SecretImportError> {
965 let olm_machine = self.client.olm_machine().await;
966 let olm_machine =
967 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
968
969 olm_machine.store().import_secrets_bundle(bundle).await
970 }
971
972 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
977 let olm = self.client.olm_machine().await;
978 let machine = olm.as_ref()?;
979 Some(machine.cross_signing_status().await)
980 }
981
982 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
988 let olm_machine = self.client.olm_machine().await;
989 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
990 let user_id = olm_machine.user_id();
991
992 self.ensure_initial_key_query().await?;
993
994 let devices = self.get_user_devices(user_id).await?;
995
996 let ret = devices.devices().any(|device| {
997 device.is_cross_signed_by_owner()
998 && device.curve25519_key().is_some()
999 && !device.is_dehydrated()
1000 });
1001
1002 Ok(ret)
1003 }
1004
1005 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1010 if let Some(machine) = self.client.olm_machine().await.as_ref() {
1011 machine.tracked_users().await
1012 } else {
1013 Ok(HashSet::new())
1014 }
1015 }
1016
1017 pub fn verification_state(&self) -> Subscriber<VerificationState> {
1040 self.client.inner.verification_state.subscribe_reset()
1041 }
1042
1043 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1045 let olm = self.client.olm_machine().await;
1046 let olm = olm.as_ref()?;
1047 #[allow(clippy::bind_instead_of_map)]
1048 olm.get_verification(user_id, flow_id).and_then(|v| match v {
1049 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
1050 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
1051 }
1052 #[cfg(feature = "qrcode")]
1053 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
1054 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
1055 }
1056 _ => None,
1057 })
1058 }
1059
1060 pub async fn get_verification_request(
1063 &self,
1064 user_id: &UserId,
1065 flow_id: impl AsRef<str>,
1066 ) -> Option<VerificationRequest> {
1067 let olm = self.client.olm_machine().await;
1068 let olm = olm.as_ref()?;
1069
1070 olm.get_verification_request(user_id, flow_id)
1071 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1072 }
1073
1074 pub async fn get_device(
1108 &self,
1109 user_id: &UserId,
1110 device_id: &DeviceId,
1111 ) -> Result<Option<Device>, CryptoStoreError> {
1112 let olm = self.client.olm_machine().await;
1113 let Some(machine) = olm.as_ref() else { return Ok(None) };
1114 let device = machine.get_device(user_id, device_id, None).await?;
1115 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1116 }
1117
1118 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1125 let olm = self.client.olm_machine().await;
1126 let Some(machine) = olm.as_ref() else { return Ok(None) };
1127 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1128 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1129 }
1130
1131 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1157 let devices = self
1158 .client
1159 .olm_machine()
1160 .await
1161 .as_ref()
1162 .ok_or(Error::NoOlmMachine)?
1163 .get_user_devices(user_id, None)
1164 .await?;
1165
1166 Ok(UserDevices { inner: devices, client: self.client.clone() })
1167 }
1168
1169 pub async fn get_user_identity(
1205 &self,
1206 user_id: &UserId,
1207 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1208 let olm = self.client.olm_machine().await;
1209 let Some(olm) = olm.as_ref() else { return Ok(None) };
1210 let identity = olm.get_identity(user_id, None).await?;
1211
1212 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1213 }
1214
1215 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1253 let olm = self.client.olm_machine().await;
1254 let Some(olm) = olm.as_ref() else { return Ok(None) };
1255
1256 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1257 self.client.keys_query(&request_id, request.device_keys).await?;
1258
1259 let identity = olm.get_identity(user_id, None).await?;
1260 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1261 }
1262
1263 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1294 let olm = self.client.olm_machine().await;
1295 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1296 let client = self.client.to_owned();
1297
1298 Ok(olm
1299 .store()
1300 .devices_stream()
1301 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1302 }
1303
1304 pub async fn user_identities_stream(
1332 &self,
1333 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1334 let olm = self.client.olm_machine().await;
1335 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1336 let client = self.client.to_owned();
1337
1338 Ok(olm
1339 .store()
1340 .user_identities_stream()
1341 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1342 }
1343
1344 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1383 let olm = self.client.olm_machine().await;
1384 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1385
1386 let CrossSigningBootstrapRequests {
1387 upload_signing_keys_req,
1388 upload_keys_req,
1389 upload_signatures_req,
1390 } = olm.bootstrap_cross_signing(false).await?;
1391
1392 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1393 auth: auth_data,
1394 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1395 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1396 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1397 });
1398
1399 if let Some(req) = upload_keys_req {
1400 self.client.send_outgoing_request(req).await?;
1401 }
1402 self.client.send(upload_signing_keys_req).await?;
1403 self.client.send(upload_signatures_req).await?;
1404
1405 Ok(())
1406 }
1407
1408 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1449 let olm = self.client.olm_machine().await;
1450 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1451
1452 let CrossSigningBootstrapRequests {
1453 upload_keys_req,
1454 upload_signing_keys_req,
1455 upload_signatures_req,
1456 } = olm.bootstrap_cross_signing(true).await?;
1457
1458 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1459 auth: None,
1460 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1461 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1462 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1463 });
1464
1465 if let Some(req) = upload_keys_req {
1466 self.client.send_outgoing_request(req).await?;
1467 }
1468
1469 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1470 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1471 let client = self.client.clone();
1472
1473 Ok(Some(CrossSigningResetHandle::new(
1474 client,
1475 upload_signing_keys_req,
1476 upload_signatures_req,
1477 auth_type,
1478 )))
1479 } else {
1480 Err(error.into())
1481 }
1482 } else {
1483 self.client.send(upload_signatures_req).await?;
1484
1485 Ok(None)
1486 }
1487 }
1488
1489 async fn ensure_initial_key_query(&self) -> Result<()> {
1492 let olm_machine = self.client.olm_machine().await;
1493 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1494
1495 let user_id = olm_machine.user_id();
1496
1497 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1498 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1499 self.client.keys_query(&request_id, request.device_keys).await?;
1500 }
1501
1502 Ok(())
1503 }
1504
1505 pub async fn bootstrap_cross_signing_if_needed(
1552 &self,
1553 auth_data: Option<AuthData>,
1554 ) -> Result<()> {
1555 let olm_machine = self.client.olm_machine().await;
1556 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1557 let user_id = olm_machine.user_id();
1558
1559 self.ensure_initial_key_query().await?;
1560
1561 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1562 self.bootstrap_cross_signing(auth_data).await?;
1563 }
1564
1565 Ok(())
1566 }
1567
1568 #[cfg(not(target_family = "wasm"))]
1620 pub async fn export_room_keys(
1621 &self,
1622 path: PathBuf,
1623 passphrase: &str,
1624 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1625 ) -> Result<()> {
1626 let olm = self.client.olm_machine().await;
1627 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1628
1629 let keys = olm.store().export_room_keys(predicate).await?;
1630 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1631
1632 let encrypt = move || -> Result<()> {
1633 let export: String =
1634 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1635 let mut file = std::fs::File::create(path)?;
1636 file.write_all(&export.into_bytes())?;
1637 Ok(())
1638 };
1639
1640 let task = tokio::task::spawn_blocking(encrypt);
1641 task.await.expect("Task join error")
1642 }
1643
1644 #[cfg(not(target_family = "wasm"))]
1682 pub async fn import_room_keys(
1683 &self,
1684 path: PathBuf,
1685 passphrase: &str,
1686 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1687 let olm = self.client.olm_machine().await;
1688 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1689 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1690
1691 let decrypt = move || {
1692 let file = std::fs::File::open(path)?;
1693 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1694 };
1695
1696 let task = tokio::task::spawn_blocking(decrypt);
1697 let import = task.await.expect("Task join error")?;
1698
1699 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1700
1701 self.backups().maybe_trigger_backup();
1702
1703 Ok(ret)
1704 }
1705
1706 pub async fn room_keys_received_stream(
1737 &self,
1738 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1739 {
1740 let olm = self.client.olm_machine().await;
1741 let olm = olm.as_ref()?;
1742
1743 Some(olm.store().room_keys_received_stream())
1744 }
1745
1746 pub async fn historic_room_key_stream(
1777 &self,
1778 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1779 let olm = self.client.olm_machine().await;
1780 let olm = olm.as_ref()?;
1781
1782 Some(olm.store().historic_room_key_stream())
1783 }
1784
1785 pub fn secret_storage(&self) -> SecretStorage {
1787 SecretStorage { client: self.client.to_owned() }
1788 }
1789
1790 pub fn backups(&self) -> Backups {
1792 Backups { client: self.client.to_owned() }
1793 }
1794
1795 pub fn recovery(&self) -> Recovery {
1797 Recovery { client: self.client.to_owned() }
1798 }
1799
1800 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1812 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1814 let prev_holder = prev_lock.lock_holder();
1815 if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1816 return Ok(());
1817 }
1818 warn!(
1819 "Recreating cross-process store lock with a different holder value: \
1820 prev was {prev_holder:?}, new is {lock_value}"
1821 );
1822 }
1823
1824 let olm_machine = self.client.base_client().olm_machine().await;
1825 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1826
1827 let lock = olm_machine.store().create_store_lock(
1828 "cross_process_lock".to_owned(),
1829 CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1830 );
1831
1832 {
1837 let lock_result = lock.try_lock_once().await?;
1838
1839 if lock_result.is_ok() {
1840 olm_machine
1841 .initialize_crypto_store_generation(
1842 &self.client.locks().crypto_store_generation,
1843 )
1844 .await?;
1845 }
1846 }
1847
1848 self.client
1849 .locks()
1850 .cross_process_crypto_store_lock
1851 .set(lock)
1852 .map_err(|_| Error::BadCryptoStoreState)?;
1853
1854 Ok(())
1855 }
1856
1857 #[instrument(skip(self), fields(olm_machine_new_generation, olm_machine_generation))]
1862 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1863 let olm_machine_guard = self.client.olm_machine().await;
1864 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1865 let (new_gen, generation_number) = olm_machine
1866 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1867 .await?;
1868
1869 Span::current()
1870 .record("olm_machine_new_generation", new_gen)
1871 .record("olm_machine_generation", generation_number);
1872 debug!("OlmMachine generation maintained in CryptoStore");
1873
1874 if new_gen {
1876 drop(olm_machine_guard);
1878 self.client.base_client().regenerate_olm(None).await?;
1880 }
1881 Ok(generation_number)
1882 } else {
1883 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1888 Ok(0)
1889 }
1890 }
1891
1892 pub async fn spin_lock_store(
1900 &self,
1901 max_backoff: Option<u32>,
1902 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1903 self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1904 }
1905
1906 pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1914 match self.lock_store(CrossProcessLock::try_lock_once).await {
1915 Err(Error::CrossProcessLockError(e))
1916 if matches!(*e, CrossProcessLockError::Unobtained(_)) =>
1917 {
1918 Ok(None)
1919 }
1920 other => other,
1921 }
1922 }
1923
1924 pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1931 &self,
1932 acquire: F,
1933 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1934 let wrap_err = |e: CryptoStoreError| {
1935 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1936 };
1937 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1938 let guard = acquire(lock).await.map_err(wrap_err)??;
1939 let _ = self.on_lock_newly_acquired().await?;
1940 Ok(Some(guard.into_guard()))
1941 } else {
1942 Ok(None)
1943 }
1944 }
1945
1946 #[cfg(any(test, feature = "testing"))]
1948 pub async fn uploaded_key_count(&self) -> Result<u64> {
1949 let olm_machine = self.client.olm_machine().await;
1950 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1951 Ok(olm_machine.uploaded_key_count().await?)
1952 }
1953
1954 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1978 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1982 Some(BundleReceiverTask::new(&self.client).await)
1983 } else {
1984 None
1985 };
1986
1987 let mut tasks = self.client.inner.e2ee.tasks.lock();
1988
1989 let this = self.clone();
1990
1991 tasks.setup_e2ee = Some(spawn(async move {
1992 this.update_verification_state().await;
1995
1996 if this.settings().auto_enable_cross_signing
1997 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1998 {
1999 error!("Couldn't bootstrap cross signing {e:?}");
2000 }
2001
2002 if let Err(e) = this.backups().setup_and_resume().await {
2003 error!("Couldn't setup and resume backups {e:?}");
2004 }
2005 if let Err(e) = this.recovery().setup().await {
2006 error!("Couldn't setup and resume recovery {e:?}");
2007 }
2008 }));
2009
2010 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
2011
2012 self.setup_room_membership_session_discard_handler();
2013 }
2014
2015 pub async fn wait_for_e2ee_initialization_tasks(&self) {
2018 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
2019
2020 if let Some(task) = task
2021 && let Err(err) = task.await
2022 {
2023 warn!("Error when initializing backups: {err}");
2024 }
2025 }
2026
2027 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
2037 let olm = self.client.olm_machine().await;
2038 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
2039
2040 if let Some((request_id, request)) = olm.upload_device_keys().await? {
2041 self.client.keys_upload(&request_id, &request).await?;
2042
2043 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
2044 self.client.keys_query(&request_id, request.device_keys).await?;
2045 }
2046
2047 Ok(())
2048 }
2049
2050 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
2051 self.recovery().update_state_after_keys_query(response).await;
2052
2053 if let Some(user_id) = self.client.user_id() {
2055 let contains_own_device = response.device_keys.contains_key(user_id);
2056
2057 if contains_own_device {
2058 self.update_verification_state().await;
2059 }
2060 }
2061 }
2062
2063 async fn update_verification_state(&self) {
2064 match self.get_own_device().await {
2065 Ok(device) => {
2066 if let Some(device) = device {
2067 let is_verified = device.is_cross_signed_by_owner();
2068
2069 if is_verified {
2070 self.client.inner.verification_state.set(VerificationState::Verified);
2071 } else {
2072 self.client.inner.verification_state.set(VerificationState::Unverified);
2073 }
2074 } else {
2075 warn!("Couldn't find out own device in the store.");
2076 self.client.inner.verification_state.set(VerificationState::Unknown);
2077 }
2078 }
2079 Err(error) => {
2080 warn!("Failed retrieving own device: {error}");
2081 self.client.inner.verification_state.set(VerificationState::Unknown);
2082 }
2083 }
2084 }
2085
2086 fn setup_room_membership_session_discard_handler(&self) {
2106 let client = WeakClient::from_client(&self.client);
2107 self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2108 let Some(client) = client.get() else {
2109 return;
2111 };
2112 let Some(user_id) = client.user_id() else {
2113 return;
2115 };
2116 let olm = client.olm_machine().await;
2117 let Some(olm) = olm.as_ref() else {
2118 warn!("Cannot discard session - Olm machine is not available");
2119 return;
2120 };
2121
2122 if matches!(
2123 ev.membership_change(),
2124 MembershipChange::Joined |
2125 MembershipChange::Invited |
2126 MembershipChange::KnockAccepted |
2127 MembershipChange::InvitationAccepted |
2128 MembershipChange::ProfileChanged { .. }
2129 ) || ev.sender == user_id {
2130 return;
2132 }
2133
2134 debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2135
2136 if let Err(e) = olm.discard_room_key(room.room_id()).await {
2139 warn!(
2140 room_id = ?room.room_id(),
2141 "Error discarding room key after member leave: {e:?}"
2142 );
2143 }
2144 });
2145 }
2146
2147 #[cfg(feature = "experimental-send-custom-to-device")]
2156 pub async fn encrypt_and_send_raw_to_device(
2157 &self,
2158 recipient_devices: Vec<&Device>,
2159 event_type: &str,
2160 content: Raw<AnyToDeviceEventContent>,
2161 share_strategy: CollectStrategy,
2162 ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
2163 let users = recipient_devices.iter().map(|device| device.user_id());
2164
2165 self.client.claim_one_time_keys(users).await?;
2169
2170 let olm = self.client.olm_machine().await;
2171 let olm = olm.as_ref().expect("Olm machine wasn't started");
2172
2173 let (requests, withhelds) = olm
2174 .encrypt_content_for_devices(
2175 recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
2176 event_type,
2177 &content
2178 .deserialize_as::<serde_json::Value>()
2179 .expect("Deserialize as Value will always work"),
2180 share_strategy,
2181 )
2182 .await?;
2183
2184 let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
2185
2186 withhelds.iter().for_each(|(d, _)| {
2188 failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
2189 });
2190
2191 for request in requests {
2193 let ruma_request = RumaToDeviceRequest::new_raw(
2194 request.event_type.clone(),
2195 request.txn_id.clone(),
2196 request.messages.clone(),
2197 );
2198
2199 let send_result = self
2200 .client
2201 .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
2202 .await;
2203
2204 if send_result.is_err() {
2206 for (user_id, device_map) in request.messages {
2208 for device_id in device_map.keys() {
2209 match device_id {
2210 DeviceIdOrAllDevices::DeviceId(device_id) => {
2211 failures.push((user_id.clone(), device_id.to_owned()));
2212 }
2213 DeviceIdOrAllDevices::AllDevices => {
2214 }
2216 }
2217 }
2218 }
2219 }
2220 }
2221
2222 Ok(failures)
2223 }
2224}
2225
2226#[cfg(all(test, not(target_family = "wasm")))]
2227mod tests {
2228 use std::{
2229 ops::Not,
2230 str::FromStr,
2231 sync::{
2232 Arc,
2233 atomic::{AtomicBool, Ordering},
2234 },
2235 time::Duration,
2236 };
2237
2238 use matrix_sdk_test::{
2239 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2240 event_factory::EventFactory,
2241 };
2242 use ruma::{
2243 event_id,
2244 events::{reaction::ReactionEventContent, relation::Annotation},
2245 user_id,
2246 };
2247 use serde_json::json;
2248 use wiremock::{
2249 Mock, MockServer, Request, ResponseTemplate,
2250 matchers::{header, method, path_regex},
2251 };
2252
2253 use crate::{
2254 Client, assert_next_matches_with_timeout,
2255 config::RequestConfig,
2256 encryption::{
2257 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2258 },
2259 test_utils::{
2260 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2261 },
2262 };
2263
2264 #[async_test]
2265 async fn test_reaction_sending() {
2266 let server = MockServer::start().await;
2267 let client = logged_in_client(Some(server.uri())).await;
2268
2269 let event_id = event_id!("$2:example.org");
2270
2271 Mock::given(method("GET"))
2272 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2273 .and(header("authorization", "Bearer 1234"))
2274 .respond_with(
2275 ResponseTemplate::new(200)
2276 .set_body_json(EventFactory::new().room_encryption().into_content()),
2277 )
2278 .mount(&server)
2279 .await;
2280
2281 Mock::given(method("PUT"))
2282 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2283 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2284 "event_id": event_id,
2285 })))
2286 .mount(&server)
2287 .await;
2288
2289 let f = EventFactory::new().sender(user_id!("@example:localhost"));
2290 let response = SyncResponseBuilder::default()
2291 .add_joined_room(
2292 JoinedRoomBuilder::default()
2293 .add_state_event(
2294 f.member(user_id!("@example:localhost")).display_name("example"),
2295 )
2296 .add_state_event(f.default_power_levels())
2297 .add_state_event(f.room_encryption()),
2298 )
2299 .build_sync_response();
2300
2301 client.base_client().receive_sync_response(response).await.unwrap();
2302
2303 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2304 assert!(
2305 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2306 );
2307
2308 let event_id = event_id!("$1:example.org");
2309 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2310 room.send(reaction).await.expect("Sending the reaction should not fail");
2311
2312 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2313 }
2314
2315 #[cfg(feature = "sqlite")]
2316 #[async_test]
2317 async fn test_generation_counter_invalidates_olm_machine() {
2318 use matrix_sdk_base::store::RoomLoadSettings;
2321 let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2322 let session = mock_matrix_session();
2323
2324 let client1 = Client::builder()
2325 .homeserver_url("http://localhost:1234")
2326 .request_config(RequestConfig::new().disable_retry())
2327 .sqlite_store(&sqlite_path, None)
2328 .build()
2329 .await
2330 .unwrap();
2331 client1
2332 .matrix_auth()
2333 .restore_session(session.clone(), RoomLoadSettings::default())
2334 .await
2335 .unwrap();
2336
2337 let client2 = Client::builder()
2338 .homeserver_url("http://localhost:1234")
2339 .request_config(RequestConfig::new().disable_retry())
2340 .sqlite_store(sqlite_path, None)
2341 .build()
2342 .await
2343 .unwrap();
2344 client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2345
2346 let guard = client1.encryption().try_lock_store_once().await.unwrap();
2348 assert!(guard.is_none());
2349
2350 client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2351 client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2352
2353 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2355 assert!(acquired1.is_some());
2356
2357 let initial_olm_machine =
2359 client1.olm_machine().await.clone().expect("must have an olm machine");
2360
2361 let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new();
2363 let backup_key = decryption_key.megolm_v1_public_key();
2364 backup_key.set_version("1".to_owned());
2365 initial_olm_machine
2366 .backup_machine()
2367 .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2368 .await
2369 .expect("Should save");
2370
2371 initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2372
2373 assert!(client1.encryption().backups().are_enabled().await);
2374
2375 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2377 assert!(acquired2.is_none());
2378
2379 drop(acquired1);
2381 tokio::time::sleep(Duration::from_millis(100)).await;
2382
2383 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2385 assert!(acquired1.is_some());
2386
2387 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2389 assert!(initial_olm_machine.same_as(&olm_machine));
2390
2391 drop(acquired1);
2393 tokio::time::sleep(Duration::from_millis(100)).await;
2394
2395 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2397 assert!(acquired2.is_some());
2398
2399 drop(acquired2);
2401 tokio::time::sleep(Duration::from_millis(100)).await;
2402
2403 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2405 assert!(acquired1.is_some());
2406
2407 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2409
2410 assert!(!initial_olm_machine.same_as(&olm_machine));
2411
2412 let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2413 assert!(backup_key_new.decryption_key.is_some());
2414 assert_eq!(
2415 backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2416 backup_key.to_base64()
2417 );
2418 assert!(client1.encryption().backups().are_enabled().await);
2419 }
2420
2421 #[cfg(feature = "sqlite")]
2422 #[async_test]
2423 async fn test_generation_counter_no_spurious_invalidation() {
2424 use matrix_sdk_base::store::RoomLoadSettings;
2427 let sqlite_path =
2428 std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2429 let session = mock_matrix_session();
2430
2431 let client = Client::builder()
2432 .homeserver_url("http://localhost:1234")
2433 .request_config(RequestConfig::new().disable_retry())
2434 .sqlite_store(&sqlite_path, None)
2435 .build()
2436 .await
2437 .unwrap();
2438 client
2439 .matrix_auth()
2440 .restore_session(session.clone(), RoomLoadSettings::default())
2441 .await
2442 .unwrap();
2443
2444 let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2445
2446 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2447
2448 let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2450 assert!(initial_olm_machine.same_as(&after_enabling_lock));
2451
2452 {
2453 let client2 = Client::builder()
2455 .homeserver_url("http://localhost:1234")
2456 .request_config(RequestConfig::new().disable_retry())
2457 .sqlite_store(sqlite_path, None)
2458 .build()
2459 .await
2460 .unwrap();
2461 client2
2462 .matrix_auth()
2463 .restore_session(session, RoomLoadSettings::default())
2464 .await
2465 .unwrap();
2466
2467 client2
2468 .encryption()
2469 .enable_cross_process_store_lock("client2".to_owned())
2470 .await
2471 .unwrap();
2472
2473 let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2474 assert!(guard.is_some());
2475
2476 drop(guard);
2477 tokio::time::sleep(Duration::from_millis(100)).await;
2478 }
2479
2480 {
2481 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2482 assert!(acquired.is_some());
2483 }
2484
2485 let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2487 assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2488
2489 {
2490 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2491 assert!(acquired.is_some());
2492 }
2493
2494 let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2496 assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2497 }
2498
2499 #[async_test]
2500 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2501 let client = no_retry_test_client(None).await;
2503 let server = MockServer::start().await;
2504
2505 let mut verification_state = client.encryption().verification_state();
2507
2508 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2510
2511 let keys_requested = Arc::new(AtomicBool::new(false));
2514 let inner_bool = keys_requested.clone();
2515
2516 Mock::given(method("GET"))
2517 .and(path_regex(
2518 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2519 ))
2520 .respond_with(move |_req: &Request| {
2521 inner_bool.fetch_or(true, Ordering::SeqCst);
2522 ResponseTemplate::new(200).set_body_json(json!({}))
2523 })
2524 .mount(&server)
2525 .await;
2526
2527 set_client_session(&client).await;
2529
2530 assert!(keys_requested.load(Ordering::SeqCst).not());
2532 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2533 }
2534
2535 #[test]
2536 fn test_oauth_reset_info_from_uiaa_info() {
2537 let auth_info = json!({
2538 "session": "dummy",
2539 "flows": [
2540 {
2541 "stages": [
2542 "org.matrix.cross_signing_reset"
2543 ]
2544 }
2545 ],
2546 "params": {
2547 "org.matrix.cross_signing_reset": {
2548 "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2549 }
2550 },
2551 "msg": "To reset..."
2552 });
2553
2554 let auth_info = serde_json::from_value(auth_info)
2555 .expect("We should be able to deserialize the UiaaInfo");
2556 OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2557 .expect("We should be able to fetch the cross-signing reset info from the auth info");
2558 }
2559
2560 #[test]
2561 fn test_duplicate_one_time_key_error_parsing() {
2562 let message = concat!(
2563 r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2564 r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2565 r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2566 r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2567 r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2568 r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2569 r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2570 );
2571 let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2572 .expect("We should be able to parse the error message");
2573
2574 assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2575 assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2576
2577 DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2578 .expect_err("We shouldn't be able to parse an incomplete error message");
2579 }
2580
2581 fn devices_to_verify_against_keys_query_response(
2585 devices: Vec<serde_json::Value>,
2586 ) -> serde_json::Value {
2587 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2588 .into_iter()
2589 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2590 .collect();
2591 json!({
2592 "device_keys": {
2593 "@example:localhost": device_keys,
2594 },
2595 "master_keys": {
2596 "@example:localhost": {
2597 "keys": {
2598 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2599 },
2600 "usage": ["master"],
2601 "user_id": "@example:localhost",
2602 },
2603 },
2604 "self_signing_keys": {
2605 "@example:localhost": {
2606 "keys": {
2607 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2608 },
2609 "usage": ["self_signing"],
2610 "user_id": "@example:localhost",
2611 "signatures": {
2612 "@example:localhost": {
2613 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2614 },
2615 },
2616 },
2617 },
2618 "user_signing_keys": {
2619 "@example:localhost": {
2620 "keys": {
2621 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2622 },
2623 "usage": ["user_signing"],
2624 "user_id": "@example:localhost",
2625 "signatures": {
2626 "@example:localhost": {
2627 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2628 },
2629 },
2630 },
2631 }
2632 })
2633 }
2634
2635 #[async_test]
2638 async fn test_devices_to_verify_against_no_devices() {
2641 let server = MockServer::start().await;
2642 let client = logged_in_client(Some(server.uri())).await;
2643
2644 Mock::given(method("POST"))
2645 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2646 .respond_with(
2647 ResponseTemplate::new(200)
2648 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2649 )
2650 .mount(&server)
2651 .await;
2652
2653 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2654 }
2655
2656 #[async_test]
2657 async fn test_devices_to_verify_against_cross_signed() {
2660 let server = MockServer::start().await;
2661 let client = logged_in_client(Some(server.uri())).await;
2662
2663 Mock::given(method("POST"))
2664 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2665 .respond_with(ResponseTemplate::new(200).set_body_json(
2666 devices_to_verify_against_keys_query_response(vec![
2667 json!({
2668 "algorithms": [
2669 "m.olm.v1.curve25519-aes-sha2",
2670 "m.megolm.v1.aes-sha2",
2671 ],
2672 "user_id": "@example:localhost",
2673 "device_id": "SIGNEDDEVICE",
2674 "keys": {
2675 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2676 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2677 },
2678 "signatures": {
2679 "@example:localhost": {
2680 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2681 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2682 },
2683 },
2684 })
2685 ])
2686 ))
2687 .mount(&server)
2688 .await;
2689
2690 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2691 }
2692
2693 #[async_test]
2694 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2697 let server = MockServer::start().await;
2698 let client = logged_in_client(Some(server.uri())).await;
2699 let user_id = client.user_id().unwrap();
2700 let olm_machine = client.olm_machine().await;
2701 let olm_machine = olm_machine.as_ref().unwrap();
2702
2703 Mock::given(method("POST"))
2704 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2705 .respond_with(ResponseTemplate::new(200).set_body_json(
2706 devices_to_verify_against_keys_query_response(vec![
2707 json!({
2708 "algorithms": [
2709 "m.olm.v1.curve25519-aes-sha2",
2710 "m.megolm.v1.aes-sha2",
2711 ],
2712 "user_id": "@example:localhost",
2713 "device_id": "DEHYDRATEDDEVICE",
2714 "keys": {
2715 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2716 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2717 },
2718 "dehydrated": true,
2719 "signatures": {
2720 "@example:localhost": {
2721 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2722 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2723 },
2724 },
2725 }),
2726 json!({
2727 "algorithms": [
2728 "m.olm.v1.curve25519-aes-sha2",
2729 "m.megolm.v1.aes-sha2",
2730 ],
2731 "user_id": "@example:localhost",
2732 "device_id": "UNSIGNEDDEVICE",
2733 "keys": {
2734 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2735 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2736 },
2737 "signatures": {
2738 "@example:localhost": {
2739 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2740 },
2741 },
2742 }),
2743 ])
2744 ))
2745 .mount(&server)
2746 .await;
2747
2748 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2749 client.keys_query(&request_id, request.device_keys).await.unwrap();
2750
2751 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2752 }
2753}