Skip to main content

matrix_sdk_crypto/gossiping/
machine.rs

1// Copyright 2020, 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// TODO
16//
// Handle the case where we can't create a session with a device, clearing out
// stale key share requests that we'll never be able to handle.
19//
20// If we don't trust the device store an object that remembers the request and
21// let the users introspect that object.
22
23#[cfg(feature = "experimental-push-secrets")]
24use std::collections::HashMap;
25use std::{
26    collections::{BTreeMap, BTreeSet, btree_map::Entry},
27    mem,
28    sync::{
29        Arc,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use matrix_sdk_common::locks::RwLock as StdRwLock;
35use ruma::{
36    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId, RoomId,
37    TransactionId, UserId,
38    api::client::keys::claim_keys::v3::Request as KeysClaimRequest,
39    events::secret::request::{
40        RequestAction, SecretName, ToDeviceSecretRequestEvent as SecretRequestEvent,
41    },
42};
43use tracing::{Span, debug, field::debug, info, instrument, trace, warn};
44use vodozemac::Curve25519PublicKey;
45
46use super::{GossipRequest, GossippedSecret, RequestEvent, RequestInfo, SecretInfo, WaitQueue};
47use crate::{
48    Device, MegolmError,
49    error::{EventError, OlmError, OlmResult},
50    identities::IdentityManager,
51    olm::{InboundGroupSession, Session},
52    session_manager::GroupSessionCache,
53    store::{CryptoStoreError, SecretImportError, Store, caches::StoreCache, types::Changes},
54    types::{
55        events::{
56            EventType,
57            forwarded_room_key::ForwardedRoomKeyContent,
58            olm_v1::{DecryptedForwardedRoomKeyEvent, DecryptedSecretSendEvent},
59            room::encrypted::EncryptedEvent,
60            room_key_request::RoomKeyRequestEvent,
61            secret_send::SecretSendContent,
62        },
63        requests::{OutgoingRequest, ToDeviceRequest},
64    },
65};
66#[cfg(feature = "experimental-push-secrets")]
67use crate::{
68    error::SecretPushError,
69    types::events::{olm_v1::DecryptedSecretPushEvent, secret_push::SecretPushContent},
70};
71
/// Handle to the gossiping state machine.
///
/// All state lives in a [`GossipMachineInner`] behind an `Arc`, so cloning
/// this handle yields another handle to the same underlying state.
#[derive(Clone, Debug)]
pub(crate) struct GossipMachine {
    inner: Arc<GossipMachineInner>,
}
76
/// The shared state behind a [`GossipMachine`].
#[derive(Debug)]
pub(crate) struct GossipMachineInner {
    /// The crypto store; used to look up devices, secrets, inbound group
    /// sessions, stored secret requests and our own account data.
    store: Store,
    /// Cache of outbound group sessions, consulted when deciding whether a
    /// requested session may be forwarded to a device.
    #[cfg(feature = "automatic-room-key-forwarding")]
    outbound_group_sessions: GroupSessionCache,
    /// To-device requests (secret sends, secret pushes, forwarded room keys)
    /// that were created by this machine, keyed by their transaction ID.
    outgoing_requests: StdRwLock<BTreeMap<OwnedTransactionId, OutgoingRequest>>,
    /// Request events that were received but not yet handled; drained by
    /// `collect_incoming_key_requests`.
    incoming_key_requests: StdRwLock<BTreeMap<RequestInfo, RequestEvent>>,
    /// Requests that are parked because we had no Olm session with the
    /// requesting device; `retry_keyshare` moves them back into
    /// `incoming_key_requests`.
    wait_queue: WaitQueue,
    /// Users and devices for which a `/keys/claim` request should be sent
    /// out. The map is shared (via the `Arc`) with whoever constructed the
    /// machine.
    users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,

    /// Whether we should respond to incoming `m.room_key_request` messages.
    room_key_forwarding_enabled: AtomicBool,

    /// Whether we should send out `m.room_key_request` messages.
    room_key_requests_enabled: AtomicBool,

    /// Used to mark a user as changed when a request arrives from a device
    /// that is not in the store.
    identity_manager: IdentityManager,
}
95
96impl GossipMachine {
97    pub fn new(
98        store: Store,
99        identity_manager: IdentityManager,
100        #[allow(unused)] outbound_group_sessions: GroupSessionCache,
101        users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
102    ) -> Self {
103        let room_key_forwarding_enabled =
104            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
105
106        let room_key_requests_enabled =
107            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
108
109        Self {
110            inner: Arc::new(GossipMachineInner {
111                store,
112                #[cfg(feature = "automatic-room-key-forwarding")]
113                outbound_group_sessions,
114                outgoing_requests: Default::default(),
115                incoming_key_requests: Default::default(),
116                wait_queue: WaitQueue::new(),
117                users_for_key_claim,
118                room_key_forwarding_enabled,
119                room_key_requests_enabled,
120                identity_manager,
121            }),
122        }
123    }
124
125    pub(crate) fn identity_manager(&self) -> &IdentityManager {
126        &self.inner.identity_manager
127    }
128
129    #[cfg(feature = "automatic-room-key-forwarding")]
130    pub fn set_room_key_forwarding_enabled(&self, enabled: bool) {
131        self.inner.room_key_forwarding_enabled.store(enabled, Ordering::SeqCst)
132    }
133
134    pub fn is_room_key_forwarding_enabled(&self) -> bool {
135        self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst)
136    }
137
138    /// Configure whether we should send outgoing `m.room_key_request`s on
139    /// decryption failure.
140    #[cfg(feature = "automatic-room-key-forwarding")]
141    pub fn set_room_key_requests_enabled(&self, enabled: bool) {
142        self.inner.room_key_requests_enabled.store(enabled, Ordering::SeqCst)
143    }
144
145    /// Query whether we should send outgoing `m.room_key_request`s on
146    /// decryption failure.
147    pub fn are_room_key_requests_enabled(&self) -> bool {
148        self.inner.room_key_requests_enabled.load(Ordering::SeqCst)
149    }
150
151    /// Load stored outgoing requests that were not yet sent out.
152    async fn load_outgoing_requests(&self) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
153        Ok(self
154            .inner
155            .store
156            .get_unsent_secret_requests()
157            .await?
158            .into_iter()
159            .filter(|i| !i.sent_out)
160            .map(|info| info.to_request(self.device_id()))
161            .collect())
162    }
163
164    /// Our own user id.
165    pub fn user_id(&self) -> &UserId {
166        &self.inner.store.static_account().user_id
167    }
168
169    /// Our own device ID.
170    pub fn device_id(&self) -> &DeviceId {
171        &self.inner.store.static_account().device_id
172    }
173
174    pub async fn outgoing_to_device_requests(
175        &self,
176    ) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
177        let mut key_requests = self.load_outgoing_requests().await?;
178        let key_forwards: Vec<OutgoingRequest> =
179            self.inner.outgoing_requests.read().values().cloned().collect();
180        key_requests.extend(key_forwards);
181
182        let users_for_key_claim: BTreeMap<_, _> = self
183            .inner
184            .users_for_key_claim
185            .read()
186            .iter()
187            .map(|(key, value)| {
188                let device_map = value
189                    .iter()
190                    .map(|d| (d.to_owned(), OneTimeKeyAlgorithm::SignedCurve25519))
191                    .collect();
192
193                (key.to_owned(), device_map)
194            })
195            .collect();
196
197        if !users_for_key_claim.is_empty() {
198            let key_claim_request = KeysClaimRequest::new(users_for_key_claim);
199            key_requests.push(OutgoingRequest {
200                request_id: TransactionId::new(),
201                request: Arc::new(key_claim_request.into()),
202            });
203        }
204
205        Ok(key_requests)
206    }
207
208    /// Receive a room key request event.
209    pub fn receive_incoming_key_request(&self, event: &RoomKeyRequestEvent) {
210        self.receive_event(event.clone().into())
211    }
212
213    fn receive_event(&self, event: RequestEvent) {
214        // Some servers might send to-device events to ourselves if we send one
215        // out using a wildcard instead of a specific device as a recipient.
216        //
217        // Check if we're the sender of this request event and ignore it if
218        // so.
219        if event.sender() == self.user_id() && event.requesting_device_id() == self.device_id() {
220            trace!("Received a secret request event from ourselves, ignoring")
221        } else {
222            let request_info = event.to_request_info();
223            self.inner.incoming_key_requests.write().insert(request_info, event);
224        }
225    }
226
227    pub fn receive_incoming_secret_request(&self, event: &SecretRequestEvent) {
228        self.receive_event(event.clone().into())
229    }
230
231    /// Handle all the incoming key requests that are queued up and empty our
232    /// key request queue.
233    pub async fn collect_incoming_key_requests(
234        &self,
235        cache: &StoreCache,
236    ) -> OlmResult<Vec<Session>> {
237        let mut changed_sessions = Vec::new();
238
239        let incoming_key_requests = mem::take(&mut *self.inner.incoming_key_requests.write());
240
241        for event in incoming_key_requests.values() {
242            if let Some(s) = match event {
243                #[cfg(feature = "automatic-room-key-forwarding")]
244                RequestEvent::KeyShare(e) => Box::pin(self.handle_key_request(cache, e)).await?,
245                RequestEvent::Secret(e) => Box::pin(self.handle_secret_request(cache, e)).await?,
246                #[cfg(not(feature = "automatic-room-key-forwarding"))]
247                _ => None,
248            } {
249                changed_sessions.push(s);
250            }
251        }
252
253        Ok(changed_sessions)
254    }
255
256    /// Store the key share request for later, once we get an Olm session with
257    /// the given device [`retry_keyshare`](#method.retry_keyshare) should be
258    /// called.
259    fn handle_key_share_without_session(&self, device: Device, event: RequestEvent) {
260        self.inner
261            .users_for_key_claim
262            .write()
263            .entry(device.user_id().to_owned())
264            .or_default()
265            .insert(device.device_id().into());
266        self.inner.wait_queue.insert(&device, event);
267    }
268
269    /// Retry keyshares for a device that previously didn't have an Olm session
270    /// with us.
271    ///
272    /// This should be only called if the given user/device got a new Olm
273    /// session.
274    ///
275    /// # Arguments
276    ///
277    /// * `user_id` - The user id of the device that we created the Olm session
278    ///   with.
279    ///
280    /// * `device_id` - The device ID of the device that got the Olm session.
281    pub fn retry_keyshare(&self, user_id: &UserId, device_id: &DeviceId) {
282        if let Entry::Occupied(mut e) =
283            self.inner.users_for_key_claim.write().entry(user_id.to_owned())
284        {
285            e.get_mut().remove(device_id);
286
287            if e.get().is_empty() {
288                e.remove();
289            }
290        }
291
292        let mut incoming_key_requests = self.inner.incoming_key_requests.write();
293        for (key, event) in self.inner.wait_queue.remove(user_id, device_id) {
294            incoming_key_requests.entry(key).or_insert(event);
295        }
296    }
297
298    /// Push a secret to all of our other verified devices.
299    ///
300    /// This function assumes that we already have Olm sessions with the other
301    /// devices.  This can be done by calling
302    /// [`OlmMachine::get_missing_sessions()`].
303    ///
304    /// * `secret_name` - The name of the secret to push
305    #[cfg(feature = "experimental-push-secrets")]
306    pub async fn push_secret_to_verified_devices(
307        &self,
308        secret_name: SecretName,
309    ) -> Result<HashMap<OwnedDeviceId, OlmError>, SecretPushError> {
310        let content = if let Some(secret) = self.inner.store.export_secret(&secret_name).await? {
311            SecretPushContent::new(secret_name.clone(), secret)
312        } else {
313            info!(?secret_name, "Can't push a secret, secret isn't found");
314            return Err(SecretPushError::MissingSecret);
315        };
316
317        let devices = self.inner.store.get_user_devices(self.user_id()).await?;
318        let mut errors = HashMap::new();
319
320        for device in devices.devices() {
321            if !device.is_our_own_device() && device.is_verified() {
322                let event_type = content.event_type().to_owned();
323                match device.encrypt(&event_type, content.clone()).await {
324                    Ok((_used_session, content, message_id)) => {
325                        let encrypted_event_type = content.event_type().to_owned();
326                        let request = ToDeviceRequest::new(
327                            device.user_id(),
328                            device.device_id().to_owned(),
329                            &encrypted_event_type,
330                            content.cast(),
331                        );
332                        let request = OutgoingRequest {
333                            request_id: request.txn_id.clone(),
334                            request: Arc::new(request.into()),
335                        };
336                        debug!(
337                            device = ?device.device_id(),
338                            event_type,
339                            request_id = ?request.request_id,
340                            ?secret_name,
341                            ?message_id,
342                            "Creating outgoing secret push to-device request",
343                        );
344                        self.inner
345                            .outgoing_requests
346                            .write()
347                            .insert(request.request_id.clone(), request);
348                    }
349                    Err(err) => {
350                        info!(?secret_name, device_id = ?device.device_id(), ?err, "Can't push secret to device");
351                        errors.insert(device.device_id().to_owned(), err);
352                    }
353                }
354            }
355        }
356
357        Ok(errors)
358    }
359
360    /// Handle a received secret push event.
361    ///
362    /// Checks that the sender device is verified, then adds it to the changes.
363    #[cfg(feature = "experimental-push-secrets")]
364    pub async fn receive_secret_push_event(
365        &self,
366        sender_key: &Curve25519PublicKey,
367        event: &DecryptedSecretPushEvent,
368        changes: &mut Changes,
369    ) -> Result<(), CryptoStoreError> {
370        // Only accept events from verified own-devices
371        let sender = &event.sender;
372        if sender != self.user_id() {
373            // Ignore if sent from a different user
374            warn!(?sender, "Received secret push from a different user");
375            return Ok(());
376        }
377        let Some(device) = self.inner.store.get_device_from_curve_key(sender, *sender_key).await?
378        else {
379            warn!(?sender, ?sender_key, "Received secret push from unknown device");
380            return Ok(());
381        };
382        if !device.is_verified() {
383            warn!(?sender, device_id = ?device.device_id(), "Received secret push from unverified device");
384            return Ok(());
385        }
386        changes.secrets.push(event.content.clone().into());
387        Ok(())
388    }
389
390    async fn handle_secret_request(
391        &self,
392        cache: &StoreCache,
393        event: &SecretRequestEvent,
394    ) -> OlmResult<Option<Session>> {
395        let secret_name = match &event.content.action {
396            RequestAction::Request(r) => &r.name,
397            // We ignore cancellations here since there's nothing to serve.
398            RequestAction::RequestCancellation => return Ok(None),
399            action => {
400                warn!(?action, "Unknown secret request action");
401                return Ok(None);
402            }
403        };
404
405        let content = if let Some(secret) = self.inner.store.export_secret(secret_name).await? {
406            SecretSendContent::new(event.content.request_id.to_owned(), secret)
407        } else {
408            info!(?secret_name, "Can't serve a secret request, secret isn't found");
409            return Ok(None);
410        };
411
412        let device =
413            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
414
415        if let Some(device) = device {
416            if device.user_id() == self.user_id() {
417                if device.is_verified() {
418                    info!(
419                        user_id = ?device.user_id(),
420                        device_id = ?device.device_id(),
421                        ?secret_name,
422                        "Sharing a secret with a device",
423                    );
424
425                    match self.share_secret(&device, content, secret_name).await {
426                        Ok(s) => Ok(Some(s)),
427                        Err(OlmError::MissingSession) => {
428                            info!(
429                                user_id = ?device.user_id(),
430                                device_id = ?device.device_id(),
431                                ?secret_name,
432                                "Secret request is missing an Olm session, \
433                                putting the request in the wait queue",
434                            );
435                            self.handle_key_share_without_session(device, event.clone().into());
436
437                            Ok(None)
438                        }
439                        Err(e) => Err(e),
440                    }
441                } else {
442                    info!(
443                        user_id = ?device.user_id(),
444                        device_id = ?device.device_id(),
445                        ?secret_name,
446                        "Received a secret request that we won't serve, the device isn't trusted",
447                    );
448
449                    Ok(None)
450                }
451            } else {
452                info!(
453                    user_id = ?device.user_id(),
454                    device_id = ?device.device_id(),
455                    ?secret_name,
456                    "Received a secret request that we won't serve, the device doesn't belong to us",
457                );
458
459                Ok(None)
460            }
461        } else {
462            warn!(
463                user_id = ?event.sender,
464                device_id = ?event.content.requesting_device_id,
465                ?secret_name,
466                "Received a secret request from an unknown device",
467            );
468            self.inner
469                .identity_manager
470                .key_query_manager
471                .synced(cache)
472                .await?
473                .mark_user_as_changed(&event.sender)
474                .await?;
475
476            Ok(None)
477        }
478    }
479
480    /// Try to encrypt the given `InboundGroupSession` for the given `Device` as
481    /// a forwarded room key.
482    ///
483    /// This method might fail if we do not share an 1-to-1 Olm session with the
484    /// given `Device`, in that case we're going to queue up an
485    /// `/keys/claim` request to be sent out and retry once the 1-to-1 Olm
486    /// session has been established.
487    #[cfg(feature = "automatic-room-key-forwarding")]
488    async fn try_to_forward_room_key(
489        &self,
490        event: &RoomKeyRequestEvent,
491        device: Device,
492        session: &InboundGroupSession,
493        message_index: Option<u32>,
494    ) -> OlmResult<Option<Session>> {
495        info!(?message_index, "Serving a room key request",);
496
497        match self.forward_room_key(session, &device, message_index).await {
498            Ok(s) => Ok(Some(s)),
499            Err(OlmError::MissingSession) => {
500                info!(
501                    "Key request is missing an Olm session, putting the request in the wait queue",
502                );
503                self.handle_key_share_without_session(device, event.to_owned().into());
504
505                Ok(None)
506            }
507            Err(OlmError::SessionExport(e)) => {
508                warn!(
509                    "Can't serve a room key request, the session \
510                     can't be exported into a forwarded room key: {e:?}",
511                );
512                Ok(None)
513            }
514            Err(e) => Err(e),
515        }
516    }
517
518    /// Answer a room key request after we found the matching
519    /// `InboundGroupSession`.
520    #[cfg(feature = "automatic-room-key-forwarding")]
521    async fn answer_room_key_request(
522        &self,
523        cache: &StoreCache,
524        event: &RoomKeyRequestEvent,
525        session: &InboundGroupSession,
526    ) -> OlmResult<Option<Session>> {
527        use super::KeyForwardDecision;
528
529        let device =
530            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
531
532        let Some(device) = device else {
533            warn!("Received a key request from an unknown device");
534            self.identity_manager()
535                .key_query_manager
536                .synced(cache)
537                .await?
538                .mark_user_as_changed(&event.sender)
539                .await?;
540
541            return Ok(None);
542        };
543
544        match self.should_share_key(&device, session).await {
545            Ok(message_index) => {
546                self.try_to_forward_room_key(event, device, session, message_index).await
547            }
548            Err(e) => {
549                if let KeyForwardDecision::ChangedSenderKey = e {
550                    warn!(
551                        "Received a key request from a device that changed \
552                         their Curve25519 sender key"
553                    );
554                } else {
555                    debug!(
556                        reason = ?e,
557                        "Received a key request that we won't serve",
558                    );
559                }
560
561                Ok(None)
562            }
563        }
564    }
565
566    #[cfg(feature = "automatic-room-key-forwarding")]
567    #[tracing::instrument(
568        skip_all,
569        fields(
570            user_id = ?event.sender,
571            device_id = ?event.content.requesting_device_id,
572            ?room_id,
573            session_id
574        )
575    )]
576    async fn handle_supported_key_request(
577        &self,
578        cache: &StoreCache,
579        event: &RoomKeyRequestEvent,
580        room_id: &RoomId,
581        session_id: &str,
582    ) -> OlmResult<Option<Session>> {
583        let session = self.inner.store.get_inbound_group_session(room_id, session_id).await?;
584
585        if let Some(s) = session {
586            self.answer_room_key_request(cache, event, &s).await
587        } else {
588            debug!("Received a room key request for an unknown inbound group session",);
589
590            Ok(None)
591        }
592    }
593
594    /// Handle a single incoming key request.
595    #[cfg(feature = "automatic-room-key-forwarding")]
596    async fn handle_key_request(
597        &self,
598        cache: &StoreCache,
599        event: &RoomKeyRequestEvent,
600    ) -> OlmResult<Option<Session>> {
601        use crate::types::events::room_key_request::{Action, RequestedKeyInfo};
602
603        if self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst) {
604            match &event.content.action {
605                Action::Request(info) => match info {
606                    RequestedKeyInfo::MegolmV1AesSha2(i) => {
607                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
608                            .await
609                    }
610                    #[cfg(feature = "experimental-algorithms")]
611                    RequestedKeyInfo::MegolmV2AesSha2(i) => {
612                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
613                            .await
614                    }
615                    RequestedKeyInfo::Unknown(i) => {
616                        debug!(
617                            sender = ?event.sender,
618                            algorithm = ?i.algorithm,
619                            "Received a room key request for a unsupported algorithm"
620                        );
621                        Ok(None)
622                    }
623                },
624                // We ignore cancellations here since there's nothing to serve.
625                Action::Cancellation => Ok(None),
626            }
627        } else {
628            debug!(
629                sender = ?event.sender,
630                "Received a room key request, but room key forwarding has been turned off"
631            );
632            Ok(None)
633        }
634    }
635
636    /// Add an `m.secret.send` request to the `outgoing_requests` queue.
637    ///
638    /// # Arguments
639    ///
640    /// * `device` is the device to send the request to.
641    /// * `content` is the actual event content, containing the secret to send.
642    /// * `secret_name` is the name of the secret e.g. `m.megolm_backup.v1`
643    ///   (used for logging).
644    async fn share_secret(
645        &self,
646        device: &Device,
647        content: SecretSendContent,
648        secret_name: &SecretName,
649    ) -> OlmResult<Session> {
650        let event_type = content.event_type().to_owned();
651        let (used_session, content, message_id) = device.encrypt(&event_type, content).await?;
652
653        let encrypted_event_type = content.event_type().to_owned();
654
655        let request = ToDeviceRequest::new(
656            device.user_id(),
657            device.device_id().to_owned(),
658            &encrypted_event_type,
659            content.cast(),
660        );
661
662        let request = OutgoingRequest {
663            request_id: request.txn_id.clone(),
664            request: Arc::new(request.into()),
665        };
666
667        debug!(
668            recipient = ?device.user_id(),
669            event_type,
670            request_id = ?request.request_id,
671            ?secret_name,
672            ?message_id,
673            "Creating outgoing `m.secret.send` to-device request"
674        );
675
676        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
677
678        Ok(used_session)
679    }
680
681    #[cfg(feature = "automatic-room-key-forwarding")]
682    async fn forward_room_key(
683        &self,
684        session: &InboundGroupSession,
685        device: &Device,
686        message_index: Option<u32>,
687    ) -> OlmResult<Session> {
688        let (used_session, content) =
689            device.encrypt_room_key_for_forwarding(session.clone(), message_index).await?;
690
691        let event_type = content.event_type().to_owned();
692
693        let request = ToDeviceRequest::new(
694            device.user_id(),
695            device.device_id().to_owned(),
696            &event_type,
697            content.cast(),
698        );
699
700        let request = OutgoingRequest {
701            request_id: request.txn_id.clone(),
702            request: Arc::new(request.into()),
703        };
704
705        debug!(
706            recipient = ?device.user_id(),
707            event_type = event_type,
708            request_id = ?request.request_id,
709            session_id = session.session_id(),
710            "Creating outgoing `m.forwarded_room_key` to-device request"
711        );
712
713        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
714
715        Ok(used_session)
716    }
717
718    /// Check if it's ok to share a session with the given device.
719    ///
720    /// The logic for this is currently as follows:
721    ///
722    /// * Share the session in full, starting from the earliest known index, if
723    ///   the requesting device is our own, trusted (verified) device.
724    ///
725    /// * For other requesting devices, share only a limited session and only if
726    ///   we originally shared with that device because it was present when the
727    ///   message was initially sent. By limited, we mean that the session will
728    ///   not be shared in full, but only from the message index at that moment.
729    ///   Since this information is recorded in the outbound session, we need to
730    ///   have it for this to work.
731    ///
732    /// * In all other cases, refuse to share the session.
733    ///
734    /// # Arguments
735    ///
736    /// * `device` - The device that is requesting a session from us.
737    ///
738    /// * `session` - The session that was requested to be shared.
739    ///
740    /// # Return value
741    ///
742    /// A `Result` representing whether we should share the session:
743    ///
744    /// - `Ok(None)`: Should share the entire session, starting with the
745    ///   earliest known index.
746    /// - `Ok(Some(i))`: Should share the session, but only starting from index
747    ///   i.
748    /// - `Err(x)`: Should *refuse* to share the session. `x` is the reason for
749    ///   the refusal.
750    #[cfg(feature = "automatic-room-key-forwarding")]
751    async fn should_share_key(
752        &self,
753        device: &Device,
754        session: &InboundGroupSession,
755    ) -> Result<Option<u32>, super::KeyForwardDecision> {
756        use super::KeyForwardDecision;
757        use crate::olm::ShareState;
758
759        let outbound_session = self
760            .inner
761            .outbound_group_sessions
762            .get_or_load(session.room_id())
763            .await
764            .filter(|outgoing_session| outgoing_session.session_id() == session.session_id());
765
766        // If this is our own, verified device, we share the entire session from the
767        // earliest known index.
768        if device.user_id() == self.user_id() && device.is_verified() {
769            Ok(None)
770        // Otherwise, if the records show we previously shared with this device,
771        // we'll reshare the session from the index we previously shared
772        // at. For this, we need an outbound session because this
773        // information is recorded there.
774        } else if let Some(outbound) = outbound_session {
775            match outbound.sharing_view().get_share_state(&device.inner) {
776                ShareState::Shared { message_index, olm_wedging_index: _ } => {
777                    Ok(Some(message_index))
778                }
779                ShareState::SharedButChangedSenderKey => Err(KeyForwardDecision::ChangedSenderKey),
780                ShareState::NotShared => Err(KeyForwardDecision::OutboundSessionNotShared),
781            }
782        // Otherwise, there's not enough info to decide if we can safely share
783        // the session.
784        } else if device.user_id() == self.user_id() {
785            Err(KeyForwardDecision::UntrustedDevice)
786        } else {
787            Err(KeyForwardDecision::MissingOutboundSession)
788        }
789    }
790
/// Decide whether it makes sense to automatically request a key from our
/// other devices.
///
/// A request is only worthwhile when room key requests are enabled, no
/// request for the same key info is already stored, and at least one of our
/// own devices is verified.
///
/// # Arguments
///
/// * `key_info` - The info of our key request containing information about
///   the key we wish to request.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn should_request_key(&self, key_info: &SecretInfo) -> Result<bool, CryptoStoreError> {
    if !self.inner.room_key_requests_enabled.load(Ordering::SeqCst) {
        return Ok(false);
    }

    // Don't send out duplicate requests, users can re-request them if they
    // think a second request might succeed.
    let existing_request = self.inner.store.get_secret_request_by_info(key_info).await?;
    if existing_request.is_some() {
        return Ok(false);
    }

    // Devices will only respond to key requests if the devices are
    // verified, if the device isn't verified by us it's unlikely that
    // we're verified by them either. Don't request keys if there isn't
    // at least one verified device.
    let own_devices = self.inner.store.get_user_devices(self.user_id()).await?;

    Ok(own_devices.is_any_verified())
}
820
821    /// Create a new outgoing key request for the key with the given session id.
822    ///
823    /// This will queue up a new to-device request and store the key info so
824    /// once we receive a forwarded room key we can check that it matches the
825    /// key we requested.
826    ///
827    /// This method will return a cancel request and a new key request if the
828    /// key was already requested, otherwise it will return just the key
829    /// request.
830    ///
831    /// # Arguments
832    ///
833    /// * `room_id` - The id of the room where the key is used in.
834    ///
835    /// * `event` - The event for which we would like to request the room key.
836    pub async fn request_key(
837        &self,
838        room_id: &RoomId,
839        event: &EncryptedEvent,
840    ) -> Result<(Option<OutgoingRequest>, OutgoingRequest), MegolmError> {
841        let secret_info =
842            event.room_key_info(room_id).ok_or(EventError::UnsupportedAlgorithm)?.into();
843
844        let request = self.inner.store.get_secret_request_by_info(&secret_info).await?;
845
846        if let Some(request) = request {
847            let cancel = request.to_cancellation(self.device_id());
848            let request = request.to_request(self.device_id());
849
850            Ok((Some(cancel), request))
851        } else {
852            let request = self.request_key_helper(secret_info).await?;
853
854            Ok((None, request))
855        }
856    }
857
858    /// Create outgoing secret requests for the given
859    pub fn request_missing_secrets(
860        own_user_id: &UserId,
861        secret_names: Vec<SecretName>,
862    ) -> Vec<GossipRequest> {
863        if !secret_names.is_empty() {
864            info!(?secret_names, "Creating new outgoing secret requests");
865
866            secret_names
867                .into_iter()
868                .map(|n| GossipRequest::from_secret_name(own_user_id.to_owned(), n))
869                .collect()
870        } else {
871            trace!("No secrets are missing from our store, not requesting them");
872            vec![]
873        }
874    }
875
876    async fn request_key_helper(
877        &self,
878        key_info: SecretInfo,
879    ) -> Result<OutgoingRequest, CryptoStoreError> {
880        let request = GossipRequest {
881            request_recipient: self.user_id().to_owned(),
882            request_id: TransactionId::new(),
883            info: key_info,
884            sent_out: false,
885        };
886
887        let outgoing_request = request.to_request(self.device_id());
888        self.save_outgoing_key_info(request).await?;
889
890        Ok(outgoing_request)
891    }
892
/// Create a new outgoing key request for the room key used by the given
/// event, if it makes sense to do so.
///
/// This will queue up a new to-device request and store the key info so
/// once we receive a forwarded room key we can check that it matches the
/// key we requested.
///
/// Nothing happens when the event yields no room key info or when
/// `should_request_key` declines the request.
///
/// # Arguments
/// * `room_id` - The id of the room where the key is used in.
///
/// * `event` - The event for which we would like to request the room key.
///
/// # Returns
///
/// `true` if a new request was created, `false` otherwise.
#[cfg(feature = "automatic-room-key-forwarding")]
pub async fn create_outgoing_key_request(
    &self,
    room_id: &RoomId,
    event: &EncryptedEvent,
) -> Result<bool, CryptoStoreError> {
    let key_info: Option<SecretInfo> = event.room_key_info(room_id).map(|i| i.into());

    let Some(key_info) = key_info else {
        return Ok(false);
    };

    if !self.should_request_key(&key_info).await? {
        return Ok(false);
    }

    // Size of the request_key_helper future should not impact this
    // async fn since it is likely enough that this point won't be
    // reached.
    Box::pin(self.request_key_helper(key_info)).await?;

    Ok(true)
}
923
924    /// Save an outgoing key info.
925    async fn save_outgoing_key_info(&self, info: GossipRequest) -> Result<(), CryptoStoreError> {
926        let mut changes = Changes::default();
927        changes.key_requests.push(info);
928        self.inner.store.save_changes(changes).await?;
929
930        Ok(())
931    }
932
933    /// Delete the given outgoing key info.
934    async fn delete_key_info(&self, info: &GossipRequest) -> Result<(), CryptoStoreError> {
935        self.inner.store.delete_outgoing_secret_requests(&info.request_id).await
936    }
937
938    /// Mark the outgoing request as sent.
939    pub async fn mark_outgoing_request_as_sent(
940        &self,
941        id: &TransactionId,
942    ) -> Result<(), CryptoStoreError> {
943        let info = self.inner.store.get_outgoing_secret_requests(id).await?;
944
945        if let Some(mut info) = info {
946            debug!(
947                recipient = ?info.request_recipient,
948                request_type = info.request_type(),
949                request_id = ?info.request_id,
950                "Marking outgoing secret request as sent"
951            );
952            info.sent_out = true;
953            self.save_outgoing_key_info(info).await?;
954        } else if let Some(req) = self.inner.outgoing_requests.read().get(id) {
955            // This outgoing event was not saved into the store. This is
956            // expected: for example `m.secret.send` events are only stored in
957            // `outgoing_requests`, not in the store.
958            debug!(
959                request_id = ?req.request_id,
960                "Marking outgoing request as sent"
961            );
962        }
963
964        self.inner.outgoing_requests.write().remove(id);
965
966        Ok(())
967    }
968
969    /// Mark the given outgoing key info as done.
970    ///
971    /// This will queue up a request cancellation.
972    async fn mark_as_done(&self, key_info: &GossipRequest) -> Result<(), CryptoStoreError> {
973        debug!(
974            recipient = ?key_info.request_recipient,
975            request_type = key_info.request_type(),
976            request_id = ?key_info.request_id,
977            "Successfully received a secret, removing the request"
978        );
979
980        self.inner.outgoing_requests.write().remove(&key_info.request_id);
981        // TODO return the key info instead of deleting it so the sync handler
982        // can delete it in one transaction.
983        self.delete_key_info(key_info).await?;
984
985        let request = key_info.to_cancellation(self.device_id());
986        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
987
988        Ok(())
989    }
990
991    async fn accept_secret(
992        &self,
993        secret: GossippedSecret,
994        changes: &mut Changes,
995    ) -> Result<(), CryptoStoreError> {
996        if secret.secret_name != SecretName::RecoveryKey {
997            match self.inner.store.import_secret(&secret).await {
998                Ok(_) => self.mark_as_done(&secret.gossip_request).await?,
999                // If this is a store error propagate it up the call stack.
1000                Err(SecretImportError::Store(e)) => return Err(e),
1001                // Otherwise warn that there was something wrong with the
1002                // secret.
1003                Err(e) => {
1004                    warn!(
1005                        secret_name = ?secret.secret_name,
1006                        error = ?e,
1007                        "Error while importing a secret"
1008                    );
1009                }
1010            }
1011        } else {
1012            // We would need to fire out a request to figure out if this backup decryption
1013            // key is the one that is used for the current backup and if the
1014            // backup is trusted.
1015            //
1016            // So we put the secret into our inbox. Later users can inspect the contents of
1017            // the inbox and decide if they want to activate the backup.
1018            info!("Received a backup decryption key, storing it into the secret inbox.");
1019            changes.secrets.push(secret.into());
1020        }
1021
1022        Ok(())
1023    }
1024
1025    async fn receive_secret(
1026        &self,
1027        cache: &StoreCache,
1028        sender_key: Curve25519PublicKey,
1029        secret: GossippedSecret,
1030        changes: &mut Changes,
1031    ) -> Result<(), CryptoStoreError> {
1032        debug!("Received a m.secret.send event with a matching request");
1033
1034        if let Some(device) =
1035            self.inner.store.get_device_from_curve_key(&secret.event.sender, sender_key).await?
1036        {
1037            // Only accept secrets from one of our own trusted devices.
1038            if device.user_id() == self.user_id() && device.is_verified() {
1039                self.accept_secret(secret, changes).await?;
1040            } else {
1041                warn!("Received a m.secret.send event from another user or from unverified device");
1042            }
1043        } else {
1044            warn!("Received a m.secret.send event from an unknown device");
1045
1046            self.identity_manager()
1047                .key_query_manager
1048                .synced(cache)
1049                .await?
1050                .mark_user_as_changed(&secret.event.sender)
1051                .await?;
1052        }
1053
1054        Ok(())
1055    }
1056
1057    #[instrument(skip_all, fields(sender_key, sender = ?event.sender, request_id = ?event.content.request_id, secret_name))]
1058    pub async fn receive_secret_event(
1059        &self,
1060        cache: &StoreCache,
1061        sender_key: Curve25519PublicKey,
1062        event: &DecryptedSecretSendEvent,
1063        changes: &mut Changes,
1064    ) -> Result<Option<SecretName>, CryptoStoreError> {
1065        debug!("Received a m.secret.send event");
1066
1067        let request_id = &event.content.request_id;
1068
1069        let name = if let Some(request) =
1070            self.inner.store.get_outgoing_secret_requests(request_id).await?
1071        {
1072            match &request.info {
1073                SecretInfo::KeyRequest(_) => {
1074                    warn!("Received a m.secret.send event but the request was for a room key");
1075
1076                    None
1077                }
1078                SecretInfo::SecretRequest(secret_name) => {
1079                    Span::current().record("secret_name", debug(secret_name));
1080
1081                    let secret_name = secret_name.to_owned();
1082
1083                    let secret = GossippedSecret {
1084                        secret_name: secret_name.to_owned(),
1085                        event: event.to_owned(),
1086                        gossip_request: request,
1087                    };
1088
1089                    self.receive_secret(cache, sender_key, secret, changes).await?;
1090
1091                    Some(secret_name)
1092                }
1093            }
1094        } else {
1095            warn!("Received a m.secret.send event, but no matching request was found");
1096            None
1097        };
1098
1099        Ok(name)
1100    }
1101
1102    #[tracing::instrument(skip_all)]
1103    async fn accept_forwarded_room_key(
1104        &self,
1105        info: &GossipRequest,
1106        sender_key: Curve25519PublicKey,
1107        event: &DecryptedForwardedRoomKeyEvent,
1108    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1109        match InboundGroupSession::try_from(event) {
1110            Ok(session) => {
1111                let new_session = self.inner.store.merge_received_group_session(session).await?;
1112                if new_session.is_some() {
1113                    self.mark_as_done(info).await?;
1114                }
1115                Ok(new_session)
1116            }
1117            Err(e) => {
1118                warn!(?sender_key, "Couldn't create a group session from a received room key");
1119                Err(e.into())
1120            }
1121        }
1122    }
1123
1124    async fn should_accept_forward(
1125        &self,
1126        info: &GossipRequest,
1127        sender_key: Curve25519PublicKey,
1128    ) -> Result<bool, CryptoStoreError> {
1129        let device =
1130            self.inner.store.get_device_from_curve_key(&info.request_recipient, sender_key).await?;
1131
1132        if let Some(device) = device {
1133            Ok(device.user_id() == self.user_id() && device.is_verified())
1134        } else {
1135            Ok(false)
1136        }
1137    }
1138
1139    /// Receive a forwarded room key event that was sent using any of our
1140    /// supported content types.
1141    async fn receive_supported_keys(
1142        &self,
1143        sender_key: Curve25519PublicKey,
1144        event: &DecryptedForwardedRoomKeyEvent,
1145    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1146        let Some(info) = event.room_key_info() else {
1147            warn!(
1148                sender_key = sender_key.to_base64(),
1149                algorithm = ?event.content.algorithm(),
1150                "Received a forwarded room key with an unsupported algorithm",
1151            );
1152            return Ok(None);
1153        };
1154
1155        let Some(request) =
1156            self.inner.store.get_secret_request_by_info(&info.clone().into()).await?
1157        else {
1158            warn!(
1159                sender_key = ?sender_key,
1160                room_id = ?info.room_id(),
1161                session_id = info.session_id(),
1162                sender_key = ?sender_key,
1163                algorithm = ?info.algorithm(),
1164                "Received a forwarded room key that we didn't request",
1165            );
1166            return Ok(None);
1167        };
1168
1169        if self.should_accept_forward(&request, sender_key).await? {
1170            self.accept_forwarded_room_key(&request, sender_key, event).await
1171        } else {
1172            warn!(
1173                ?sender_key,
1174                room_id = ?info.room_id(),
1175                session_id = info.session_id(),
1176                "Received a forwarded room key from an unknown device, or \
1177                 from a device that the key request recipient doesn't own",
1178            );
1179
1180            Ok(None)
1181        }
1182    }
1183
1184    /// Receive a forwarded room key event.
1185    pub async fn receive_forwarded_room_key(
1186        &self,
1187        sender_key: Curve25519PublicKey,
1188        event: &DecryptedForwardedRoomKeyEvent,
1189    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1190        match event.content {
1191            ForwardedRoomKeyContent::MegolmV1AesSha2(_) => {
1192                self.receive_supported_keys(sender_key, event).await
1193            }
1194            #[cfg(feature = "experimental-algorithms")]
1195            ForwardedRoomKeyContent::MegolmV2AesSha2(_) => {
1196                self.receive_supported_keys(sender_key, event).await
1197            }
1198            ForwardedRoomKeyContent::Unknown(_) => {
1199                warn!(
1200                    sender = event.sender.as_str(),
1201                    sender_key = sender_key.to_base64(),
1202                    algorithm = ?event.content.algorithm(),
1203                    "Received a forwarded room key with an unsupported algorithm",
1204                );
1205
1206                Ok(None)
1207            }
1208        }
1209    }
1210}
1211
1212#[cfg(test)]
1213mod tests {
1214    #[cfg(feature = "experimental-push-secrets")]
1215    use std::ops::Deref;
1216    use std::sync::Arc;
1217
1218    #[cfg(feature = "automatic-room-key-forwarding")]
1219    use assert_matches::assert_matches;
1220    use matrix_sdk_test::{async_test, message_like_event_content};
1221    use ruma::{
1222        DeviceId, RoomId, UserId, device_id,
1223        events::{
1224            ToDeviceEvent as RumaToDeviceEvent,
1225            secret::request::{
1226                RequestAction, SecretName, SecretRequestAction, ToDeviceSecretRequestEventContent,
1227            },
1228        },
1229        owned_event_id, room_id,
1230        serde::Raw,
1231        user_id,
1232    };
1233    use tokio::sync::Mutex;
1234
1235    use super::GossipMachine;
1236    use crate::{
1237        DecryptionSettings, TrustRequirement,
1238        identities::{DeviceData, IdentityManager, LocalTrust},
1239        olm::{Account, PrivateCrossSigningIdentity},
1240        session_manager::GroupSessionCache,
1241        store::{
1242            CryptoStoreWrapper, MemoryStore, Store,
1243            types::{Changes, PendingChanges},
1244        },
1245        types::events::room::encrypted::{
1246            EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
1247        },
1248        verification::VerificationMachine,
1249    };
1250    #[cfg(feature = "automatic-room-key-forwarding")]
1251    use crate::{
1252        EncryptionSettings,
1253        gossiping::KeyForwardDecision,
1254        olm::OutboundGroupSession,
1255        store::{CryptoStore, types::DeviceChanges},
1256        types::requests::AnyOutgoingRequest,
1257        types::{
1258            EventEncryptionAlgorithm,
1259            events::{
1260                forwarded_room_key::ForwardedRoomKeyContent, olm_v1::AnyDecryptedOlmEvent,
1261                olm_v1::DecryptedOlmV1Event,
1262            },
1263        },
1264    };
1265
1266    fn alice_id() -> &'static UserId {
1267        user_id!("@alice:example.org")
1268    }
1269
1270    fn alice_device_id() -> &'static DeviceId {
1271        device_id!("JLAFKJWSCS")
1272    }
1273
1274    fn bob_id() -> &'static UserId {
1275        user_id!("@bob:example.org")
1276    }
1277
1278    fn bob_device_id() -> &'static DeviceId {
1279        device_id!("ILMLKASTES")
1280    }
1281
1282    fn alice2_device_id() -> &'static DeviceId {
1283        device_id!("ILMLKASTES")
1284    }
1285
1286    fn room_id() -> &'static RoomId {
1287        room_id!("!test:example.org")
1288    }
1289
1290    fn account() -> Account {
1291        Account::with_device_id(alice_id(), alice_device_id())
1292    }
1293
1294    fn bob_account() -> Account {
1295        Account::with_device_id(bob_id(), bob_device_id())
1296    }
1297
1298    fn alice_2_account() -> Account {
1299        Account::with_device_id(alice_id(), alice2_device_id())
1300    }
1301
/// Build a `GossipMachine` for `user_id` with a newly generated device ID.
///
/// The machine is backed by the store produced by
/// `store_with_account_helper`.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn gossip_machine_test_helper(user_id: &UserId) -> GossipMachine {
    let owner = user_id.to_owned();
    let device_id = DeviceId::new();

    let wrapped_store = Arc::new(store_with_account_helper(&owner, &device_id).await);
    let static_data = wrapped_store.load_account().await.unwrap().unwrap().static_data;

    // NOTE(review): the identity is always created for Alice, even when
    // `user_id` is a different user — confirm this is intended.
    let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
    let verification =
        VerificationMachine::new(static_data.clone(), identity.clone(), wrapped_store.clone());

    let store = Store::new(static_data, identity, wrapped_store, verification);
    let session_cache = GroupSessionCache::new(store.clone());
    let identity_manager = IdentityManager::new(store.clone());

    GossipMachine::new(store, identity_manager, session_cache, Default::default())
}
1319
/// Create a memory-backed `CryptoStoreWrapper` that already contains the
/// account for `user_id`/`device_id` and its own device, with the device's
/// trust state set to `LocalTrust::Verified`.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn store_with_account_helper(
    user_id: &UserId,
    device_id: &DeviceId,
) -> CryptoStoreWrapper {
    let account = Account::with_device_id(user_id, device_id);

    let own_device = DeviceData::from_account(&account);
    own_device.set_trust_state(LocalTrust::Verified);

    // Properly create the store by first saving the own device and then the
    // account data.
    let mem_store = MemoryStore::new();

    let device_changes = DeviceChanges { new: vec![own_device], ..Default::default() };
    let changes = Changes { devices: device_changes, ..Default::default() };
    mem_store.save_changes(changes).await.unwrap();

    let pending = PendingChanges { account: Some(account) };
    mem_store.save_pending_changes(pending).await.unwrap();

    CryptoStoreWrapper::new(user_id, device_id, mem_store)
}
1341
1342    async fn get_machine_test_helper() -> GossipMachine {
1343        let user_id = alice_id();
1344        let account = Account::with_device_id(user_id, alice_device_id());
1345        let device = DeviceData::from_account(&account);
1346        let another_device =
1347            DeviceData::from_account(&Account::with_device_id(user_id, alice2_device_id()));
1348
1349        let store =
1350            Arc::new(CryptoStoreWrapper::new(user_id, account.device_id(), MemoryStore::new()));
1351        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1352        let verification =
1353            VerificationMachine::new(account.static_data.clone(), identity.clone(), store.clone());
1354
1355        let store = Store::new(account.static_data().clone(), identity, store, verification);
1356        store.save_device_data(&[device, another_device]).await.unwrap();
1357        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1358        let session_cache = GroupSessionCache::new(store.clone());
1359
1360        let identity_manager = IdentityManager::new(store.clone());
1361
1362        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1363    }
1364
/// Set up two gossip machines for key sharing tests.
///
/// Alice's machine comes from `get_machine_test_helper`; the second machine
/// is owned by `other_machine_owner`. The second machine creates a group
/// session pair for the test room using `algorithm`, and Alice's machine
/// creates an outgoing key request for an event encrypted with it. The
/// outbound session is marked as shared with Alice's device and inserted
/// into the second machine's outbound session cache.
///
/// # Arguments
///
/// * `other_machine_owner` - The user owning the second machine.
///
/// * `create_sessions` - Whether Olm sessions between the two accounts
///   should be created and saved.
///
/// * `algorithm` - The algorithm of the created group session.
///
/// # Returns
///
/// Alice's machine, the outbound group session and the second machine.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn machines_for_key_share_test_helper(
    other_machine_owner: &UserId,
    create_sessions: bool,
    algorithm: EventEncryptionAlgorithm,
) -> (GossipMachine, OutboundGroupSession, GossipMachine) {
    use crate::olm::SenderData;

    let alice_machine = get_machine_test_helper().await;
    let alice_device = DeviceData::from_account(
        &alice_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
    );

    let bob_machine = gossip_machine_test_helper(other_machine_owner).await;
    let bob_device = DeviceData::from_account(
        #[allow(clippy::explicit_auto_deref)] // clippy's wrong
        &*bob_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
    );

    // We need a trusted device, otherwise we won't request keys
    let second_device = DeviceData::from_account(&alice_2_account());
    second_device.set_trust_state(LocalTrust::Verified);
    bob_device.set_trust_state(LocalTrust::Verified);

    // Let each machine know about the other side's device(s).
    alice_machine.inner.store.save_device_data(&[bob_device, second_device]).await.unwrap();
    bob_machine
        .inner
        .store
        .save_device_data(std::slice::from_ref(&alice_device))
        .await
        .unwrap();

    if create_sessions {
        // Create Olm sessions for our two accounts.
        let (alice_session, bob_session) = alice_machine
            .inner
            .store
            .with_transaction(async |atr| {
                let session_pair = bob_machine
                    .inner
                    .store
                    .with_transaction(async |btr| {
                        let alice_account = atr.account().await?;
                        let bob_account = btr.account().await?;
                        let session_pair =
                            alice_account.create_session_for_test_helper(bob_account).await;
                        Ok(session_pair)
                    })
                    .await?;
                Ok(session_pair)
            })
            .await
            .unwrap();

        // Populate our stores with the Olm sessions.
        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
        bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
    }

    // Create the Megolm session on the second machine and store its inbound
    // half there.
    let settings = EncryptionSettings { algorithm, ..Default::default() };
    let (group_session, inbound_group_session) = bob_machine
        .inner
        .store
        .static_account()
        .create_group_session_pair(room_id(), settings, SenderData::unknown())
        .await
        .unwrap();

    bob_machine
        .inner
        .store
        .save_inbound_group_sessions(&[inbound_group_session])
        .await
        .unwrap();

    let encrypted = group_session.encrypt("m.dummy", &message_like_event_content!({})).await;
    let event = wrap_encrypted_content(bob_machine.user_id(), encrypted.content);

    // Alice wants to request the outbound group session from bob.
    let was_requested = alice_machine.create_outgoing_key_request(room_id(), &event).await.unwrap();
    assert!(was_requested, "We should request a room key");

    group_session
        .mark_shared_with(
            alice_device.user_id(),
            alice_device.device_id(),
            alice_device.curve25519_key().unwrap(),
        )
        .await;

    // Put the outbound session into bobs store.
    bob_machine.inner.outbound_group_sessions.insert(group_session.clone());

    (alice_machine, group_session, bob_machine)
}
1459
1460    fn extract_content<'a>(
1461        recipient: &UserId,
1462        request: &'a crate::types::requests::OutgoingRequest,
1463    ) -> &'a Raw<ruma::events::AnyToDeviceEventContent> {
1464        request
1465            .request()
1466            .to_device()
1467            .expect("The request should be always a to-device request")
1468            .messages
1469            .get(recipient)
1470            .unwrap()
1471            .values()
1472            .next()
1473            .unwrap()
1474    }
1475
1476    fn wrap_encrypted_content(
1477        sender: &UserId,
1478        content: Raw<RoomEncryptedEventContent>,
1479    ) -> EncryptedEvent {
1480        let content = content.deserialize().unwrap();
1481
1482        EncryptedEvent {
1483            sender: sender.to_owned(),
1484            event_id: owned_event_id!("$143273582443PhrSn:example.org"),
1485            #[cfg(feature = "experimental-encrypted-state-events")]
1486            state_key: None,
1487            content,
1488            origin_server_ts: ruma::MilliSecondsSinceUnixEpoch::now(),
1489            unsigned: Default::default(),
1490            other: Default::default(),
1491        }
1492    }
1493
1494    fn request_to_event<C>(
1495        recipient: &UserId,
1496        sender: &UserId,
1497        request: &crate::types::requests::OutgoingRequest,
1498    ) -> crate::types::events::ToDeviceEvent<C>
1499    where
1500        C: crate::types::events::EventType
1501            + serde::de::DeserializeOwned
1502            + serde::ser::Serialize
1503            + std::fmt::Debug,
1504    {
1505        let content = extract_content(recipient, request);
1506        let content: C = content.deserialize_as_unchecked().unwrap_or_else(|_| {
1507            panic!("We can always deserialize the to-device event content {content:?}")
1508        });
1509
1510        crate::types::events::ToDeviceEvent::new(sender.to_owned(), content)
1511    }
1512
1513    #[async_test]
1514    async fn test_create_machine() {
1515        let machine = get_machine_test_helper().await;
1516
1517        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1518    }
1519
1520    #[async_test]
1521    async fn test_re_request_keys() {
1522        let machine = get_machine_test_helper().await;
1523        let account = account();
1524
1525        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1526
1527        let result = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1528        let event = wrap_encrypted_content(machine.user_id(), result.content);
1529
1530        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1531        let (cancel, request) = machine.request_key(session.room_id(), &event).await.unwrap();
1532
1533        assert!(cancel.is_none());
1534
1535        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1536
1537        let (cancel, _) = machine.request_key(session.room_id(), &event).await.unwrap();
1538
1539        assert!(cancel.is_some());
1540    }
1541
/// Creating an outgoing key request queues exactly one to-device request,
/// even when it is attempted twice, and marking it as sent empties the
/// queue again.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_create_key_request() {
    let machine = get_machine_test_helper().await;
    let account = account();

    // We need a trusted device, otherwise we won't request keys
    let trusted_device = DeviceData::from_account(&alice_2_account());
    trusted_device.set_trust_state(LocalTrust::Verified);
    machine.inner.store.save_device_data(&[trusted_device]).await.unwrap();

    let (outbound, inbound) = account.create_group_session_pair_with_defaults(room_id()).await;
    let encrypted = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let event = wrap_encrypted_content(machine.user_id(), encrypted.content);

    assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());

    machine.create_outgoing_key_request(inbound.room_id(), &event).await.unwrap();
    assert!(!machine.outgoing_to_device_requests().await.unwrap().is_empty());
    assert_eq!(machine.outgoing_to_device_requests().await.unwrap().len(), 1);

    // A second attempt must not queue a duplicate.
    machine.create_outgoing_key_request(inbound.room_id(), &event).await.unwrap();

    let queued = machine.outgoing_to_device_requests().await.unwrap();
    assert_eq!(queued.len(), 1);

    let only_request = &queued[0];
    machine.mark_outgoing_request_as_sent(&only_request.request_id).await.unwrap();

    assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
}
1573
1574    /// We should *not* request keys if that has been disabled
1575    #[async_test]
1576    #[cfg(feature = "automatic-room-key-forwarding")]
1577    async fn test_create_key_request_requests_disabled() {
1578        let machine = get_machine_test_helper().await;
1579        let account = account();
1580        let second_account = alice_2_account();
1581        let alice_device = DeviceData::from_account(&second_account);
1582
1583        // We need a trusted device, otherwise we won't request keys
1584        alice_device.set_trust_state(LocalTrust::Verified);
1585        machine.inner.store.save_device_data(&[alice_device]).await.unwrap();
1586
1587        // Disable key requests
1588        assert!(machine.are_room_key_requests_enabled());
1589        machine.set_room_key_requests_enabled(false);
1590        assert!(!machine.are_room_key_requests_enabled());
1591
1592        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1593        let result = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1594        let event = wrap_encrypted_content(machine.user_id(), result.content);
1595
1596        // The outgoing to-device requests should be empty before and after
1597        // `create_outgoing_key_request`.
1598        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1599        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1600        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1601    }
1602
1603    #[async_test]
1604    #[cfg(feature = "automatic-room-key-forwarding")]
1605    async fn test_receive_forwarded_key() {
1606        let machine = get_machine_test_helper().await;
1607        let account = account();
1608
1609        let second_account = alice_2_account();
1610        let alice_device = DeviceData::from_account(&second_account);
1611
1612        // We need a trusted device, otherwise we won't request keys
1613        alice_device.set_trust_state(LocalTrust::Verified);
1614        let devices = std::slice::from_ref(&alice_device);
1615        machine.inner.store.save_device_data(devices).await.unwrap();
1616
1617        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1618        let result = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1619        let room_event = wrap_encrypted_content(machine.user_id(), result.content);
1620
1621        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();
1622
1623        let requests = machine.outgoing_to_device_requests().await.unwrap();
1624        let request = &requests[0];
1625        let id = &request.request_id;
1626
1627        machine.mark_outgoing_request_as_sent(id).await.unwrap();
1628
1629        let export = session.export_at_index(10).await;
1630
1631        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1632
1633        let event = DecryptedOlmV1Event::new(
1634            alice_id(),
1635            alice_id(),
1636            alice_device.ed25519_key().unwrap(),
1637            None,
1638            content,
1639        );
1640
1641        assert!(
1642            machine
1643                .inner
1644                .store
1645                .get_inbound_group_session(session.room_id(), session.session_id(),)
1646                .await
1647                .unwrap()
1648                .is_none()
1649        );
1650
1651        let first_session = machine
1652            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1653            .await
1654            .unwrap();
1655        let first_session = first_session.unwrap();
1656
1657        assert_eq!(first_session.first_known_index(), 10);
1658
1659        let sessions = std::slice::from_ref(&first_session);
1660        machine.inner.store.save_inbound_group_sessions(sessions).await.unwrap();
1661
1662        // Get the cancel request.
1663        let id = machine
1664            .inner
1665            .outgoing_requests
1666            .read()
1667            .first_key_value()
1668            .map(|(_, r)| r.request_id.clone())
1669            .unwrap();
1670        machine.mark_outgoing_request_as_sent(&id).await.unwrap();
1671
1672        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();
1673
1674        let requests = machine.outgoing_to_device_requests().await.unwrap();
1675        let request = &requests[0];
1676
1677        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1678
1679        let export = session.export_at_index(15).await;
1680
1681        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1682
1683        let event = DecryptedOlmV1Event::new(
1684            alice_id(),
1685            alice_id(),
1686            alice_device.ed25519_key().unwrap(),
1687            None,
1688            content,
1689        );
1690
1691        let second_session = machine
1692            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1693            .await
1694            .unwrap();
1695
1696        assert!(second_session.is_none());
1697
1698        let export = session.export_at_index(0).await;
1699
1700        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1701
1702        let event = DecryptedOlmV1Event::new(
1703            alice_id(),
1704            alice_id(),
1705            alice_device.ed25519_key().unwrap(),
1706            None,
1707            content,
1708        );
1709
1710        let second_session = machine
1711            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1712            .await
1713            .unwrap();
1714
1715        assert_eq!(second_session.unwrap().first_known_index(), 0);
1716    }
1717
1718    #[async_test]
1719    #[cfg(feature = "automatic-room-key-forwarding")]
1720    async fn test_should_share_key() {
1721        let machine = get_machine_test_helper().await;
1722        let account = account();
1723
1724        let own_device =
1725            machine.inner.store.get_device(alice_id(), alice2_device_id()).await.unwrap().unwrap();
1726
1727        let (outbound, inbound) = account.create_group_session_pair_with_defaults(room_id()).await;
1728
1729        // We don't share keys with untrusted devices.
1730        assert_matches!(
1731            machine.should_share_key(&own_device, &inbound).await,
1732            Err(KeyForwardDecision::UntrustedDevice)
1733        );
1734        own_device.set_trust_state(LocalTrust::Verified);
1735        // Now we do want to share the keys.
1736        machine.should_share_key(&own_device, &inbound).await.unwrap();
1737
1738        let bob_device = DeviceData::from_account(&bob_account());
1739        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1740
1741        let bob_device =
1742            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1743
1744        // We don't share sessions with other user's devices if no outbound
1745        // session was provided.
1746        assert_matches!(
1747            machine.should_share_key(&bob_device, &inbound).await,
1748            Err(KeyForwardDecision::MissingOutboundSession)
1749        );
1750
1751        let mut changes = Changes::default();
1752
1753        changes.outbound_group_sessions.push(outbound.clone());
1754        changes.inbound_group_sessions.push(inbound.clone());
1755        machine.inner.store.save_changes(changes).await.unwrap();
1756        machine.inner.outbound_group_sessions.insert(outbound.clone());
1757
1758        // We don't share sessions with other user's devices if the session
1759        // wasn't shared in the first place.
1760        assert_matches!(
1761            machine.should_share_key(&bob_device, &inbound).await,
1762            Err(KeyForwardDecision::OutboundSessionNotShared)
1763        );
1764
1765        bob_device.set_trust_state(LocalTrust::Verified);
1766
1767        // We don't share sessions with other user's devices if the session
1768        // wasn't shared in the first place even if the device is trusted.
1769        assert_matches!(
1770            machine.should_share_key(&bob_device, &inbound).await,
1771            Err(KeyForwardDecision::OutboundSessionNotShared)
1772        );
1773
1774        // We now share the session, since it was shared before.
1775        outbound
1776            .mark_shared_with(
1777                bob_device.user_id(),
1778                bob_device.device_id(),
1779                bob_device.curve25519_key().unwrap(),
1780            )
1781            .await;
1782        machine.should_share_key(&bob_device, &inbound).await.unwrap();
1783
1784        let (other_outbound, other_inbound) =
1785            account.create_group_session_pair_with_defaults(room_id()).await;
1786
1787        // But we don't share some other session that doesn't match our outbound
1788        // session.
1789        assert_matches!(
1790            machine.should_share_key(&bob_device, &other_inbound).await,
1791            Err(KeyForwardDecision::MissingOutboundSession)
1792        );
1793
1794        // Finally, let's ensure we don't share the session with a device that rotated
1795        // its curve25519 key.
1796        let bob_device = DeviceData::from_account(&bob_account());
1797        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1798
1799        let bob_device =
1800            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1801        assert_matches!(
1802            machine.should_share_key(&bob_device, &inbound).await,
1803            Err(KeyForwardDecision::ChangedSenderKey)
1804        );
1805
1806        // Now let's encrypt some messages in another session to increment the message
1807        // index and then share it with our own untrusted device.
1808        own_device.set_trust_state(LocalTrust::Unset);
1809
1810        for _ in 1..=3 {
1811            other_outbound.encrypt_helper("foo".to_owned()).await;
1812        }
1813        other_outbound
1814            .mark_shared_with(
1815                own_device.user_id(),
1816                own_device.device_id(),
1817                own_device.curve25519_key().unwrap(),
1818            )
1819            .await;
1820
1821        machine.inner.outbound_group_sessions.insert(other_outbound.clone());
1822
1823        // Since our device is untrusted, we should share the session starting only from
1824        // the current index (at which the message was marked as shared). This
1825        // should be 3 since we encrypted 3 messages.
1826        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(Some(3)));
1827
1828        own_device.set_trust_state(LocalTrust::Verified);
1829
1830        // However once our device is trusted, we share the entire session.
1831        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(None));
1832    }
1833
1834    #[cfg(feature = "automatic-room-key-forwarding")]
1835    async fn test_key_share_cycle(algorithm: EventEncryptionAlgorithm) {
1836        let (alice_machine, group_session, bob_machine) =
1837            machines_for_key_share_test_helper(alice_id(), true, algorithm).await;
1838
1839        // Get the request and convert it into a event.
1840        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1841        let request = &requests[0];
1842        let event = request_to_event(alice_id(), alice_id(), request);
1843
1844        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1845
1846        // Bob doesn't have any outgoing requests.
1847        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1848
1849        // Receive the room key request from alice.
1850        bob_machine.receive_incoming_key_request(&event);
1851
1852        {
1853            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1854            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1855        }
1856        // Now bob does have an outgoing request.
1857        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1858
1859        // Get the request and convert it to a encrypted to-device event.
1860        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1861        let request = &requests[0];
1862
1863        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
1864        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1865
1866        // Check that alice doesn't have the session.
1867        assert!(
1868            alice_machine
1869                .inner
1870                .store
1871                .get_inbound_group_session(room_id(), group_session.session_id())
1872                .await
1873                .unwrap()
1874                .is_none()
1875        );
1876
1877        let decrypted = alice_machine
1878            .inner
1879            .store
1880            .with_transaction(async |tr| {
1881                let res = tr
1882                    .account()
1883                    .await?
1884                    .decrypt_to_device_event(
1885                        &alice_machine.inner.store,
1886                        &event,
1887                        &DecryptionSettings {
1888                            sender_device_trust_requirement: TrustRequirement::Untrusted,
1889                        },
1890                    )
1891                    .await?;
1892                Ok(res)
1893            })
1894            .await
1895            .unwrap();
1896
1897        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1898            panic!("Invalid decrypted event type");
1899        };
1900
1901        let session = alice_machine
1902            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1903            .await
1904            .unwrap();
1905        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
1906
1907        // Check that alice now does have the session.
1908        let session = alice_machine
1909            .inner
1910            .store
1911            .get_inbound_group_session(room_id(), group_session.session_id())
1912            .await
1913            .unwrap()
1914            .unwrap();
1915
1916        assert_eq!(session.session_id(), group_session.session_id())
1917    }
1918
1919    #[async_test]
1920    #[cfg(feature = "automatic-room-key-forwarding")]
1921    async fn test_reject_forward_from_another_user() {
1922        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
1923            bob_id(),
1924            true,
1925            EventEncryptionAlgorithm::MegolmV1AesSha2,
1926        )
1927        .await;
1928
1929        // Get the request and convert it into a event.
1930        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1931        let request = &requests[0];
1932        let event = request_to_event(alice_id(), alice_id(), request);
1933
1934        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1935
1936        // Bob doesn't have any outgoing requests.
1937        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1938
1939        // Receive the room key request from alice.
1940        bob_machine.receive_incoming_key_request(&event);
1941        {
1942            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1943            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1944        }
1945        // Now bob does have an outgoing request.
1946        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1947
1948        // Get the request and convert it to a encrypted to-device event.
1949        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1950        let request = &requests[0];
1951
1952        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), bob_id(), request);
1953        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1954
1955        // Check that alice doesn't have the session.
1956        assert!(
1957            alice_machine
1958                .inner
1959                .store
1960                .get_inbound_group_session(room_id(), group_session.session_id())
1961                .await
1962                .unwrap()
1963                .is_none()
1964        );
1965
1966        let decrypted = alice_machine
1967            .inner
1968            .store
1969            .with_transaction(async |tr| {
1970                let res = tr
1971                    .account()
1972                    .await?
1973                    .decrypt_to_device_event(
1974                        &alice_machine.inner.store,
1975                        &event,
1976                        &DecryptionSettings {
1977                            sender_device_trust_requirement: TrustRequirement::Untrusted,
1978                        },
1979                    )
1980                    .await?;
1981                Ok(res)
1982            })
1983            .await
1984            .unwrap();
1985
1986        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1987            panic!("Invalid decrypted event type");
1988        };
1989
1990        let session = alice_machine
1991            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1992            .await
1993            .unwrap();
1994
1995        assert!(session.is_none(), "We should not receive a room key from another user");
1996    }
1997
1998    #[async_test]
1999    #[cfg(feature = "automatic-room-key-forwarding")]
2000    async fn test_key_share_cycle_megolm_v1() {
2001        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV1AesSha2).await;
2002    }
2003
2004    #[async_test]
2005    #[cfg(all(feature = "experimental-algorithms", feature = "automatic-room-key-forwarding"))]
2006    async fn test_key_share_cycle_megolm_v2() {
2007        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV2AesSha2).await;
2008    }
2009
2010    #[async_test]
2011    async fn test_secret_share_cycle() {
2012        let alice_machine = get_machine_test_helper().await;
2013
2014        let mut second_account = alice_2_account();
2015        let alice_device = DeviceData::from_account(&second_account);
2016
2017        let bob_account = bob_account();
2018        let bob_device = DeviceData::from_account(&bob_account);
2019
2020        let devices = std::slice::from_ref(&alice_device);
2021        alice_machine.inner.store.save_device_data(devices).await.unwrap();
2022
2023        // Create Olm sessions for our two accounts.
2024        let alice_session = alice_machine
2025            .inner
2026            .store
2027            .with_transaction(async |tr| {
2028                let alice_account = tr.account().await?;
2029                let (alice_session, _) =
2030                    alice_account.create_session_for_test_helper(&mut second_account).await;
2031                Ok(alice_session)
2032            })
2033            .await
2034            .unwrap();
2035
2036        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
2037
2038        let event = RumaToDeviceEvent::new(
2039            bob_account.user_id().to_owned(),
2040            ToDeviceSecretRequestEventContent::new(
2041                RequestAction::Request(SecretRequestAction::new(SecretName::CrossSigningMasterKey)),
2042                bob_account.device_id().to_owned(),
2043                "request_id".into(),
2044            ),
2045        );
2046
2047        // No secret found
2048        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
2049        alice_machine.receive_incoming_secret_request(&event);
2050        {
2051            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
2052            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
2053        }
2054        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
2055
2056        // No device found
2057        alice_machine.inner.store.reset_cross_signing_identity().await;
2058        alice_machine.receive_incoming_secret_request(&event);
2059        {
2060            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
2061            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
2062        }
2063        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
2064
2065        alice_machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
2066
2067        // The device doesn't belong to us
2068        alice_machine.inner.store.reset_cross_signing_identity().await;
2069        alice_machine.receive_incoming_secret_request(&event);
2070        {
2071            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
2072            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
2073        }
2074        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
2075
2076        let event = RumaToDeviceEvent::new(
2077            alice_id().to_owned(),
2078            ToDeviceSecretRequestEventContent::new(
2079                RequestAction::Request(SecretRequestAction::new(SecretName::CrossSigningMasterKey)),
2080                second_account.device_id().into(),
2081                "request_id".into(),
2082            ),
2083        );
2084
2085        // The device isn't trusted
2086        alice_machine.receive_incoming_secret_request(&event);
2087        {
2088            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
2089            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
2090        }
2091        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
2092
2093        // We need a trusted device, otherwise we won't serve secrets
2094        alice_device.set_trust_state(LocalTrust::Verified);
2095        let devices = std::slice::from_ref(&alice_device);
2096        alice_machine.inner.store.save_device_data(devices).await.unwrap();
2097
2098        alice_machine.receive_incoming_secret_request(&event);
2099        {
2100            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
2101            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
2102        }
2103        assert!(!alice_machine.inner.outgoing_requests.read().is_empty());
2104    }
2105
2106    #[async_test]
2107    async fn test_secret_broadcasting() {
2108        use futures_util::{FutureExt, pin_mut};
2109        use ruma::api::client::to_device::send_event_to_device::v3::Response as ToDeviceResponse;
2110        use serde_json::value::to_raw_value;
2111        use tokio_stream::StreamExt;
2112
2113        use crate::{
2114            EncryptionSyncChanges,
2115            machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper,
2116        };
2117
2118        let alice_id = user_id!("@alice:localhost");
2119
2120        let (alice_machine, bob_machine) =
2121            get_machine_pair_with_setup_sessions_test_helper(alice_id, alice_id, false).await;
2122
2123        let key_requests = GossipMachine::request_missing_secrets(
2124            bob_machine.user_id(),
2125            vec![SecretName::RecoveryKey],
2126        );
2127        let mut changes = Changes::default();
2128        let request_id = key_requests[0].request_id.to_owned();
2129        changes.key_requests = key_requests;
2130        bob_machine.store().save_changes(changes).await.unwrap();
2131        for request in bob_machine.outgoing_requests().await.unwrap() {
2132            bob_machine
2133                .mark_request_as_sent(request.request_id(), &ToDeviceResponse::new())
2134                .await
2135                .unwrap();
2136        }
2137
2138        let event = RumaToDeviceEvent::new(
2139            alice_machine.user_id().to_owned(),
2140            ToDeviceSecretRequestEventContent::new(
2141                RequestAction::Request(SecretRequestAction::new(SecretName::RecoveryKey)),
2142                bob_machine.device_id().to_owned(),
2143                request_id,
2144            ),
2145        );
2146
2147        let bob_device = alice_machine
2148            .get_device(alice_id, bob_machine.device_id(), None)
2149            .await
2150            .unwrap()
2151            .unwrap();
2152        let alice_device = bob_machine
2153            .get_device(alice_id, alice_machine.device_id(), None)
2154            .await
2155            .unwrap()
2156            .unwrap();
2157
2158        // We need a trusted device, otherwise we won't serve nor accept secrets.
2159        bob_device.set_trust_state(LocalTrust::Verified);
2160        alice_device.set_trust_state(LocalTrust::Verified);
2161        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2162        bob_machine.store().save_device_data(&[alice_device.inner]).await.unwrap();
2163
2164        let decryption_key = crate::store::types::BackupDecryptionKey::new();
2165        alice_machine
2166            .backup_machine()
2167            .save_decryption_key(Some(decryption_key), None)
2168            .await
2169            .unwrap();
2170        alice_machine.inner.key_request_machine.receive_incoming_secret_request(&event);
2171        {
2172            let alice_cache = alice_machine.store().cache().await.unwrap();
2173            alice_machine
2174                .inner
2175                .key_request_machine
2176                .collect_incoming_key_requests(&alice_cache)
2177                .await
2178                .unwrap();
2179        }
2180
2181        let requests =
2182            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2183
2184        assert_eq!(requests.len(), 1);
2185        let request = requests.first().expect("We should have an outgoing to-device request");
2186
2187        let event: EncryptedToDeviceEvent =
2188            request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
2189        let event = Raw::from_json(to_raw_value(&event).unwrap());
2190
2191        let stream = bob_machine.store().secrets_stream();
2192        pin_mut!(stream);
2193
2194        let decryption_settings =
2195            DecryptionSettings { sender_device_trust_requirement: TrustRequirement::Untrusted };
2196
2197        bob_machine
2198            .receive_sync_changes(
2199                EncryptionSyncChanges {
2200                    to_device_events: vec![event],
2201                    changed_devices: &Default::default(),
2202                    one_time_keys_counts: &Default::default(),
2203                    unused_fallback_keys: None,
2204                    next_batch_token: None,
2205                },
2206                &decryption_settings,
2207            )
2208            .await
2209            .unwrap();
2210
2211        stream.next().now_or_never().expect("The broadcaster should have sent out the secret");
2212    }
2213
2214    #[async_test]
2215    #[cfg(feature = "automatic-room-key-forwarding")]
2216    async fn test_key_share_cycle_without_session() {
2217        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
2218            alice_id(),
2219            false,
2220            EventEncryptionAlgorithm::MegolmV1AesSha2,
2221        )
2222        .await;
2223
2224        // Get the request and convert it into a event.
2225        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
2226        let request = &requests[0];
2227        let event = request_to_event(alice_id(), alice_id(), request);
2228
2229        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2230
2231        // Bob doesn't have any outgoing requests.
2232        assert!(bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2233        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2234        assert!(bob_machine.inner.wait_queue.is_empty());
2235
2236        // Receive the room key request from alice.
2237        bob_machine.receive_incoming_key_request(&event);
2238        {
2239            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2240            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2241        }
2242        // Bob only has a keys claim request, since we're lacking a session
2243        assert_eq!(bob_machine.outgoing_to_device_requests().await.unwrap().len(), 1);
2244        assert_matches!(
2245            bob_machine.outgoing_to_device_requests().await.unwrap()[0].request(),
2246            AnyOutgoingRequest::KeysClaim(_)
2247        );
2248        assert!(!bob_machine.inner.users_for_key_claim.read().is_empty());
2249        assert!(!bob_machine.inner.wait_queue.is_empty());
2250
2251        let (alice_session, bob_session) = alice_machine
2252            .inner
2253            .store
2254            .with_transaction(async |atr| {
2255                let res = bob_machine
2256                    .inner
2257                    .store
2258                    .with_transaction(async |btr| {
2259                        let alice_account = atr.account().await?;
2260                        let bob_account = btr.account().await?;
2261                        let sessions =
2262                            alice_account.create_session_for_test_helper(bob_account).await;
2263                        Ok(sessions)
2264                    })
2265                    .await?;
2266                Ok(res)
2267            })
2268            .await
2269            .unwrap();
2270
2271        // We create a session now.
2272        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
2273        bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
2274
2275        bob_machine.retry_keyshare(alice_id(), alice_device_id());
2276        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2277        {
2278            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2279            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2280        }
2281        // Bob now has an outgoing requests.
2282        assert!(!bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2283        assert!(bob_machine.inner.wait_queue.is_empty());
2284
2285        // Get the request and convert it to a encrypted to-device event.
2286        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
2287        let request = &requests[0];
2288
2289        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
2290        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2291
2292        // Check that alice doesn't have the session.
2293        assert!(
2294            alice_machine
2295                .inner
2296                .store
2297                .get_inbound_group_session(room_id(), group_session.session_id())
2298                .await
2299                .unwrap()
2300                .is_none()
2301        );
2302
2303        let decrypted = alice_machine
2304            .inner
2305            .store
2306            .with_transaction(async |tr| {
2307                let res = tr
2308                    .account()
2309                    .await?
2310                    .decrypt_to_device_event(
2311                        &alice_machine.inner.store,
2312                        &event,
2313                        &DecryptionSettings {
2314                            sender_device_trust_requirement: TrustRequirement::Untrusted,
2315                        },
2316                    )
2317                    .await?;
2318                Ok(res)
2319            })
2320            .await
2321            .unwrap();
2322
2323        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
2324            panic!("Invalid decrypted event type");
2325        };
2326
2327        let session = alice_machine
2328            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
2329            .await
2330            .unwrap();
2331        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
2332
2333        // Check that alice now does have the session.
2334        let session = alice_machine
2335            .inner
2336            .store
2337            .get_inbound_group_session(room_id(), group_session.session_id())
2338            .await
2339            .unwrap()
2340            .unwrap();
2341
2342        assert_eq!(session.session_id(), group_session.session_id())
2343    }
2344
2345    /// Set up OlmMachines for the secret-pushing tests
2346    #[cfg(feature = "experimental-push-secrets")]
2347    async fn set_up_secret_push() -> (
2348        crate::machine::OlmMachine,
2349        crate::identities::device::Device,
2350        crate::machine::OlmMachine,
2351        crate::identities::device::Device,
2352        crate::store::types::BackupDecryptionKey,
2353    ) {
2354        use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;
2355
2356        let alice_id = user_id!("@alice:localhost");
2357
2358        let (alice_machine, bob_machine) =
2359            get_machine_pair_with_setup_sessions_test_helper(alice_id, alice_id, false).await;
2360
2361        let bob_device = alice_machine
2362            .get_device(alice_id, bob_machine.device_id(), None)
2363            .await
2364            .unwrap()
2365            .unwrap();
2366        let alice_device = bob_machine
2367            .get_device(alice_id, alice_machine.device_id(), None)
2368            .await
2369            .unwrap()
2370            .unwrap();
2371
2372        let decryption_key = crate::store::types::BackupDecryptionKey::new();
2373        alice_machine
2374            .backup_machine()
2375            .save_decryption_key(Some(decryption_key.clone()), None)
2376            .await
2377            .unwrap();
2378
2379        (alice_machine, alice_device, bob_machine, bob_device, decryption_key)
2380    }
2381
2382    #[async_test]
2383    #[cfg(feature = "experimental-push-secrets")]
2384    async fn test_secret_pushing() {
2385        let (alice_machine, _alice_device, _bob_machine, bob_device, _decryption_key) =
2386            set_up_secret_push().await;
2387
2388        // try to push a secret, but the other device isn't verified, so nothing
2389        // should happen
2390        alice_machine
2391            .inner
2392            .key_request_machine
2393            .push_secret_to_verified_devices(SecretName::RecoveryKey)
2394            .await
2395            .unwrap();
2396        {
2397            let alice_cache = alice_machine.store().cache().await.unwrap();
2398            alice_machine
2399                .inner
2400                .key_request_machine
2401                .collect_incoming_key_requests(&alice_cache)
2402                .await
2403                .unwrap();
2404        }
2405
2406        let requests =
2407            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2408
2409        assert_eq!(requests.len(), 0);
2410
2411        // Now the device is trusted, so the secret should be pushed
2412        bob_device.set_trust_state(LocalTrust::Verified);
2413        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2414
2415        alice_machine
2416            .inner
2417            .key_request_machine
2418            .push_secret_to_verified_devices(SecretName::RecoveryKey)
2419            .await
2420            .unwrap();
2421        {
2422            let alice_cache = alice_machine.store().cache().await.unwrap();
2423            alice_machine
2424                .inner
2425                .key_request_machine
2426                .collect_incoming_key_requests(&alice_cache)
2427                .await
2428                .unwrap();
2429        }
2430
2431        let requests =
2432            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2433
2434        assert_eq!(requests.len(), 1);
2435    }
2436
2437    #[async_test]
2438    #[cfg(feature = "experimental-push-secrets")]
2439    async fn test_secret_push_receive() {
2440        use futures_util::{FutureExt, pin_mut};
2441        use serde_json::value::to_raw_value;
2442        use tokio_stream::StreamExt;
2443
2444        use crate::EncryptionSyncChanges;
2445
2446        let (alice_machine, alice_device, bob_machine, bob_device, decryption_key) =
2447            set_up_secret_push().await;
2448
2449        // Push the secret to Bob
2450        bob_device.set_trust_state(LocalTrust::Verified);
2451        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2452
2453        alice_machine
2454            .inner
2455            .key_request_machine
2456            .push_secret_to_verified_devices(SecretName::RecoveryKey)
2457            .await
2458            .unwrap();
2459        {
2460            let alice_cache = alice_machine.store().cache().await.unwrap();
2461            alice_machine
2462                .inner
2463                .key_request_machine
2464                .collect_incoming_key_requests(&alice_cache)
2465                .await
2466                .unwrap();
2467        }
2468
2469        let requests =
2470            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2471
2472        assert_eq!(requests.len(), 1);
2473        let request = requests.first().expect("We should have an outgoing to-device request");
2474
2475        // Since Alice is trusted, we should get the secret
2476        alice_device.set_trust_state(LocalTrust::Verified);
2477        bob_machine.store().save_device_data(&[alice_device.inner]).await.unwrap();
2478        let event: EncryptedToDeviceEvent =
2479            request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
2480        let event = Raw::from_json(to_raw_value(&event).unwrap());
2481
2482        let stream = bob_machine.store().secrets_stream();
2483        pin_mut!(stream);
2484
2485        let decryption_settings =
2486            DecryptionSettings { sender_device_trust_requirement: TrustRequirement::Untrusted };
2487
2488        bob_machine
2489            .receive_sync_changes(
2490                EncryptionSyncChanges {
2491                    to_device_events: vec![event],
2492                    changed_devices: &Default::default(),
2493                    one_time_keys_counts: &Default::default(),
2494                    unused_fallback_keys: None,
2495                    next_batch_token: None,
2496                },
2497                &decryption_settings,
2498            )
2499            .await
2500            .unwrap();
2501
2502        let secret = stream
2503            .next()
2504            .now_or_never()
2505            .flatten()
2506            .expect("The broadcaster should have sent out the secret");
2507
2508        assert_eq!(secret.secret.deref(), &decryption_key.to_base64())
2509    }
2510
2511    #[async_test]
2512    #[cfg(feature = "experimental-push-secrets")]
2513    async fn test_secret_push_receive_untrusted() {
2514        use futures_util::{FutureExt, pin_mut};
2515        use serde_json::value::to_raw_value;
2516        use tokio_stream::StreamExt;
2517
2518        use crate::EncryptionSyncChanges;
2519
2520        let (alice_machine, _alice_device, bob_machine, bob_device, _decryption_key) =
2521            set_up_secret_push().await;
2522
2523        // Push the secret to Bob
2524        bob_device.set_trust_state(LocalTrust::Verified);
2525        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2526
2527        alice_machine
2528            .inner
2529            .key_request_machine
2530            .push_secret_to_verified_devices(SecretName::RecoveryKey)
2531            .await
2532            .unwrap();
2533        {
2534            let alice_cache = alice_machine.store().cache().await.unwrap();
2535            alice_machine
2536                .inner
2537                .key_request_machine
2538                .collect_incoming_key_requests(&alice_cache)
2539                .await
2540                .unwrap();
2541        }
2542
2543        let requests =
2544            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2545
2546        assert_eq!(requests.len(), 1);
2547        let request = requests.first().expect("We should have an outgoing to-device request");
2548
2549        // Test receiving the event.  Alice isn't trusted, so the secret will be
2550        // dropped
2551        let event: EncryptedToDeviceEvent =
2552            request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
2553        let event = Raw::from_json(to_raw_value(&event).unwrap());
2554
2555        let stream = bob_machine.store().secrets_stream();
2556        pin_mut!(stream);
2557
2558        let decryption_settings =
2559            DecryptionSettings { sender_device_trust_requirement: TrustRequirement::Untrusted };
2560
2561        bob_machine
2562            .receive_sync_changes(
2563                EncryptionSyncChanges {
2564                    to_device_events: vec![event.clone()],
2565                    changed_devices: &Default::default(),
2566                    one_time_keys_counts: &Default::default(),
2567                    unused_fallback_keys: None,
2568                    next_batch_token: None,
2569                },
2570                &decryption_settings,
2571            )
2572            .await
2573            .unwrap();
2574
2575        assert!(stream.next().now_or_never().flatten().is_none());
2576    }
2577}