1use std::{
16 collections::{BTreeMap, HashMap, HashSet},
17 sync::Arc,
18 time::Duration,
19};
20
21use itertools::Itertools;
22use matrix_sdk_common::{
23 deserialized_responses::{
24 AlgorithmInfo, DecryptedRoomEvent, DeviceLinkProblem, EncryptionInfo, UnableToDecryptInfo,
25 UnableToDecryptReason, UnsignedDecryptionResult, UnsignedEventLocation, VerificationLevel,
26 VerificationState,
27 },
28 locks::RwLock as StdRwLock,
29 BoxFuture,
30};
31use ruma::{
32 api::client::{
33 dehydrated_device::DehydratedDeviceData,
34 keys::{
35 claim_keys::v3::Request as KeysClaimRequest,
36 get_keys::v3::Response as KeysQueryResponse,
37 upload_keys::v3::{Request as UploadKeysRequest, Response as UploadKeysResponse},
38 upload_signatures::v3::Request as UploadSignaturesRequest,
39 },
40 sync::sync_events::DeviceLists,
41 },
42 assign,
43 events::{
44 secret::request::SecretName, AnyMessageLikeEvent, AnyMessageLikeEventContent,
45 AnyToDeviceEvent, MessageLikeEventContent,
46 },
47 serde::{JsonObject, Raw},
48 DeviceId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedDeviceKeyId,
49 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
50};
51use serde_json::{value::to_raw_value, Value};
52use tokio::sync::Mutex;
53use tracing::{
54 debug, error,
55 field::{debug, display},
56 info, instrument, warn, Span,
57};
58use vodozemac::{
59 megolm::{DecryptionError, SessionOrdering},
60 Curve25519PublicKey, Ed25519Signature,
61};
62
63use crate::{
64 backups::{BackupMachine, MegolmV1BackupKey},
65 dehydrated_devices::{DehydratedDevices, DehydrationError},
66 error::{EventError, MegolmError, MegolmResult, OlmError, OlmResult, SetRoomSettingsError},
67 gossiping::GossipMachine,
68 identities::{user::UserIdentity, Device, IdentityManager, UserDevices},
69 olm::{
70 Account, CrossSigningStatus, EncryptionSettings, IdentityKeys, InboundGroupSession,
71 KnownSenderData, OlmDecryptionInfo, PrivateCrossSigningIdentity, SenderData,
72 SenderDataFinder, SessionType, StaticAccountData,
73 },
74 session_manager::{GroupSessionManager, SessionManager},
75 store::{
76 Changes, CryptoStoreWrapper, DeviceChanges, IdentityChanges, IntoCryptoStore, MemoryStore,
77 PendingChanges, Result as StoreResult, RoomKeyInfo, RoomSettings, SecretImportError, Store,
78 StoreCache, StoreTransaction,
79 },
80 types::{
81 events::{
82 olm_v1::{AnyDecryptedOlmEvent, DecryptedRoomKeyEvent},
83 room::encrypted::{
84 EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
85 RoomEventEncryptionScheme, SupportedEventEncryptionSchemes,
86 },
87 room_key::{MegolmV1AesSha2Content, RoomKeyContent},
88 room_key_withheld::{
89 MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent, RoomKeyWithheldEvent,
90 },
91 ToDeviceEvents,
92 },
93 requests::{
94 AnyIncomingResponse, KeysQueryRequest, OutgoingRequest, ToDeviceRequest,
95 UploadSigningKeysRequest,
96 },
97 EventEncryptionAlgorithm, Signatures,
98 },
99 utilities::timestamp_to_iso8601,
100 verification::{Verification, VerificationMachine, VerificationRequest},
101 CrossSigningKeyExport, CryptoStoreError, DecryptionSettings, DeviceData, LocalTrust,
102 RoomEventDecryptionResult, SignatureError, TrustRequirement,
103};
104
/// Handle to the crypto state machine.
///
/// The handle only holds an `Arc` to [`OlmMachineInner`], so the derived
/// `Clone` produces another handle to the same inner state.
105#[derive(Clone)]
108pub struct OlmMachine {
    /// Reference-counted inner state shared by all clones of this handle.
109 pub(crate) inner: Arc<OlmMachineInner>,
110}
111
/// State shared between all clones of an [`OlmMachine`].
///
/// All fields are populated in `OlmMachine::new_helper`.
112pub struct OlmMachineInner {
    /// Owned copy of the user ID reported by the store at construction time.
113 user_id: OwnedUserId,
    /// Owned copy of the device ID the machine was constructed with.
115 device_id: OwnedDeviceId,
    /// Private cross-signing identity, shared behind an async mutex.
117 user_identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// Store handle; clones of it are handed to the managers below.
122 store: Store,
    /// Receives `/keys/claim` responses and hands out missing-session
    /// requests (see `get_missing_sessions`).
126 session_manager: SessionManager,
    /// Encrypts room events and shares/invalidates outbound room keys.
128 pub(crate) group_session_manager: GroupSessionManager,
    /// Handles verification events and their outgoing messages.
130 verification_machine: VerificationMachine,
    /// Gossip machine handling key/secret requests and forwarded room keys.
133 pub(crate) key_request_machine: GossipMachine,
    /// Tracks users/devices and processes `/keys/query` responses.
136 identity_manager: IdentityManager,
    /// Backup machine, constructed with the optional backup key loaded from
    /// the store.
139 backup_machine: BackupMachine,
141}
142
143#[cfg(not(tarpaulin_include))]
144impl std::fmt::Debug for OlmMachine {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct("OlmMachine")
147 .field("user_id", &self.user_id())
148 .field("device_id", &self.device_id())
149 .finish()
150 }
151}
152
153impl OlmMachine {
/// Store key for a generation counter.
/// NOTE(review): not referenced in the visible part of this file; confirm
/// where it is read and written.
154 const CURRENT_GENERATION_STORE_KEY: &'static str = "generation-counter";
/// Custom-value key that `migration_post_verified_latch_support` checks and
/// sets so that the migration only runs while the key is absent.
155 const HAS_MIGRATED_VERIFICATION_LATCH: &'static str = "HAS_MIGRATED_VERIFICATION_LATCH";
156
157 pub async fn new(user_id: &UserId, device_id: &DeviceId) -> Self {
168 OlmMachine::with_store(user_id, device_id, MemoryStore::new(), None)
169 .await
170 .expect("Reading and writing to the memory store always succeeds")
171 }
172
173 pub(crate) async fn rehydrate(
174 &self,
175 pickle_key: &[u8; 32],
176 device_id: &DeviceId,
177 device_data: Raw<DehydratedDeviceData>,
178 ) -> Result<OlmMachine, DehydrationError> {
179 let account = Account::rehydrate(pickle_key, self.user_id(), device_id, device_data)?;
180 let static_account = account.static_data().clone();
181
182 let store =
183 Arc::new(CryptoStoreWrapper::new(self.user_id(), device_id, MemoryStore::new()));
184 let device = DeviceData::from_account(&account);
185 store.save_pending_changes(PendingChanges { account: Some(account) }).await?;
186 store
187 .save_changes(Changes {
188 devices: DeviceChanges { new: vec![device], ..Default::default() },
189 ..Default::default()
190 })
191 .await?;
192
193 let (verification_machine, store, identity_manager) =
194 Self::new_helper_prelude(store, static_account, self.store().private_identity());
195
196 Ok(Self::new_helper(
197 device_id,
198 store,
199 verification_machine,
200 identity_manager,
201 self.store().private_identity(),
202 None,
203 ))
204 }
205
206 fn new_helper_prelude(
207 store_wrapper: Arc<CryptoStoreWrapper>,
208 account: StaticAccountData,
209 user_identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
210 ) -> (VerificationMachine, Store, IdentityManager) {
211 let verification_machine =
212 VerificationMachine::new(account.clone(), user_identity.clone(), store_wrapper.clone());
213 let store = Store::new(account, user_identity, store_wrapper, verification_machine.clone());
214
215 let identity_manager = IdentityManager::new(store.clone());
216
217 (verification_machine, store, identity_manager)
218 }
219
220 fn new_helper(
221 device_id: &DeviceId,
222 store: Store,
223 verification_machine: VerificationMachine,
224 identity_manager: IdentityManager,
225 user_identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
226 maybe_backup_key: Option<MegolmV1BackupKey>,
227 ) -> Self {
228 let group_session_manager = GroupSessionManager::new(store.clone());
229
230 let users_for_key_claim = Arc::new(StdRwLock::new(BTreeMap::new()));
231 let key_request_machine = GossipMachine::new(
232 store.clone(),
233 identity_manager.clone(),
234 group_session_manager.session_cache(),
235 users_for_key_claim.clone(),
236 );
237
238 let session_manager =
239 SessionManager::new(users_for_key_claim, key_request_machine.clone(), store.clone());
240
241 let backup_machine = BackupMachine::new(store.clone(), maybe_backup_key);
242
243 let inner = Arc::new(OlmMachineInner {
244 user_id: store.user_id().to_owned(),
245 device_id: device_id.to_owned(),
246 user_identity,
247 store,
248 session_manager,
249 group_session_manager,
250 verification_machine,
251 key_request_machine,
252 identity_manager,
253 backup_machine,
254 });
255
256 Self { inner }
257 }
258
259 #[instrument(skip(store, custom_account), fields(ed25519_key, curve25519_key))]
287 pub async fn with_store(
288 user_id: &UserId,
289 device_id: &DeviceId,
290 store: impl IntoCryptoStore,
291 custom_account: Option<vodozemac::olm::Account>,
292 ) -> StoreResult<Self> {
293 let store = store.into_crypto_store();
294
295 let static_account = match store.load_account().await? {
296 Some(account) => {
297 if user_id != account.user_id()
298 || device_id != account.device_id()
299 || custom_account.is_some()
300 {
301 return Err(CryptoStoreError::MismatchedAccount {
302 expected: (account.user_id().to_owned(), account.device_id().to_owned()),
303 got: (user_id.to_owned(), device_id.to_owned()),
304 });
305 }
306
307 Span::current()
308 .record("ed25519_key", display(account.identity_keys().ed25519))
309 .record("curve25519_key", display(account.identity_keys().curve25519));
310 debug!("Restored an Olm account");
311
312 account.static_data().clone()
313 }
314
315 None => {
316 let account = if let Some(account) = custom_account {
317 Account::new_helper(account, user_id, device_id)
318 } else {
319 Account::with_device_id(user_id, device_id)
320 };
321
322 let static_account = account.static_data().clone();
323
324 Span::current()
325 .record("ed25519_key", display(account.identity_keys().ed25519))
326 .record("curve25519_key", display(account.identity_keys().curve25519));
327
328 let device = DeviceData::from_account(&account);
329
330 device.set_trust_state(LocalTrust::Verified);
334
335 let changes = Changes {
336 devices: DeviceChanges { new: vec![device], ..Default::default() },
337 ..Default::default()
338 };
339 store.save_changes(changes).await?;
340 store.save_pending_changes(PendingChanges { account: Some(account) }).await?;
341
342 debug!("Created a new Olm account");
343
344 static_account
345 }
346 };
347
348 let identity = match store.load_identity().await? {
349 Some(i) => {
350 let master_key = i
351 .master_public_key()
352 .await
353 .and_then(|m| m.get_first_key().map(|m| m.to_owned()));
354 debug!(?master_key, "Restored the cross signing identity");
355 i
356 }
357 None => {
358 debug!("Creating an empty cross signing identity stub");
359 PrivateCrossSigningIdentity::empty(user_id)
360 }
361 };
362
363 let saved_keys = store.load_backup_keys().await?;
368 let maybe_backup_key = saved_keys.decryption_key.and_then(|k| {
369 if let Some(version) = saved_keys.backup_version {
370 let megolm_v1_backup_key = k.megolm_v1_public_key();
371 megolm_v1_backup_key.set_version(version);
372 Some(megolm_v1_backup_key)
373 } else {
374 None
375 }
376 });
377
378 let identity = Arc::new(Mutex::new(identity));
379 let store = Arc::new(CryptoStoreWrapper::new(user_id, device_id, store));
380
381 let (verification_machine, store, identity_manager) =
382 Self::new_helper_prelude(store, static_account, identity.clone());
383
384 Self::migration_post_verified_latch_support(&store, &identity_manager).await?;
387
388 Ok(Self::new_helper(
389 device_id,
390 store,
391 verification_machine,
392 identity_manager,
393 identity,
394 maybe_backup_key,
395 ))
396 }
397
398 pub(crate) async fn migration_post_verified_latch_support(
406 store: &Store,
407 identity_manager: &IdentityManager,
408 ) -> Result<(), CryptoStoreError> {
409 let maybe_migrate_for_identity_verified_latch =
410 store.get_custom_value(Self::HAS_MIGRATED_VERIFICATION_LATCH).await?.is_none();
411
412 if maybe_migrate_for_identity_verified_latch {
413 identity_manager.mark_all_tracked_users_as_dirty(store.cache().await?).await?;
414
415 store.set_custom_value(Self::HAS_MIGRATED_VERIFICATION_LATCH, vec![0]).await?
416 }
417 Ok(())
418 }
419
420 pub fn store(&self) -> &Store {
422 &self.inner.store
423 }
424
425 pub fn user_id(&self) -> &UserId {
427 &self.inner.user_id
428 }
429
430 pub fn device_id(&self) -> &DeviceId {
432 &self.inner.device_id
433 }
434
435 pub fn device_creation_time(&self) -> MilliSecondsSinceUnixEpoch {
442 self.inner.store.static_account().creation_local_time()
443 }
444
445 pub fn identity_keys(&self) -> IdentityKeys {
447 let account = self.inner.store.static_account();
448 account.identity_keys()
449 }
450
451 pub async fn display_name(&self) -> StoreResult<Option<String>> {
453 self.store().device_display_name().await
454 }
455
456 pub async fn tracked_users(&self) -> StoreResult<HashSet<OwnedUserId>> {
461 let cache = self.store().cache().await?;
462 Ok(self.inner.identity_manager.key_query_manager.synced(&cache).await?.tracked_users())
463 }
464
/// Forwards the `enable` flag for room key requests to the gossip machine.
///
/// Only available with the `automatic-room-key-forwarding` feature.
#[cfg(feature = "automatic-room-key-forwarding")]
pub fn set_room_key_requests_enabled(&self, enable: bool) {
    let gossip = &self.inner.key_request_machine;
    gossip.set_room_key_requests_enabled(enable)
}
477
478 pub fn are_room_key_requests_enabled(&self) -> bool {
483 self.inner.key_request_machine.are_room_key_requests_enabled()
484 }
485
/// Forwards the `enable` flag for room key forwarding to the gossip
/// machine.
///
/// Only available with the `automatic-room-key-forwarding` feature.
#[cfg(feature = "automatic-room-key-forwarding")]
pub fn set_room_key_forwarding_enabled(&self, enable: bool) {
    let gossip = &self.inner.key_request_machine;
    gossip.set_room_key_forwarding_enabled(enable)
}
498
499 pub fn is_room_key_forwarding_enabled(&self) -> bool {
503 self.inner.key_request_machine.is_room_key_forwarding_enabled()
504 }
505
506 pub async fn outgoing_requests(&self) -> StoreResult<Vec<OutgoingRequest>> {
514 let mut requests = Vec::new();
515
516 {
517 let store_cache = self.inner.store.cache().await?;
518 let account = store_cache.account().await?;
519 if let Some(r) = self.keys_for_upload(&account).await.map(|r| OutgoingRequest {
520 request_id: TransactionId::new(),
521 request: Arc::new(r.into()),
522 }) {
523 requests.push(r);
524 }
525 }
526
527 for request in self
528 .inner
529 .identity_manager
530 .users_for_key_query()
531 .await?
532 .into_iter()
533 .map(|(request_id, r)| OutgoingRequest { request_id, request: Arc::new(r.into()) })
534 {
535 requests.push(request);
536 }
537
538 requests.append(&mut self.inner.verification_machine.outgoing_messages());
539 requests.append(&mut self.inner.key_request_machine.outgoing_to_device_requests().await?);
540
541 Ok(requests)
542 }
543
544 pub fn query_keys_for_users<'a>(
565 &self,
566 users: impl IntoIterator<Item = &'a UserId>,
567 ) -> (OwnedTransactionId, KeysQueryRequest) {
568 self.inner.identity_manager.build_key_query_for_users(users)
569 }
570
571 pub async fn mark_request_as_sent<'a>(
581 &self,
582 request_id: &TransactionId,
583 response: impl Into<AnyIncomingResponse<'a>>,
584 ) -> OlmResult<()> {
585 match response.into() {
586 AnyIncomingResponse::KeysUpload(response) => {
587 Box::pin(self.receive_keys_upload_response(response)).await?;
588 }
589 AnyIncomingResponse::KeysQuery(response) => {
590 Box::pin(self.receive_keys_query_response(request_id, response)).await?;
591 }
592 AnyIncomingResponse::KeysClaim(response) => {
593 Box::pin(
594 self.inner.session_manager.receive_keys_claim_response(request_id, response),
595 )
596 .await?;
597 }
598 AnyIncomingResponse::ToDevice(_) => {
599 Box::pin(self.mark_to_device_request_as_sent(request_id)).await?;
600 }
601 AnyIncomingResponse::SigningKeysUpload(_) => {
602 Box::pin(self.receive_cross_signing_upload_response()).await?;
603 }
604 AnyIncomingResponse::SignatureUpload(_) => {
605 self.inner.verification_machine.mark_request_as_sent(request_id);
606 }
607 AnyIncomingResponse::RoomMessage(_) => {
608 self.inner.verification_machine.mark_request_as_sent(request_id);
609 }
610 AnyIncomingResponse::KeysBackup(_) => {
611 Box::pin(self.inner.backup_machine.mark_request_as_sent(request_id)).await?;
612 }
613 };
614
615 Ok(())
616 }
617
618 async fn receive_cross_signing_upload_response(&self) -> StoreResult<()> {
620 let identity = self.inner.user_identity.lock().await;
621 identity.mark_as_shared();
622
623 let changes = Changes { private_identity: Some(identity.clone()), ..Default::default() };
624
625 self.store().save_changes(changes).await
626 }
627
628 pub async fn bootstrap_cross_signing(
647 &self,
648 reset: bool,
649 ) -> StoreResult<CrossSigningBootstrapRequests> {
650 let identity = self.inner.user_identity.lock().await.clone();
655
656 let (upload_signing_keys_req, upload_signatures_req) = if reset || identity.is_empty().await
657 {
658 info!("Creating new cross signing identity");
659
660 let (identity, upload_signing_keys_req, upload_signatures_req) = {
661 let cache = self.inner.store.cache().await?;
662 let account = cache.account().await?;
663 account.bootstrap_cross_signing().await
664 };
665
666 let public = identity.to_public_identity().await.expect(
667 "Couldn't create a public version of the identity from a new private identity",
668 );
669
670 *self.inner.user_identity.lock().await = identity.clone();
671
672 self.store()
673 .save_changes(Changes {
674 identities: IdentityChanges { new: vec![public.into()], ..Default::default() },
675 private_identity: Some(identity),
676 ..Default::default()
677 })
678 .await?;
679
680 (upload_signing_keys_req, upload_signatures_req)
681 } else {
682 info!("Trying to upload the existing cross signing identity");
683 let upload_signing_keys_req = identity.as_upload_request().await;
684
685 let upload_signatures_req = identity
687 .sign_account(self.inner.store.static_account())
688 .await
689 .expect("Can't sign device keys");
690
691 (upload_signing_keys_req, upload_signatures_req)
692 };
693
694 let upload_keys_req =
698 self.upload_device_keys().await?.map(|(_, request)| OutgoingRequest::from(request));
699
700 Ok(CrossSigningBootstrapRequests {
701 upload_signing_keys_req,
702 upload_keys_req,
703 upload_signatures_req,
704 })
705 }
706
707 pub async fn upload_device_keys(
719 &self,
720 ) -> StoreResult<Option<(OwnedTransactionId, UploadKeysRequest)>> {
721 let cache = self.store().cache().await?;
722 let account = cache.account().await?;
723
724 Ok(self.keys_for_upload(&account).await.map(|request| (TransactionId::new(), request)))
725 }
726
727 async fn receive_keys_upload_response(&self, response: &UploadKeysResponse) -> OlmResult<()> {
734 self.inner
735 .store
736 .with_transaction(|mut tr| async {
737 let account = tr.account().await?;
738 account.receive_keys_upload_response(response)?;
739 Ok((tr, ()))
740 })
741 .await
742 }
743
744 #[instrument(skip_all)]
772 pub async fn get_missing_sessions(
773 &self,
774 users: impl Iterator<Item = &UserId>,
775 ) -> StoreResult<Option<(OwnedTransactionId, KeysClaimRequest)>> {
776 self.inner.session_manager.get_missing_sessions(users).await
777 }
778
779 async fn receive_keys_query_response(
788 &self,
789 request_id: &TransactionId,
790 response: &KeysQueryResponse,
791 ) -> OlmResult<(DeviceChanges, IdentityChanges)> {
792 self.inner.identity_manager.receive_keys_query_response(request_id, response).await
793 }
794
795 async fn keys_for_upload(&self, account: &Account) -> Option<UploadKeysRequest> {
804 let (mut device_keys, one_time_keys, fallback_keys) = account.keys_for_upload();
805
806 if let Some(device_keys) = &mut device_keys {
816 let private_identity = self.store().private_identity();
817 let guard = private_identity.lock().await;
818
819 if guard.status().await.is_complete() {
820 guard.sign_device_keys(device_keys).await.expect(
821 "We should be able to sign our device keys since we confirmed that we \
822 have a complete set of private cross-signing keys",
823 );
824 }
825 }
826
827 if device_keys.is_none() && one_time_keys.is_empty() && fallback_keys.is_empty() {
828 None
829 } else {
830 let device_keys = device_keys.map(|d| d.to_raw());
831
832 Some(assign!(UploadKeysRequest::new(), {
833 device_keys, one_time_keys, fallback_keys
834 }))
835 }
836 }
837
838 async fn decrypt_to_device_event(
847 &self,
848 transaction: &mut StoreTransaction,
849 event: &EncryptedToDeviceEvent,
850 changes: &mut Changes,
851 ) -> OlmResult<OlmDecryptionInfo> {
852 let mut decrypted =
853 transaction.account().await?.decrypt_to_device_event(&self.inner.store, event).await?;
854
855 if !self.to_device_event_is_from_dehydrated_device(&decrypted, &event.sender).await? {
858 self.handle_decrypted_to_device_event(transaction.cache(), &mut decrypted, changes)
861 .await?;
862 }
863
864 Ok(decrypted)
865 }
866
867 #[instrument(
868 skip_all,
869 fields(room_id = ? content.room_id, session_id)
873 )]
874 async fn handle_key(
875 &self,
876 sender_key: Curve25519PublicKey,
877 event: &DecryptedRoomKeyEvent,
878 content: &MegolmV1AesSha2Content,
879 ) -> OlmResult<Option<InboundGroupSession>> {
880 let session = InboundGroupSession::new(
881 sender_key,
882 event.keys.ed25519,
883 &content.room_id,
884 &content.session_key,
885 SenderData::unknown(),
886 event.content.algorithm(),
887 None,
888 );
889
890 match session {
891 Ok(mut session) => {
892 Span::current().record("session_id", session.session_id());
893
894 let sender_data =
895 SenderDataFinder::find_using_event(self.store(), sender_key, event, &session)
896 .await?;
897
898 session.sender_data = sender_data;
899
900 match self.store().compare_group_session(&session).await? {
901 SessionOrdering::Better => {
902 info!("Received a new megolm room key");
903
904 Ok(Some(session))
905 }
906 comparison_result => {
907 warn!(
908 ?comparison_result,
909 "Received a megolm room key that we already have a better version \
910 of, discarding"
911 );
912
913 Ok(None)
914 }
915 }
916 }
917 Err(e) => {
918 Span::current().record("session_id", &content.session_id);
919 warn!("Received a room key event which contained an invalid session key: {e}");
920
921 Ok(None)
922 }
923 }
924 }
925
926 #[instrument(skip_all, fields(algorithm = ?event.content.algorithm()))]
928 async fn add_room_key(
929 &self,
930 sender_key: Curve25519PublicKey,
931 event: &DecryptedRoomKeyEvent,
932 ) -> OlmResult<Option<InboundGroupSession>> {
933 match &event.content {
934 RoomKeyContent::MegolmV1AesSha2(content) => {
935 self.handle_key(sender_key, event, content).await
936 }
937 #[cfg(feature = "experimental-algorithms")]
938 RoomKeyContent::MegolmV2AesSha2(content) => {
939 self.handle_key(sender_key, event, content).await
940 }
941 RoomKeyContent::Unknown(_) => {
942 warn!("Received a room key with an unsupported algorithm");
943 Ok(None)
944 }
945 }
946 }
947
948 fn add_withheld_info(&self, changes: &mut Changes, event: &RoomKeyWithheldEvent) {
949 debug!(?event.content, "Processing `m.room_key.withheld` event");
950
951 if let RoomKeyWithheldContent::MegolmV1AesSha2(
952 MegolmV1AesSha2WithheldContent::BlackListed(c)
953 | MegolmV1AesSha2WithheldContent::Unverified(c),
954 ) = &event.content
955 {
956 changes
957 .withheld_session_info
958 .entry(c.room_id.to_owned())
959 .or_default()
960 .insert(c.session_id.to_owned(), event.to_owned());
961 }
962 }
963
/// Test helper: creates an outbound group session for `room_id` with
/// default encryption settings and unknown sender data, then saves the
/// accompanying inbound session to the store.
#[cfg(test)]
pub(crate) async fn create_outbound_group_session_with_defaults_test_helper(
    &self,
    room_id: &RoomId,
) -> OlmResult<()> {
    let manager = &self.inner.group_session_manager;

    let (_, inbound) = manager
        .create_outbound_group_session(
            room_id,
            EncryptionSettings::default(),
            SenderData::unknown(),
        )
        .await?;

    self.store().save_inbound_group_sessions(&[inbound]).await?;

    Ok(())
}
983
/// Test helper: creates an outbound group session for `room_id` with
/// default encryption settings and unknown sender data, and returns the
/// accompanying inbound session without saving it.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) async fn create_inbound_session_test_helper(
    &self,
    room_id: &RoomId,
) -> OlmResult<InboundGroupSession> {
    let manager = &self.inner.group_session_manager;

    let (_, inbound) = manager
        .create_outbound_group_session(
            room_id,
            EncryptionSettings::default(),
            SenderData::unknown(),
        )
        .await?;

    Ok(inbound)
}
1002
1003 pub async fn encrypt_room_event(
1020 &self,
1021 room_id: &RoomId,
1022 content: impl MessageLikeEventContent,
1023 ) -> MegolmResult<Raw<RoomEncryptedEventContent>> {
1024 let event_type = content.event_type().to_string();
1025 let content = Raw::new(&content)?.cast();
1026 self.encrypt_room_event_raw(room_id, &event_type, &content).await
1027 }
1028
1029 pub async fn encrypt_room_event_raw(
1049 &self,
1050 room_id: &RoomId,
1051 event_type: &str,
1052 content: &Raw<AnyMessageLikeEventContent>,
1053 ) -> MegolmResult<Raw<RoomEncryptedEventContent>> {
1054 self.inner.group_session_manager.encrypt(room_id, event_type, content).await
1055 }
1056
1057 pub async fn discard_room_key(&self, room_id: &RoomId) -> StoreResult<bool> {
1068 self.inner.group_session_manager.invalidate_group_session(room_id).await
1069 }
1070
1071 pub async fn share_room_key(
1091 &self,
1092 room_id: &RoomId,
1093 users: impl Iterator<Item = &UserId>,
1094 encryption_settings: impl Into<EncryptionSettings>,
1095 ) -> OlmResult<Vec<Arc<ToDeviceRequest>>> {
1096 self.inner.group_session_manager.share_room_key(room_id, users, encryption_settings).await
1097 }
1098
1099 #[deprecated(note = "Use OlmMachine::receive_verification_event instead", since = "0.7.0")]
1107 pub async fn receive_unencrypted_verification_event(
1108 &self,
1109 event: &AnyMessageLikeEvent,
1110 ) -> StoreResult<()> {
1111 self.inner.verification_machine.receive_any_event(event).await
1112 }
1113
1114 pub async fn receive_verification_event(&self, event: &AnyMessageLikeEvent) -> StoreResult<()> {
1119 self.inner.verification_machine.receive_any_event(event).await
1120 }
1121
1122 #[instrument(
1128 skip_all,
1129 fields(
1130 sender_key = ?decrypted.result.sender_key,
1131 event_type = decrypted.result.event.event_type(),
1132 ),
1133 )]
1134 async fn handle_decrypted_to_device_event(
1135 &self,
1136 cache: &StoreCache,
1137 decrypted: &mut OlmDecryptionInfo,
1138 changes: &mut Changes,
1139 ) -> OlmResult<()> {
1140 debug!(
1141 sender_device_keys =
1142 ?decrypted.result.event.sender_device_keys().map(|k| (k.curve25519_key(), k.ed25519_key())).unwrap_or((None, None)),
1143 "Received a decrypted to-device event",
1144 );
1145
1146 match &*decrypted.result.event {
1147 AnyDecryptedOlmEvent::RoomKey(e) => {
1148 let session = self.add_room_key(decrypted.result.sender_key, e).await?;
1149 decrypted.inbound_group_session = session;
1150 }
1151 AnyDecryptedOlmEvent::ForwardedRoomKey(e) => {
1152 let session = self
1153 .inner
1154 .key_request_machine
1155 .receive_forwarded_room_key(decrypted.result.sender_key, e)
1156 .await?;
1157 decrypted.inbound_group_session = session;
1158 }
1159 AnyDecryptedOlmEvent::SecretSend(e) => {
1160 let name = self
1161 .inner
1162 .key_request_machine
1163 .receive_secret_event(cache, decrypted.result.sender_key, e, changes)
1164 .await?;
1165
1166 if let Ok(ToDeviceEvents::SecretSend(mut e)) =
1169 decrypted.result.raw_event.deserialize_as()
1170 {
1171 e.content.secret_name = name;
1172 decrypted.result.raw_event = Raw::from_json(to_raw_value(&e)?);
1173 }
1174 }
1175 AnyDecryptedOlmEvent::Dummy(_) => {
1176 debug!("Received an `m.dummy` event");
1177 }
1178 AnyDecryptedOlmEvent::Custom(_) => {
1179 warn!("Received an unexpected encrypted to-device event");
1180 }
1181 }
1182
1183 Ok(())
1184 }
1185
1186 async fn handle_verification_event(&self, event: &ToDeviceEvents) {
1187 if let Err(e) = self.inner.verification_machine.receive_any_event(event).await {
1188 error!("Error handling a verification event: {e:?}");
1189 }
1190 }
1191
1192 async fn mark_to_device_request_as_sent(&self, request_id: &TransactionId) -> StoreResult<()> {
1194 self.inner.verification_machine.mark_request_as_sent(request_id);
1195 self.inner.key_request_machine.mark_outgoing_request_as_sent(request_id).await?;
1196 self.inner.group_session_manager.mark_request_as_sent(request_id).await?;
1197 self.inner.session_manager.mark_outgoing_request_as_sent(request_id);
1198 Ok(())
1199 }
1200
1201 pub fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1203 self.inner.verification_machine.get_verification(user_id, flow_id)
1204 }
1205
1206 pub fn get_verification_request(
1208 &self,
1209 user_id: &UserId,
1210 flow_id: impl AsRef<str>,
1211 ) -> Option<VerificationRequest> {
1212 self.inner.verification_machine.get_request(user_id, flow_id)
1213 }
1214
1215 pub fn get_verification_requests(&self, user_id: &UserId) -> Vec<VerificationRequest> {
1217 self.inner.verification_machine.get_requests(user_id)
1218 }
1219
1220 async fn handle_to_device_event(&self, changes: &mut Changes, event: &ToDeviceEvents) {
1221 use crate::types::events::ToDeviceEvents::*;
1222
1223 match event {
1224 RoomKeyRequest(e) => self.inner.key_request_machine.receive_incoming_key_request(e),
1225 SecretRequest(e) => self.inner.key_request_machine.receive_incoming_secret_request(e),
1226 RoomKeyWithheld(e) => self.add_withheld_info(changes, e),
1227 KeyVerificationAccept(..)
1228 | KeyVerificationCancel(..)
1229 | KeyVerificationKey(..)
1230 | KeyVerificationMac(..)
1231 | KeyVerificationRequest(..)
1232 | KeyVerificationReady(..)
1233 | KeyVerificationDone(..)
1234 | KeyVerificationStart(..) => {
1235 self.handle_verification_event(event).await;
1236 }
1237 Dummy(_) | RoomKey(_) | ForwardedRoomKey(_) | RoomEncrypted(_) => {}
1238 _ => {}
1239 }
1240 }
1241
1242 fn record_message_id(event: &Raw<AnyToDeviceEvent>) {
1243 use serde::Deserialize;
1244
1245 #[derive(Deserialize)]
1246 struct ContentStub<'a> {
1247 #[serde(borrow, rename = "org.matrix.msgid")]
1248 message_id: Option<&'a str>,
1249 }
1250
1251 #[derive(Deserialize)]
1252 struct ToDeviceStub<'a> {
1253 sender: &'a str,
1254 #[serde(rename = "type")]
1255 event_type: &'a str,
1256 #[serde(borrow)]
1257 content: ContentStub<'a>,
1258 }
1259
1260 if let Ok(event) = event.deserialize_as::<ToDeviceStub<'_>>() {
1261 Span::current().record("sender", event.sender);
1262 Span::current().record("event_type", event.event_type);
1263 Span::current().record("message_id", event.content.message_id);
1264 }
1265 }
1266
1267 #[instrument(skip_all, fields(sender, event_type, message_id))]
1275 async fn receive_to_device_event(
1276 &self,
1277 transaction: &mut StoreTransaction,
1278 changes: &mut Changes,
1279 mut raw_event: Raw<AnyToDeviceEvent>,
1280 ) -> Option<Raw<AnyToDeviceEvent>> {
1281 Self::record_message_id(&raw_event);
1282
1283 let event: ToDeviceEvents = match raw_event.deserialize_as() {
1284 Ok(e) => e,
1285 Err(e) => {
1286 warn!("Received an invalid to-device event: {e}");
1288
1289 return Some(raw_event);
1290 }
1291 };
1292
1293 debug!("Received a to-device event");
1294
1295 match event {
1296 ToDeviceEvents::RoomEncrypted(e) => {
1297 let decrypted = match self.decrypt_to_device_event(transaction, &e, changes).await {
1298 Ok(e) => e,
1299 Err(err) => {
1300 if let OlmError::SessionWedged(sender, curve_key) = err {
1301 if let Err(e) = self
1302 .inner
1303 .session_manager
1304 .mark_device_as_wedged(&sender, curve_key)
1305 .await
1306 {
1307 error!(
1308 error = ?e,
1309 "Couldn't mark device from to be unwedged",
1310 );
1311 }
1312 }
1313
1314 return Some(raw_event);
1315 }
1316 };
1317
1318 match self.to_device_event_is_from_dehydrated_device(&decrypted, &e.sender).await {
1321 Ok(true) => {
1322 warn!(
1323 sender = ?e.sender,
1324 session = ?decrypted.session,
1325 "Received a to-device event from a dehydrated device. This is unexpected: ignoring event"
1326 );
1327 return None;
1328 }
1329 Ok(false) => {}
1330 Err(err) => {
1331 error!(
1332 error = ?err,
1333 "Couldn't check whether event is from dehydrated device",
1334 );
1335 }
1336 }
1337
1338 match decrypted.session {
1341 SessionType::New(s) | SessionType::Existing(s) => {
1342 changes.sessions.push(s);
1343 }
1344 }
1345
1346 changes.message_hashes.push(decrypted.message_hash);
1347
1348 if let Some(group_session) = decrypted.inbound_group_session {
1349 changes.inbound_group_sessions.push(group_session);
1350 }
1351
1352 match decrypted.result.raw_event.deserialize_as() {
1353 Ok(event) => {
1354 self.handle_to_device_event(changes, &event).await;
1355
1356 raw_event = event
1357 .serialize_zeroized()
1358 .expect("Zeroizing and reserializing our events should always work")
1359 .cast();
1360 }
1361 Err(e) => {
1362 warn!("Received an invalid encrypted to-device event: {e}");
1363 raw_event = decrypted.result.raw_event;
1364 }
1365 }
1366 }
1367
1368 e => self.handle_to_device_event(changes, &e).await,
1369 }
1370
1371 Some(raw_event)
1372 }
1373
1374 async fn to_device_event_is_from_dehydrated_device(
1380 &self,
1381 decrypted: &OlmDecryptionInfo,
1382 sender_user_id: &UserId,
1383 ) -> OlmResult<bool> {
1384 if let Some(device_keys) = decrypted.result.event.sender_device_keys() {
1386 if device_keys.dehydrated.unwrap_or(false) {
1392 return Ok(true);
1393 }
1394 }
1399
1400 Ok(self
1402 .store()
1403 .get_device_from_curve_key(sender_user_id, decrypted.result.sender_key)
1404 .await?
1405 .is_some_and(|d| d.is_dehydrated()))
1406 }
1407
1408 #[instrument(skip_all)]
1426 pub async fn receive_sync_changes(
1427 &self,
1428 sync_changes: EncryptionSyncChanges<'_>,
1429 ) -> OlmResult<(Vec<Raw<AnyToDeviceEvent>>, Vec<RoomKeyInfo>)> {
1430 let mut store_transaction = self.inner.store.transaction().await;
1431
1432 let (events, changes) =
1433 self.preprocess_sync_changes(&mut store_transaction, sync_changes).await?;
1434
1435 let room_key_updates: Vec<_> =
1438 changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
1439
1440 self.store().save_changes(changes).await?;
1441 store_transaction.commit().await?;
1442
1443 Ok((events, room_key_updates))
1444 }
1445
1446 pub(crate) async fn preprocess_sync_changes(
1455 &self,
1456 transaction: &mut StoreTransaction,
1457 sync_changes: EncryptionSyncChanges<'_>,
1458 ) -> OlmResult<(Vec<Raw<AnyToDeviceEvent>>, Changes)> {
1459 let mut events = self.inner.verification_machine.garbage_collect();
1461
1462 let mut changes = Default::default();
1465
1466 {
1467 let account = transaction.account().await?;
1468 account.update_key_counts(
1469 sync_changes.one_time_keys_counts,
1470 sync_changes.unused_fallback_keys,
1471 )
1472 }
1473
1474 if let Err(e) = self
1475 .inner
1476 .identity_manager
1477 .receive_device_changes(
1478 transaction.cache(),
1479 sync_changes.changed_devices.changed.iter().map(|u| u.as_ref()),
1480 )
1481 .await
1482 {
1483 error!(error = ?e, "Error marking a tracked user as changed");
1484 }
1485
1486 for raw_event in sync_changes.to_device_events {
1487 let raw_event =
1488 Box::pin(self.receive_to_device_event(transaction, &mut changes, raw_event)).await;
1489
1490 if let Some(raw_event) = raw_event {
1491 events.push(raw_event);
1492 }
1493 }
1494
1495 let changed_sessions = self
1496 .inner
1497 .key_request_machine
1498 .collect_incoming_key_requests(transaction.cache())
1499 .await?;
1500
1501 changes.sessions.extend(changed_sessions);
1502 changes.next_batch_token = sync_changes.next_batch_token;
1503
1504 Ok((events, changes))
1505 }
1506
1507 pub async fn request_room_key(
1524 &self,
1525 event: &Raw<EncryptedEvent>,
1526 room_id: &RoomId,
1527 ) -> MegolmResult<(Option<OutgoingRequest>, OutgoingRequest)> {
1528 let event = event.deserialize()?;
1529 self.inner.key_request_machine.request_key(room_id, &event).await
1530 }
1531
1532 async fn get_or_update_verification_state(
1542 &self,
1543 session: &InboundGroupSession,
1544 sender: &UserId,
1545 ) -> MegolmResult<(VerificationState, Option<OwnedDeviceId>)> {
1546 fn should_recalculate_sender_data(sender_data: &SenderData) -> bool {
1557 matches!(
1558 sender_data,
1559 SenderData::UnknownDevice { .. }
1560 | SenderData::DeviceInfo { .. }
1561 | SenderData::VerificationViolation { .. }
1562 )
1563 }
1564
1565 let sender_data = if should_recalculate_sender_data(&session.sender_data) {
1566 let calculated_sender_data = SenderDataFinder::find_using_curve_key(
1568 self.store(),
1569 session.sender_key(),
1570 sender,
1571 session,
1572 )
1573 .await?;
1574
1575 if calculated_sender_data.compare_trust_level(&session.sender_data).is_gt() {
1577 let mut new_session = session.clone();
1579 new_session.sender_data = calculated_sender_data.clone();
1580 self.store().save_inbound_group_sessions(&[new_session]).await?;
1581
1582 calculated_sender_data
1584 } else {
1585 session.sender_data.clone()
1587 }
1588 } else {
1589 session.sender_data.clone()
1590 };
1591
1592 Ok(sender_data_to_verification_state(sender_data, session.has_been_imported()))
1593 }
1594
1595 pub async fn query_missing_secrets_from_other_sessions(&self) -> StoreResult<bool> {
1620 let identity = self.inner.user_identity.lock().await;
1621 let mut secrets = identity.get_missing_secrets().await;
1622
1623 if self.store().load_backup_keys().await?.decryption_key.is_none() {
1624 secrets.push(SecretName::RecoveryKey);
1625 }
1626
1627 if secrets.is_empty() {
1628 debug!("No missing requests to query");
1629 return Ok(false);
1630 }
1631
1632 let secret_requests = GossipMachine::request_missing_secrets(self.user_id(), secrets);
1633
1634 let unsent_request = self.store().get_unsent_secret_requests().await?;
1636 let not_yet_requested = secret_requests
1637 .into_iter()
1638 .filter(|request| !unsent_request.iter().any(|unsent| unsent.info == request.info))
1639 .collect_vec();
1640
1641 if not_yet_requested.is_empty() {
1642 debug!("The missing secrets have already been requested");
1643 Ok(false)
1644 } else {
1645 debug!("Requesting missing secrets");
1646
1647 let changes = Changes { key_requests: not_yet_requested, ..Default::default() };
1648
1649 self.store().save_changes(changes).await?;
1650 Ok(true)
1651 }
1652 }
1653
1654 async fn get_encryption_info(
1660 &self,
1661 session: &InboundGroupSession,
1662 sender: &UserId,
1663 ) -> MegolmResult<EncryptionInfo> {
1664 let (verification_state, device_id) =
1665 self.get_or_update_verification_state(session, sender).await?;
1666
1667 let sender = sender.to_owned();
1668
1669 Ok(EncryptionInfo {
1670 sender,
1671 sender_device: device_id,
1672 algorithm_info: AlgorithmInfo::MegolmV1AesSha2 {
1673 curve25519_key: session.sender_key().to_base64(),
1674 sender_claimed_keys: session
1675 .signing_keys()
1676 .iter()
1677 .map(|(k, v)| (k.to_owned(), v.to_base64()))
1678 .collect(),
1679 },
1680 verification_state,
1681 })
1682 }
1683
1684 async fn get_megolm_encryption_info(
1685 &self,
1686 room_id: &RoomId,
1687 event: &EncryptedEvent,
1688 content: &SupportedEventEncryptionSchemes<'_>,
1689 ) -> MegolmResult<EncryptionInfo> {
1690 let session =
1691 self.get_inbound_group_session_or_error(room_id, content.session_id()).await?;
1692 self.get_encryption_info(&session, &event.sender).await
1693 }
1694
1695 async fn decrypt_megolm_events(
1696 &self,
1697 room_id: &RoomId,
1698 event: &EncryptedEvent,
1699 content: &SupportedEventEncryptionSchemes<'_>,
1700 decryption_settings: &DecryptionSettings,
1701 ) -> MegolmResult<(JsonObject, EncryptionInfo)> {
1702 let session =
1703 self.get_inbound_group_session_or_error(room_id, content.session_id()).await?;
1704
1705 Span::current().record("sender_key", debug(session.sender_key()));
1711
1712 let result = session.decrypt(event).await;
1713 match result {
1714 Ok((decrypted_event, _)) => {
1715 let encryption_info = self.get_encryption_info(&session, &event.sender).await?;
1716
1717 self.check_sender_trust_requirement(
1718 &session,
1719 &encryption_info,
1720 &decryption_settings.sender_device_trust_requirement,
1721 )?;
1722
1723 Ok((decrypted_event, encryption_info))
1724 }
1725 Err(error) => Err(
1726 if let MegolmError::Decryption(DecryptionError::UnknownMessageIndex(_, _)) = error {
1727 let withheld_code = self
1728 .inner
1729 .store
1730 .get_withheld_info(room_id, content.session_id())
1731 .await?
1732 .map(|e| e.content.withheld_code());
1733
1734 if withheld_code.is_some() {
1735 MegolmError::MissingRoomKey(withheld_code)
1737 } else {
1738 error
1739 }
1740 } else {
1741 error
1742 },
1743 ),
1744 }
1745 }
1746
1747 fn check_sender_trust_requirement(
1750 &self,
1751 session: &InboundGroupSession,
1752 encryption_info: &EncryptionInfo,
1753 trust_requirement: &TrustRequirement,
1754 ) -> MegolmResult<()> {
1755 fn encryption_info_to_error(encryption_info: &EncryptionInfo) -> MegolmResult<()> {
1757 let VerificationState::Unverified(verification_level) =
1760 &encryption_info.verification_state
1761 else {
1762 unreachable!("inconsistent verification state");
1763 };
1764 Err(MegolmError::SenderIdentityNotTrusted(verification_level.clone()))
1765 }
1766
1767 match trust_requirement {
1768 TrustRequirement::Untrusted => Ok(()),
1769
1770 TrustRequirement::CrossSignedOrLegacy => match &session.sender_data {
1771 SenderData::VerificationViolation(..) => Err(
1774 MegolmError::SenderIdentityNotTrusted(VerificationLevel::VerificationViolation),
1775 ),
1776 SenderData::SenderUnverified(..) => Ok(()),
1777 SenderData::SenderVerified(..) => Ok(()),
1778 SenderData::DeviceInfo { legacy_session: true, .. } => Ok(()),
1779 SenderData::UnknownDevice { legacy_session: true, .. } => Ok(()),
1780 _ => encryption_info_to_error(encryption_info),
1781 },
1782
1783 TrustRequirement::CrossSigned => match &session.sender_data {
1784 SenderData::VerificationViolation(..) => Err(
1787 MegolmError::SenderIdentityNotTrusted(VerificationLevel::VerificationViolation),
1788 ),
1789 SenderData::SenderUnverified(..) => Ok(()),
1790 SenderData::SenderVerified(..) => Ok(()),
1791 _ => encryption_info_to_error(encryption_info),
1792 },
1793 }
1794 }
1795
1796 async fn get_inbound_group_session_or_error(
1801 &self,
1802 room_id: &RoomId,
1803 session_id: &str,
1804 ) -> MegolmResult<InboundGroupSession> {
1805 match self.store().get_inbound_group_session(room_id, session_id).await? {
1806 Some(session) => Ok(session),
1807 None => {
1808 let withheld_code = self
1809 .inner
1810 .store
1811 .get_withheld_info(room_id, session_id)
1812 .await?
1813 .map(|e| e.content.withheld_code());
1814 Err(MegolmError::MissingRoomKey(withheld_code))
1815 }
1816 }
1817 }
1818
1819 pub async fn try_decrypt_room_event(
1834 &self,
1835 raw_event: &Raw<EncryptedEvent>,
1836 room_id: &RoomId,
1837 decryption_settings: &DecryptionSettings,
1838 ) -> Result<RoomEventDecryptionResult, CryptoStoreError> {
1839 match self.decrypt_room_event_inner(raw_event, room_id, true, decryption_settings).await {
1840 Ok(decrypted) => Ok(RoomEventDecryptionResult::Decrypted(decrypted)),
1841 Err(err) => Ok(RoomEventDecryptionResult::UnableToDecrypt(megolm_error_to_utd_info(
1842 raw_event, err,
1843 )?)),
1844 }
1845 }
1846
1847 pub async fn decrypt_room_event(
1855 &self,
1856 event: &Raw<EncryptedEvent>,
1857 room_id: &RoomId,
1858 decryption_settings: &DecryptionSettings,
1859 ) -> MegolmResult<DecryptedRoomEvent> {
1860 self.decrypt_room_event_inner(event, room_id, true, decryption_settings).await
1861 }
1862
1863 #[instrument(name = "decrypt_room_event", skip_all, fields(?room_id, event_id, origin_server_ts, sender, algorithm, session_id, message_index, sender_key))]
1864 async fn decrypt_room_event_inner(
1865 &self,
1866 event: &Raw<EncryptedEvent>,
1867 room_id: &RoomId,
1868 decrypt_unsigned: bool,
1869 decryption_settings: &DecryptionSettings,
1870 ) -> MegolmResult<DecryptedRoomEvent> {
1871 let event = event.deserialize()?;
1872
1873 Span::current()
1874 .record("sender", debug(&event.sender))
1875 .record("event_id", debug(&event.event_id))
1876 .record(
1877 "origin_server_ts",
1878 timestamp_to_iso8601(event.origin_server_ts)
1879 .unwrap_or_else(|| "<out of range>".to_owned()),
1880 )
1881 .record("algorithm", debug(event.content.algorithm()));
1882
1883 let content: SupportedEventEncryptionSchemes<'_> = match &event.content.scheme {
1884 RoomEventEncryptionScheme::MegolmV1AesSha2(c) => {
1885 Span::current().record("sender_key", debug(c.sender_key));
1886 c.into()
1887 }
1888 #[cfg(feature = "experimental-algorithms")]
1889 RoomEventEncryptionScheme::MegolmV2AesSha2(c) => c.into(),
1890 RoomEventEncryptionScheme::Unknown(_) => {
1891 warn!("Received an encrypted room event with an unsupported algorithm");
1892 return Err(EventError::UnsupportedAlgorithm.into());
1893 }
1894 };
1895
1896 Span::current().record("session_id", content.session_id());
1897 Span::current().record("message_index", content.message_index());
1898
1899 let result =
1900 self.decrypt_megolm_events(room_id, &event, &content, decryption_settings).await;
1901
1902 if let Err(e) = &result {
1903 #[cfg(feature = "automatic-room-key-forwarding")]
1904 match e {
1905 MegolmError::MissingRoomKey(_)
1908 | MegolmError::Decryption(DecryptionError::UnknownMessageIndex(_, _)) => {
1909 self.inner
1910 .key_request_machine
1911 .create_outgoing_key_request(room_id, &event)
1912 .await?;
1913 }
1914 _ => {}
1915 }
1916
1917 warn!("Failed to decrypt a room event: {e}");
1918 }
1919
1920 let (mut decrypted_event, encryption_info) = result?;
1921
1922 let mut unsigned_encryption_info = None;
1923 if decrypt_unsigned {
1924 unsigned_encryption_info = self
1926 .decrypt_unsigned_events(&mut decrypted_event, room_id, decryption_settings)
1927 .await;
1928 }
1929
1930 let event = serde_json::from_value::<Raw<AnyMessageLikeEvent>>(decrypted_event.into())?;
1931
1932 Ok(DecryptedRoomEvent { event, encryption_info, unsigned_encryption_info })
1933 }
1934
1935 async fn decrypt_unsigned_events(
1945 &self,
1946 main_event: &mut JsonObject,
1947 room_id: &RoomId,
1948 decryption_settings: &DecryptionSettings,
1949 ) -> Option<BTreeMap<UnsignedEventLocation, UnsignedDecryptionResult>> {
1950 let unsigned = main_event.get_mut("unsigned")?.as_object_mut()?;
1951 let mut unsigned_encryption_info: Option<
1952 BTreeMap<UnsignedEventLocation, UnsignedDecryptionResult>,
1953 > = None;
1954
1955 let location = UnsignedEventLocation::RelationsReplace;
1957 let replace = location.find_mut(unsigned);
1958 if let Some(decryption_result) =
1959 self.decrypt_unsigned_event(replace, room_id, decryption_settings).await
1960 {
1961 unsigned_encryption_info
1962 .get_or_insert_with(Default::default)
1963 .insert(location, decryption_result);
1964 }
1965
1966 let location = UnsignedEventLocation::RelationsThreadLatestEvent;
1969 let thread_latest_event = location.find_mut(unsigned);
1970 if let Some(decryption_result) =
1971 self.decrypt_unsigned_event(thread_latest_event, room_id, decryption_settings).await
1972 {
1973 unsigned_encryption_info
1974 .get_or_insert_with(Default::default)
1975 .insert(location, decryption_result);
1976 }
1977
1978 unsigned_encryption_info
1979 }
1980
1981 fn decrypt_unsigned_event<'a>(
1989 &'a self,
1990 event: Option<&'a mut Value>,
1991 room_id: &'a RoomId,
1992 decryption_settings: &'a DecryptionSettings,
1993 ) -> BoxFuture<'a, Option<UnsignedDecryptionResult>> {
1994 Box::pin(async move {
1995 let event = event?;
1996
1997 let is_encrypted = event
1998 .get("type")
1999 .and_then(|type_| type_.as_str())
2000 .is_some_and(|s| s == "m.room.encrypted");
2001 if !is_encrypted {
2002 return None;
2003 }
2004
2005 let raw_event = serde_json::from_value(event.clone()).ok()?;
2006 match self
2007 .decrypt_room_event_inner(&raw_event, room_id, false, decryption_settings)
2008 .await
2009 {
2010 Ok(decrypted_event) => {
2011 *event = serde_json::to_value(decrypted_event.event).ok()?;
2013 Some(UnsignedDecryptionResult::Decrypted(decrypted_event.encryption_info))
2014 }
2015 Err(err) => {
2016 let utd_info = megolm_error_to_utd_info(&raw_event, err).ok()?;
2021 Some(UnsignedDecryptionResult::UnableToDecrypt(utd_info))
2022 }
2023 }
2024 })
2025 }
2026
2027 pub async fn is_room_key_available(
2034 &self,
2035 event: &Raw<EncryptedEvent>,
2036 room_id: &RoomId,
2037 ) -> Result<bool, CryptoStoreError> {
2038 let event = event.deserialize()?;
2039
2040 let (session_id, message_index) = match &event.content.scheme {
2041 RoomEventEncryptionScheme::MegolmV1AesSha2(c) => {
2042 (&c.session_id, c.ciphertext.message_index())
2043 }
2044 #[cfg(feature = "experimental-algorithms")]
2045 RoomEventEncryptionScheme::MegolmV2AesSha2(c) => {
2046 (&c.session_id, c.ciphertext.message_index())
2047 }
2048 RoomEventEncryptionScheme::Unknown(_) => {
2049 return Ok(false);
2051 }
2052 };
2053
2054 Ok(self
2057 .store()
2058 .get_inbound_group_session(room_id, session_id)
2059 .await?
2060 .filter(|s| s.first_known_index() <= message_index)
2061 .is_some())
2062 }
2063
2064 pub async fn get_room_event_encryption_info(
2077 &self,
2078 event: &Raw<EncryptedEvent>,
2079 room_id: &RoomId,
2080 ) -> MegolmResult<EncryptionInfo> {
2081 let event = event.deserialize()?;
2082
2083 let content: SupportedEventEncryptionSchemes<'_> = match &event.content.scheme {
2084 RoomEventEncryptionScheme::MegolmV1AesSha2(c) => c.into(),
2085 #[cfg(feature = "experimental-algorithms")]
2086 RoomEventEncryptionScheme::MegolmV2AesSha2(c) => c.into(),
2087 RoomEventEncryptionScheme::Unknown(_) => {
2088 return Err(EventError::UnsupportedAlgorithm.into());
2089 }
2090 };
2091
2092 self.get_megolm_encryption_info(room_id, &event, &content).await
2093 }
2094
2095 pub async fn update_tracked_users(
2113 &self,
2114 users: impl IntoIterator<Item = &UserId>,
2115 ) -> StoreResult<()> {
2116 self.inner.identity_manager.update_tracked_users(users).await
2117 }
2118
2119 pub async fn mark_all_tracked_users_as_dirty(&self) -> StoreResult<()> {
2124 self.inner
2125 .identity_manager
2126 .mark_all_tracked_users_as_dirty(self.inner.store.cache().await?)
2127 .await
2128 }
2129
2130 async fn wait_if_user_pending(
2131 &self,
2132 user_id: &UserId,
2133 timeout: Option<Duration>,
2134 ) -> StoreResult<()> {
2135 if let Some(timeout) = timeout {
2136 let cache = self.store().cache().await?;
2137 self.inner
2138 .identity_manager
2139 .key_query_manager
2140 .wait_if_user_key_query_pending(cache, timeout, user_id)
2141 .await?;
2142 }
2143 Ok(())
2144 }
2145
2146 #[instrument(skip(self))]
2176 pub async fn get_device(
2177 &self,
2178 user_id: &UserId,
2179 device_id: &DeviceId,
2180 timeout: Option<Duration>,
2181 ) -> StoreResult<Option<Device>> {
2182 self.wait_if_user_pending(user_id, timeout).await?;
2183 self.store().get_device(user_id, device_id).await
2184 }
2185
2186 #[instrument(skip(self))]
2200 pub async fn get_identity(
2201 &self,
2202 user_id: &UserId,
2203 timeout: Option<Duration>,
2204 ) -> StoreResult<Option<UserIdentity>> {
2205 self.wait_if_user_pending(user_id, timeout).await?;
2206 self.store().get_identity(user_id).await
2207 }
2208
2209 #[instrument(skip(self))]
2236 pub async fn get_user_devices(
2237 &self,
2238 user_id: &UserId,
2239 timeout: Option<Duration>,
2240 ) -> StoreResult<UserDevices> {
2241 self.wait_if_user_pending(user_id, timeout).await?;
2242 self.store().get_user_devices(user_id).await
2243 }
2244
2245 pub async fn cross_signing_status(&self) -> CrossSigningStatus {
2250 self.inner.user_identity.lock().await.status().await
2251 }
2252
2253 pub async fn export_cross_signing_keys(&self) -> StoreResult<Option<CrossSigningKeyExport>> {
2261 let master_key = self.store().export_secret(&SecretName::CrossSigningMasterKey).await?;
2262 let self_signing_key =
2263 self.store().export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
2264 let user_signing_key =
2265 self.store().export_secret(&SecretName::CrossSigningUserSigningKey).await?;
2266
2267 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
2268 None
2269 } else {
2270 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
2271 })
2272 }
2273
2274 pub async fn import_cross_signing_keys(
2279 &self,
2280 export: CrossSigningKeyExport,
2281 ) -> Result<CrossSigningStatus, SecretImportError> {
2282 self.store().import_cross_signing_keys(export).await
2283 }
2284
2285 async fn sign_with_master_key(
2286 &self,
2287 message: &str,
2288 ) -> Result<(OwnedDeviceKeyId, Ed25519Signature), SignatureError> {
2289 let identity = &*self.inner.user_identity.lock().await;
2290 let key_id = identity.master_key_id().await.ok_or(SignatureError::MissingSigningKey)?;
2291
2292 let signature = identity.sign(message).await?;
2293
2294 Ok((key_id, signature))
2295 }
2296
2297 pub async fn sign(&self, message: &str) -> Result<Signatures, CryptoStoreError> {
2303 let mut signatures = Signatures::new();
2304
2305 {
2306 let cache = self.inner.store.cache().await?;
2307 let account = cache.account().await?;
2308 let key_id = account.signing_key_id();
2309 let signature = account.sign(message);
2310 signatures.add_signature(self.user_id().to_owned(), key_id, signature);
2311 }
2312
2313 match self.sign_with_master_key(message).await {
2314 Ok((key_id, signature)) => {
2315 signatures.add_signature(self.user_id().to_owned(), key_id, signature);
2316 }
2317 Err(e) => {
2318 warn!(error = ?e, "Couldn't sign the message using the cross signing master key")
2319 }
2320 }
2321
2322 Ok(signatures)
2323 }
2324
2325 pub fn backup_machine(&self) -> &BackupMachine {
2330 &self.inner.backup_machine
2331 }
2332
2333 pub async fn initialize_crypto_store_generation(
2337 &self,
2338 generation: &Mutex<Option<u64>>,
2339 ) -> StoreResult<()> {
2340 let mut gen_guard = generation.lock().await;
2343
2344 let prev_generation =
2345 self.inner.store.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY).await?;
2346
2347 let gen = match prev_generation {
2348 Some(val) => {
2349 u64::from_le_bytes(val.try_into().map_err(|_| {
2352 CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
2353 })?)
2354 .wrapping_add(1)
2355 }
2356 None => 0,
2357 };
2358
2359 tracing::debug!("Initialising crypto store generation at {}", gen);
2360
2361 self.inner
2362 .store
2363 .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec())
2364 .await?;
2365
2366 *gen_guard = Some(gen);
2367
2368 Ok(())
2369 }
2370
2371 pub async fn maintain_crypto_store_generation(
2396 &'_ self,
2397 generation: &Mutex<Option<u64>>,
2398 ) -> StoreResult<(bool, u64)> {
2399 let mut gen_guard = generation.lock().await;
2400
2401 let actual_gen = self
2407 .inner
2408 .store
2409 .get_custom_value(Self::CURRENT_GENERATION_STORE_KEY)
2410 .await?
2411 .ok_or_else(|| {
2412 CryptoStoreError::InvalidLockGeneration("counter missing in store".to_owned())
2413 })?;
2414
2415 let actual_gen =
2416 u64::from_le_bytes(actual_gen.try_into().map_err(|_| {
2417 CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
2418 })?);
2419
2420 let new_gen = match gen_guard.as_ref() {
2421 Some(expected_gen) => {
2422 if actual_gen == *expected_gen {
2423 return Ok((false, actual_gen));
2424 }
2425 actual_gen.max(*expected_gen).wrapping_add(1)
2427 }
2428 None => {
2429 actual_gen.wrapping_add(1)
2432 }
2433 };
2434
2435 tracing::debug!(
2436 "Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
2437 *gen_guard,
2438 actual_gen,
2439 new_gen
2440 );
2441
2442 *gen_guard = Some(new_gen);
2444
2445 self.inner
2447 .store
2448 .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, new_gen.to_le_bytes().to_vec())
2449 .await?;
2450
2451 Ok((true, new_gen))
2452 }
2453
2454 pub fn dehydrated_devices(&self) -> DehydratedDevices {
2456 DehydratedDevices { inner: self.to_owned() }
2457 }
2458
2459 pub async fn room_settings(&self, room_id: &RoomId) -> StoreResult<Option<RoomSettings>> {
2464 self.inner.store.get_room_settings(room_id).await
2467 }
2468
2469 pub async fn set_room_settings(
2480 &self,
2481 room_id: &RoomId,
2482 new_settings: &RoomSettings,
2483 ) -> Result<(), SetRoomSettingsError> {
2484 let store = &self.inner.store;
2485
2486 let _store_transaction = store.transaction().await;
2491
2492 let old_settings = store.get_room_settings(room_id).await?;
2493
2494 if let Some(old_settings) = old_settings {
2507 if old_settings != *new_settings {
2508 return Err(SetRoomSettingsError::EncryptionDowngrade);
2509 } else {
2510 return Ok(());
2512 }
2513 }
2514
2515 match new_settings.algorithm {
2517 EventEncryptionAlgorithm::MegolmV1AesSha2 => (),
2518
2519 #[cfg(feature = "experimental-algorithms")]
2520 EventEncryptionAlgorithm::MegolmV2AesSha2 => (),
2521
2522 _ => {
2523 warn!(
2524 ?room_id,
2525 "Rejecting invalid encryption algorithm {}", new_settings.algorithm
2526 );
2527 return Err(SetRoomSettingsError::InvalidSettings);
2528 }
2529 }
2530
2531 store
2533 .save_changes(Changes {
2534 room_settings: HashMap::from([(room_id.to_owned(), new_settings.clone())]),
2535 ..Default::default()
2536 })
2537 .await?;
2538
2539 Ok(())
2540 }
2541
/// Tell whether two machines share the same inner state, i.e. whether both
/// point at the same `Arc` allocation.
#[cfg(any(feature = "testing", test))]
pub fn same_as(&self, other: &OlmMachine) -> bool {
    let (ours, theirs) = (&self.inner, &other.inner);

    Arc::ptr_eq(ours, theirs)
}
2549
/// Return the uploaded key count reported by the account.
///
/// # Errors
///
/// Fails if the store cache or the account cannot be obtained.
#[cfg(any(feature = "testing", test))]
pub async fn uploaded_key_count(&self) -> Result<u64, CryptoStoreError> {
    let cache = self.inner.store.cache().await?;
    let account = cache.account().await?;
    let count = account.uploaded_key_count();

    Ok(count)
}
2557
/// Borrow the [`IdentityManager`] owned by this machine (tests only).
#[cfg(test)]
pub(crate) fn identity_manager(&self) -> &IdentityManager {
    let inner = &self.inner;

    &inner.identity_manager
}
2563
/// Expose the value of `HAS_MIGRATED_VERIFICATION_LATCH` (tests only).
#[cfg(test)]
pub(crate) fn key_for_has_migrated_verification_latch() -> &'static str {
    let key: &'static str = Self::HAS_MIGRATED_VERIFICATION_LATCH;

    key
}
2569}
2570
2571fn sender_data_to_verification_state(
2572 sender_data: SenderData,
2573 session_has_been_imported: bool,
2574) -> (VerificationState, Option<OwnedDeviceId>) {
2575 match sender_data {
2576 SenderData::UnknownDevice { owner_check_failed: false, .. } => {
2577 let device_link_problem = if session_has_been_imported {
2578 DeviceLinkProblem::InsecureSource
2579 } else {
2580 DeviceLinkProblem::MissingDevice
2581 };
2582
2583 (VerificationState::Unverified(VerificationLevel::None(device_link_problem)), None)
2584 }
2585 SenderData::UnknownDevice { owner_check_failed: true, .. } => (
2586 VerificationState::Unverified(VerificationLevel::None(
2587 DeviceLinkProblem::InsecureSource,
2588 )),
2589 None,
2590 ),
2591 SenderData::DeviceInfo { device_keys, .. } => (
2592 VerificationState::Unverified(VerificationLevel::UnsignedDevice),
2593 Some(device_keys.device_id),
2594 ),
2595 SenderData::VerificationViolation(KnownSenderData { device_id, .. }) => {
2596 (VerificationState::Unverified(VerificationLevel::VerificationViolation), device_id)
2597 }
2598 SenderData::SenderUnverified(KnownSenderData { device_id, .. }) => {
2599 (VerificationState::Unverified(VerificationLevel::UnverifiedIdentity), device_id)
2600 }
2601 SenderData::SenderVerified(KnownSenderData { device_id, .. }) => {
2602 (VerificationState::Verified, device_id)
2603 }
2604 }
2605}
2606
/// A bundle of requests related to bootstrapping cross-signing.
///
/// NOTE(review): nothing in this part of the file constructs or consumes
/// this type; the field descriptions below are inferred from the field
/// names and types - confirm against the producer.
#[derive(Debug, Clone)]
pub struct CrossSigningBootstrapRequests {
    /// An optional key upload request; presumably only present when keys
    /// still have to be uploaded - confirm.
    pub upload_keys_req: Option<OutgoingRequest>,

    /// The request uploading the signing keys.
    pub upload_signing_keys_req: UploadSigningKeysRequest,

    /// The request uploading the signatures.
    pub upload_signatures_req: UploadSignaturesRequest,
}
2630
/// The encryption-related data of a sync response, as consumed by
/// `OlmMachine::receive_sync_changes` and
/// `OlmMachine::preprocess_sync_changes`.
#[derive(Debug)]
pub struct EncryptionSyncChanges<'a> {
    /// The to-device events to process; each one is run through the
    /// machine's to-device event handling.
    pub to_device_events: Vec<Raw<AnyToDeviceEvent>>,
    /// The device lists; the users in its `changed` list are handed to the
    /// identity manager.
    pub changed_devices: &'a DeviceLists,
    /// The one-time key counts per algorithm, used to update the account's
    /// key counts.
    pub one_time_keys_counts: &'a BTreeMap<OneTimeKeyAlgorithm, UInt>,
    /// The unused fallback key algorithms, if any, used to update the
    /// account's key counts.
    pub unused_fallback_keys: Option<&'a [OneTimeKeyAlgorithm]>,
    /// The next batch token, recorded in the pending store changes.
    pub next_batch_token: Option<String>,
}
2647
2648fn megolm_error_to_utd_info(
2656 raw_event: &Raw<EncryptedEvent>,
2657 error: MegolmError,
2658) -> Result<UnableToDecryptInfo, CryptoStoreError> {
2659 use MegolmError::*;
2660 let reason = match error {
2661 EventError(_) => UnableToDecryptReason::MalformedEncryptedEvent,
2662 Decode(_) => UnableToDecryptReason::MalformedEncryptedEvent,
2663 MissingRoomKey(maybe_withheld) => {
2664 UnableToDecryptReason::MissingMegolmSession { withheld_code: maybe_withheld }
2665 }
2666 Decryption(DecryptionError::UnknownMessageIndex(_, _)) => {
2667 UnableToDecryptReason::UnknownMegolmMessageIndex
2668 }
2669 Decryption(_) => UnableToDecryptReason::MegolmDecryptionFailure,
2670 JsonError(_) => UnableToDecryptReason::PayloadDeserializationFailure,
2671 MismatchedIdentityKeys(_) => UnableToDecryptReason::MismatchedIdentityKeys,
2672 SenderIdentityNotTrusted(level) => UnableToDecryptReason::SenderIdentityNotTrusted(level),
2673
2674 Store(error) => Err(error)?,
2677 };
2678
2679 let session_id = raw_event.deserialize().ok().and_then(|ev| match ev.content.scheme {
2680 RoomEventEncryptionScheme::MegolmV1AesSha2(s) => Some(s.session_id),
2681 #[cfg(feature = "experimental-algorithms")]
2682 RoomEventEncryptionScheme::MegolmV2AesSha2(s) => Some(s.session_id),
2683 RoomEventEncryptionScheme::Unknown(_) => None,
2684 });
2685
2686 Ok(UnableToDecryptInfo { session_id, reason })
2687}
2688
// Crate-visible helper module, compiled for test builds only.
#[cfg(test)]
pub(crate) mod test_helpers;

// Crate-visible test module, compiled for test builds only.
#[cfg(test)]
pub(crate) mod tests;