matrix_sdk_crypto/gossiping/
machine.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// TODO
16//
17// handle the case where we can't create a session with a device. clearing our
18// stale key share requests that we'll never be able to handle.
19//
20// If we don't trust the device store an object that remembers the request and
21// let the users introspect that object.
22
23use std::{
24    collections::{btree_map::Entry, BTreeMap, BTreeSet},
25    mem,
26    sync::{
27        atomic::{AtomicBool, Ordering},
28        Arc,
29    },
30};
31
32use matrix_sdk_common::locks::RwLock as StdRwLock;
33use ruma::{
34    api::client::keys::claim_keys::v3::Request as KeysClaimRequest,
35    events::secret::request::{
36        RequestAction, SecretName, ToDeviceSecretRequestEvent as SecretRequestEvent,
37    },
38    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId, RoomId,
39    TransactionId, UserId,
40};
41use tracing::{debug, field::debug, info, instrument, trace, warn, Span};
42use vodozemac::Curve25519PublicKey;
43
44use super::{GossipRequest, GossippedSecret, RequestEvent, RequestInfo, SecretInfo, WaitQueue};
45use crate::{
46    error::{EventError, OlmError, OlmResult},
47    identities::IdentityManager,
48    olm::{InboundGroupSession, Session},
49    session_manager::GroupSessionCache,
50    store::{caches::StoreCache, types::Changes, CryptoStoreError, SecretImportError, Store},
51    types::{
52        events::{
53            forwarded_room_key::ForwardedRoomKeyContent,
54            olm_v1::{DecryptedForwardedRoomKeyEvent, DecryptedSecretSendEvent},
55            room::encrypted::EncryptedEvent,
56            room_key_request::RoomKeyRequestEvent,
57            secret_send::SecretSendContent,
58            EventType,
59        },
60        requests::{OutgoingRequest, ToDeviceRequest},
61    },
62    Device, MegolmError,
63};
64
65#[derive(Clone, Debug)]
66pub(crate) struct GossipMachine {
67    inner: Arc<GossipMachineInner>,
68}
69
70#[derive(Debug)]
71pub(crate) struct GossipMachineInner {
72    store: Store,
73    #[cfg(feature = "automatic-room-key-forwarding")]
74    outbound_group_sessions: GroupSessionCache,
75    outgoing_requests: StdRwLock<BTreeMap<OwnedTransactionId, OutgoingRequest>>,
76    incoming_key_requests: StdRwLock<BTreeMap<RequestInfo, RequestEvent>>,
77    wait_queue: WaitQueue,
78    users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
79
80    /// Whether we should respond to incoming `m.room_key_request` messages.
81    room_key_forwarding_enabled: AtomicBool,
82
83    /// Whether we should send out `m.room_key_request` messages.
84    room_key_requests_enabled: AtomicBool,
85
86    identity_manager: IdentityManager,
87}
88
89impl GossipMachine {
90    pub fn new(
91        store: Store,
92        identity_manager: IdentityManager,
93        #[allow(unused)] outbound_group_sessions: GroupSessionCache,
94        users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
95    ) -> Self {
96        let room_key_forwarding_enabled =
97            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
98
99        let room_key_requests_enabled =
100            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
101
102        Self {
103            inner: Arc::new(GossipMachineInner {
104                store,
105                #[cfg(feature = "automatic-room-key-forwarding")]
106                outbound_group_sessions,
107                outgoing_requests: Default::default(),
108                incoming_key_requests: Default::default(),
109                wait_queue: WaitQueue::new(),
110                users_for_key_claim,
111                room_key_forwarding_enabled,
112                room_key_requests_enabled,
113                identity_manager,
114            }),
115        }
116    }
117
118    pub(crate) fn identity_manager(&self) -> &IdentityManager {
119        &self.inner.identity_manager
120    }
121
122    #[cfg(feature = "automatic-room-key-forwarding")]
123    pub fn set_room_key_forwarding_enabled(&self, enabled: bool) {
124        self.inner.room_key_forwarding_enabled.store(enabled, Ordering::SeqCst)
125    }
126
127    pub fn is_room_key_forwarding_enabled(&self) -> bool {
128        self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst)
129    }
130
131    /// Configure whether we should send outgoing `m.room_key_request`s on
132    /// decryption failure.
133    #[cfg(feature = "automatic-room-key-forwarding")]
134    pub fn set_room_key_requests_enabled(&self, enabled: bool) {
135        self.inner.room_key_requests_enabled.store(enabled, Ordering::SeqCst)
136    }
137
138    /// Query whether we should send outgoing `m.room_key_request`s on
139    /// decryption failure.
140    pub fn are_room_key_requests_enabled(&self) -> bool {
141        self.inner.room_key_requests_enabled.load(Ordering::SeqCst)
142    }
143
144    /// Load stored outgoing requests that were not yet sent out.
145    async fn load_outgoing_requests(&self) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
146        Ok(self
147            .inner
148            .store
149            .get_unsent_secret_requests()
150            .await?
151            .into_iter()
152            .filter(|i| !i.sent_out)
153            .map(|info| info.to_request(self.device_id()))
154            .collect())
155    }
156
157    /// Our own user id.
158    pub fn user_id(&self) -> &UserId {
159        &self.inner.store.static_account().user_id
160    }
161
162    /// Our own device ID.
163    pub fn device_id(&self) -> &DeviceId {
164        &self.inner.store.static_account().device_id
165    }
166
167    pub async fn outgoing_to_device_requests(
168        &self,
169    ) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
170        let mut key_requests = self.load_outgoing_requests().await?;
171        let key_forwards: Vec<OutgoingRequest> =
172            self.inner.outgoing_requests.read().values().cloned().collect();
173        key_requests.extend(key_forwards);
174
175        let users_for_key_claim: BTreeMap<_, _> = self
176            .inner
177            .users_for_key_claim
178            .read()
179            .iter()
180            .map(|(key, value)| {
181                let device_map = value
182                    .iter()
183                    .map(|d| (d.to_owned(), OneTimeKeyAlgorithm::SignedCurve25519))
184                    .collect();
185
186                (key.to_owned(), device_map)
187            })
188            .collect();
189
190        if !users_for_key_claim.is_empty() {
191            let key_claim_request = KeysClaimRequest::new(users_for_key_claim);
192            key_requests.push(OutgoingRequest {
193                request_id: TransactionId::new(),
194                request: Arc::new(key_claim_request.into()),
195            });
196        }
197
198        Ok(key_requests)
199    }
200
201    /// Receive a room key request event.
202    pub fn receive_incoming_key_request(&self, event: &RoomKeyRequestEvent) {
203        self.receive_event(event.clone().into())
204    }
205
206    fn receive_event(&self, event: RequestEvent) {
207        // Some servers might send to-device events to ourselves if we send one
208        // out using a wildcard instead of a specific device as a recipient.
209        //
210        // Check if we're the sender of this request event and ignore it if
211        // so.
212        if event.sender() == self.user_id() && event.requesting_device_id() == self.device_id() {
213            trace!("Received a secret request event from ourselves, ignoring")
214        } else {
215            let request_info = event.to_request_info();
216            self.inner.incoming_key_requests.write().insert(request_info, event);
217        }
218    }
219
220    pub fn receive_incoming_secret_request(&self, event: &SecretRequestEvent) {
221        self.receive_event(event.clone().into())
222    }
223
224    /// Handle all the incoming key requests that are queued up and empty our
225    /// key request queue.
226    pub async fn collect_incoming_key_requests(
227        &self,
228        cache: &StoreCache,
229    ) -> OlmResult<Vec<Session>> {
230        let mut changed_sessions = Vec::new();
231
232        let incoming_key_requests = mem::take(&mut *self.inner.incoming_key_requests.write());
233
234        for event in incoming_key_requests.values() {
235            if let Some(s) = match event {
236                #[cfg(feature = "automatic-room-key-forwarding")]
237                RequestEvent::KeyShare(e) => Box::pin(self.handle_key_request(cache, e)).await?,
238                RequestEvent::Secret(e) => Box::pin(self.handle_secret_request(cache, e)).await?,
239                #[cfg(not(feature = "automatic-room-key-forwarding"))]
240                _ => None,
241            } {
242                changed_sessions.push(s);
243            }
244        }
245
246        Ok(changed_sessions)
247    }
248
249    /// Store the key share request for later, once we get an Olm session with
250    /// the given device [`retry_keyshare`](#method.retry_keyshare) should be
251    /// called.
252    fn handle_key_share_without_session(&self, device: Device, event: RequestEvent) {
253        self.inner
254            .users_for_key_claim
255            .write()
256            .entry(device.user_id().to_owned())
257            .or_default()
258            .insert(device.device_id().into());
259        self.inner.wait_queue.insert(&device, event);
260    }
261
262    /// Retry keyshares for a device that previously didn't have an Olm session
263    /// with us.
264    ///
265    /// This should be only called if the given user/device got a new Olm
266    /// session.
267    ///
268    /// # Arguments
269    ///
270    /// * `user_id` - The user id of the device that we created the Olm session
271    ///   with.
272    ///
273    /// * `device_id` - The device ID of the device that got the Olm session.
274    pub fn retry_keyshare(&self, user_id: &UserId, device_id: &DeviceId) {
275        if let Entry::Occupied(mut e) =
276            self.inner.users_for_key_claim.write().entry(user_id.to_owned())
277        {
278            e.get_mut().remove(device_id);
279
280            if e.get().is_empty() {
281                e.remove();
282            }
283        }
284
285        let mut incoming_key_requests = self.inner.incoming_key_requests.write();
286        for (key, event) in self.inner.wait_queue.remove(user_id, device_id) {
287            incoming_key_requests.entry(key).or_insert(event);
288        }
289    }
290
291    async fn handle_secret_request(
292        &self,
293        cache: &StoreCache,
294        event: &SecretRequestEvent,
295    ) -> OlmResult<Option<Session>> {
296        let secret_name = match &event.content.action {
297            RequestAction::Request(s) => s,
298            // We ignore cancellations here since there's nothing to serve.
299            RequestAction::RequestCancellation => return Ok(None),
300            action => {
301                warn!(?action, "Unknown secret request action");
302                return Ok(None);
303            }
304        };
305
306        let content = if let Some(secret) = self.inner.store.export_secret(secret_name).await? {
307            SecretSendContent::new(event.content.request_id.to_owned(), secret)
308        } else {
309            info!(?secret_name, "Can't serve a secret request, secret isn't found");
310            return Ok(None);
311        };
312
313        let device =
314            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
315
316        if let Some(device) = device {
317            if device.user_id() == self.user_id() {
318                if device.is_verified() {
319                    info!(
320                        user_id = ?device.user_id(),
321                        device_id = ?device.device_id(),
322                        ?secret_name,
323                        "Sharing a secret with a device",
324                    );
325
326                    match self.share_secret(&device, content).await {
327                        Ok(s) => Ok(Some(s)),
328                        Err(OlmError::MissingSession) => {
329                            info!(
330                                user_id = ?device.user_id(),
331                                device_id = ?device.device_id(),
332                                ?secret_name,
333                                "Secret request is missing an Olm session, \
334                                putting the request in the wait queue",
335                            );
336                            self.handle_key_share_without_session(device, event.clone().into());
337
338                            Ok(None)
339                        }
340                        Err(e) => Err(e),
341                    }
342                } else {
343                    info!(
344                        user_id = ?device.user_id(),
345                        device_id = ?device.device_id(),
346                        ?secret_name,
347                        "Received a secret request that we won't serve, the device isn't trusted",
348                    );
349
350                    Ok(None)
351                }
352            } else {
353                info!(
354                    user_id = ?device.user_id(),
355                    device_id = ?device.device_id(),
356                    ?secret_name,
357                    "Received a secret request that we won't serve, the device doesn't belong to us",
358                );
359
360                Ok(None)
361            }
362        } else {
363            warn!(
364                user_id = ?event.sender,
365                device_id = ?event.content.requesting_device_id,
366                ?secret_name,
367                "Received a secret request from an unknown device",
368            );
369            self.inner
370                .identity_manager
371                .key_query_manager
372                .synced(cache)
373                .await?
374                .mark_user_as_changed(&event.sender)
375                .await?;
376
377            Ok(None)
378        }
379    }
380
381    /// Try to encrypt the given `InboundGroupSession` for the given `Device` as
382    /// a forwarded room key.
383    ///
384    /// This method might fail if we do not share an 1-to-1 Olm session with the
385    /// given `Device`, in that case we're going to queue up an
386    /// `/keys/claim` request to be sent out and retry once the 1-to-1 Olm
387    /// session has been established.
388    #[cfg(feature = "automatic-room-key-forwarding")]
389    async fn try_to_forward_room_key(
390        &self,
391        event: &RoomKeyRequestEvent,
392        device: Device,
393        session: &InboundGroupSession,
394        message_index: Option<u32>,
395    ) -> OlmResult<Option<Session>> {
396        info!(?message_index, "Serving a room key request",);
397
398        match self.forward_room_key(session, &device, message_index).await {
399            Ok(s) => Ok(Some(s)),
400            Err(OlmError::MissingSession) => {
401                info!(
402                    "Key request is missing an Olm session, putting the request in the wait queue",
403                );
404                self.handle_key_share_without_session(device, event.to_owned().into());
405
406                Ok(None)
407            }
408            Err(OlmError::SessionExport(e)) => {
409                warn!(
410                    "Can't serve a room key request, the session \
411                     can't be exported into a forwarded room key: {e:?}",
412                );
413                Ok(None)
414            }
415            Err(e) => Err(e),
416        }
417    }
418
419    /// Answer a room key request after we found the matching
420    /// `InboundGroupSession`.
421    #[cfg(feature = "automatic-room-key-forwarding")]
422    async fn answer_room_key_request(
423        &self,
424        cache: &StoreCache,
425        event: &RoomKeyRequestEvent,
426        session: &InboundGroupSession,
427    ) -> OlmResult<Option<Session>> {
428        use super::KeyForwardDecision;
429
430        let device =
431            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
432
433        let Some(device) = device else {
434            warn!("Received a key request from an unknown device");
435            self.identity_manager()
436                .key_query_manager
437                .synced(cache)
438                .await?
439                .mark_user_as_changed(&event.sender)
440                .await?;
441
442            return Ok(None);
443        };
444
445        match self.should_share_key(&device, session).await {
446            Ok(message_index) => {
447                self.try_to_forward_room_key(event, device, session, message_index).await
448            }
449            Err(e) => {
450                if let KeyForwardDecision::ChangedSenderKey = e {
451                    warn!(
452                        "Received a key request from a device that changed \
453                         their Curve25519 sender key"
454                    );
455                } else {
456                    debug!(
457                        reason = ?e,
458                        "Received a key request that we won't serve",
459                    );
460                }
461
462                Ok(None)
463            }
464        }
465    }
466
467    #[cfg(feature = "automatic-room-key-forwarding")]
468    #[tracing::instrument(
469        skip_all,
470        fields(
471            user_id = ?event.sender,
472            device_id = ?event.content.requesting_device_id,
473            ?room_id,
474            session_id
475        )
476    )]
477    async fn handle_supported_key_request(
478        &self,
479        cache: &StoreCache,
480        event: &RoomKeyRequestEvent,
481        room_id: &RoomId,
482        session_id: &str,
483    ) -> OlmResult<Option<Session>> {
484        let session = self.inner.store.get_inbound_group_session(room_id, session_id).await?;
485
486        if let Some(s) = session {
487            self.answer_room_key_request(cache, event, &s).await
488        } else {
489            debug!("Received a room key request for an unknown inbound group session",);
490
491            Ok(None)
492        }
493    }
494
495    /// Handle a single incoming key request.
496    #[cfg(feature = "automatic-room-key-forwarding")]
497    async fn handle_key_request(
498        &self,
499        cache: &StoreCache,
500        event: &RoomKeyRequestEvent,
501    ) -> OlmResult<Option<Session>> {
502        use crate::types::events::room_key_request::{Action, RequestedKeyInfo};
503
504        if self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst) {
505            match &event.content.action {
506                Action::Request(info) => match info {
507                    RequestedKeyInfo::MegolmV1AesSha2(i) => {
508                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
509                            .await
510                    }
511                    #[cfg(feature = "experimental-algorithms")]
512                    RequestedKeyInfo::MegolmV2AesSha2(i) => {
513                        self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id)
514                            .await
515                    }
516                    RequestedKeyInfo::Unknown(i) => {
517                        debug!(
518                            sender = ?event.sender,
519                            algorithm = ?i.algorithm,
520                            "Received a room key request for a unsupported algorithm"
521                        );
522                        Ok(None)
523                    }
524                },
525                // We ignore cancellations here since there's nothing to serve.
526                Action::Cancellation => Ok(None),
527            }
528        } else {
529            debug!(
530                sender = ?event.sender,
531                "Received a room key request, but room key forwarding has been turned off"
532            );
533            Ok(None)
534        }
535    }
536
537    async fn share_secret(
538        &self,
539        device: &Device,
540        content: SecretSendContent,
541    ) -> OlmResult<Session> {
542        let event_type = content.event_type().to_owned();
543        let (used_session, content) = device.encrypt(&event_type, content).await?;
544
545        let encrypted_event_type = content.event_type().to_owned();
546
547        let request = ToDeviceRequest::new(
548            device.user_id(),
549            device.device_id().to_owned(),
550            &encrypted_event_type,
551            content.cast(),
552        );
553
554        let request = OutgoingRequest {
555            request_id: request.txn_id.clone(),
556            request: Arc::new(request.into()),
557        };
558        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
559
560        Ok(used_session)
561    }
562
563    #[cfg(feature = "automatic-room-key-forwarding")]
564    async fn forward_room_key(
565        &self,
566        session: &InboundGroupSession,
567        device: &Device,
568        message_index: Option<u32>,
569    ) -> OlmResult<Session> {
570        let (used_session, content) =
571            device.encrypt_room_key_for_forwarding(session.clone(), message_index).await?;
572
573        let event_type = content.event_type().to_owned();
574
575        let request = ToDeviceRequest::new(
576            device.user_id(),
577            device.device_id().to_owned(),
578            &event_type,
579            content.cast(),
580        );
581
582        let request = OutgoingRequest {
583            request_id: request.txn_id.clone(),
584            request: Arc::new(request.into()),
585        };
586        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
587
588        Ok(used_session)
589    }
590
591    /// Check if it's ok to share a session with the given device.
592    ///
593    /// The logic for this is currently as follows:
594    ///
595    /// * Share the session in full, starting from the earliest known index, if
596    ///   the requesting device is our own, trusted (verified) device.
597    ///
598    /// * For other requesting devices, share only a limited session and only if
599    ///   we originally shared with that device because it was present when the
600    ///   message was initially sent. By limited, we mean that the session will
601    ///   not be shared in full, but only from the message index at that moment.
602    ///   Since this information is recorded in the outbound session, we need to
603    ///   have it for this to work.
604    ///
605    /// * In all other cases, refuse to share the session.
606    ///
607    /// # Arguments
608    ///
609    /// * `device` - The device that is requesting a session from us.
610    ///
611    /// * `session` - The session that was requested to be shared.
612    ///
613    /// # Return value
614    ///
615    /// A `Result` representing whether we should share the session:
616    ///
617    /// - `Ok(None)`: Should share the entire session, starting with the
618    ///   earliest known index.
619    /// - `Ok(Some(i))`: Should share the session, but only starting from index
620    ///   i.
621    /// - `Err(x)`: Should *refuse* to share the session. `x` is the reason for
622    ///   the refusal.
623    #[cfg(feature = "automatic-room-key-forwarding")]
624    async fn should_share_key(
625        &self,
626        device: &Device,
627        session: &InboundGroupSession,
628    ) -> Result<Option<u32>, super::KeyForwardDecision> {
629        use super::KeyForwardDecision;
630        use crate::olm::ShareState;
631
632        let outbound_session = self
633            .inner
634            .outbound_group_sessions
635            .get_or_load(session.room_id())
636            .await
637            .filter(|outgoing_session| outgoing_session.session_id() == session.session_id());
638
639        // If this is our own, verified device, we share the entire session from the
640        // earliest known index.
641        if device.user_id() == self.user_id() && device.is_verified() {
642            Ok(None)
643        // Otherwise, if the records show we previously shared with this device,
644        // we'll reshare the session from the index we previously shared
645        // at. For this, we need an outbound session because this
646        // information is recorded there.
647        } else if let Some(outbound) = outbound_session {
648            match outbound.sharing_view().get_share_state(&device.inner) {
649                ShareState::Shared { message_index, olm_wedging_index: _ } => {
650                    Ok(Some(message_index))
651                }
652                ShareState::SharedButChangedSenderKey => Err(KeyForwardDecision::ChangedSenderKey),
653                ShareState::NotShared => Err(KeyForwardDecision::OutboundSessionNotShared),
654            }
655        // Otherwise, there's not enough info to decide if we can safely share
656        // the session.
657        } else if device.user_id() == self.user_id() {
658            Err(KeyForwardDecision::UntrustedDevice)
659        } else {
660            Err(KeyForwardDecision::MissingOutboundSession)
661        }
662    }
663
664    /// Check if it's ok, or rather if it makes sense to automatically request
665    /// a key from our other devices.
666    ///
667    /// # Arguments
668    ///
669    /// * `key_info` - The info of our key request containing information about
670    ///   the key we wish to request.
671    #[cfg(feature = "automatic-room-key-forwarding")]
672    async fn should_request_key(&self, key_info: &SecretInfo) -> Result<bool, CryptoStoreError> {
673        if self.inner.room_key_requests_enabled.load(Ordering::SeqCst) {
674            let request = self.inner.store.get_secret_request_by_info(key_info).await?;
675
676            // Don't send out duplicate requests, users can re-request them if they
677            // think a second request might succeed.
678            if request.is_none() {
679                let devices = self.inner.store.get_user_devices(self.user_id()).await?;
680
681                // Devices will only respond to key requests if the devices are
682                // verified, if the device isn't verified by us it's unlikely that
683                // we're verified by them either. Don't request keys if there isn't
684                // at least one verified device.
685                Ok(devices.is_any_verified())
686            } else {
687                Ok(false)
688            }
689        } else {
690            Ok(false)
691        }
692    }
693
694    /// Create a new outgoing key request for the key with the given session id.
695    ///
696    /// This will queue up a new to-device request and store the key info so
697    /// once we receive a forwarded room key we can check that it matches the
698    /// key we requested.
699    ///
700    /// This method will return a cancel request and a new key request if the
701    /// key was already requested, otherwise it will return just the key
702    /// request.
703    ///
704    /// # Arguments
705    ///
706    /// * `room_id` - The id of the room where the key is used in.
707    ///
708    /// * `event` - The event for which we would like to request the room key.
709    pub async fn request_key(
710        &self,
711        room_id: &RoomId,
712        event: &EncryptedEvent,
713    ) -> Result<(Option<OutgoingRequest>, OutgoingRequest), MegolmError> {
714        let secret_info =
715            event.room_key_info(room_id).ok_or(EventError::UnsupportedAlgorithm)?.into();
716
717        let request = self.inner.store.get_secret_request_by_info(&secret_info).await?;
718
719        if let Some(request) = request {
720            let cancel = request.to_cancellation(self.device_id());
721            let request = request.to_request(self.device_id());
722
723            Ok((Some(cancel), request))
724        } else {
725            let request = self.request_key_helper(secret_info).await?;
726
727            Ok((None, request))
728        }
729    }
730
731    /// Create outgoing secret requests for the given
732    pub fn request_missing_secrets(
733        own_user_id: &UserId,
734        secret_names: Vec<SecretName>,
735    ) -> Vec<GossipRequest> {
736        if !secret_names.is_empty() {
737            info!(?secret_names, "Creating new outgoing secret requests");
738
739            secret_names
740                .into_iter()
741                .map(|n| GossipRequest::from_secret_name(own_user_id.to_owned(), n))
742                .collect()
743        } else {
744            trace!("No secrets are missing from our store, not requesting them");
745            vec![]
746        }
747    }
748
749    async fn request_key_helper(
750        &self,
751        key_info: SecretInfo,
752    ) -> Result<OutgoingRequest, CryptoStoreError> {
753        let request = GossipRequest {
754            request_recipient: self.user_id().to_owned(),
755            request_id: TransactionId::new(),
756            info: key_info,
757            sent_out: false,
758        };
759
760        let outgoing_request = request.to_request(self.device_id());
761        self.save_outgoing_key_info(request).await?;
762
763        Ok(outgoing_request)
764    }
765
766    /// Create a new outgoing key request for the key with the given session id.
767    ///
768    /// This will queue up a new to-device request and store the key info so
769    /// once we receive a forwarded room key we can check that it matches the
770    /// key we requested.
771    ///
772    /// This does nothing if a request for this key has already been sent out.
773    ///
774    /// # Arguments
775    /// * `room_id` - The id of the room where the key is used in.
776    ///
777    /// * `event` - The event for which we would like to request the room key.
778    #[cfg(feature = "automatic-room-key-forwarding")]
779    pub async fn create_outgoing_key_request(
780        &self,
781        room_id: &RoomId,
782        event: &EncryptedEvent,
783    ) -> Result<bool, CryptoStoreError> {
784        if let Some(info) = event.room_key_info(room_id).map(|i| i.into()) {
785            if self.should_request_key(&info).await? {
786                // Size of the request_key_helper future should not impact this
787                // async fn since it is likely enough that this branch won't be
788                // entered.
789                Box::pin(self.request_key_helper(info)).await?;
790                return Ok(true);
791            }
792        }
793
794        Ok(false)
795    }
796
797    /// Save an outgoing key info.
798    async fn save_outgoing_key_info(&self, info: GossipRequest) -> Result<(), CryptoStoreError> {
799        let mut changes = Changes::default();
800        changes.key_requests.push(info);
801        self.inner.store.save_changes(changes).await?;
802
803        Ok(())
804    }
805
806    /// Delete the given outgoing key info.
807    async fn delete_key_info(&self, info: &GossipRequest) -> Result<(), CryptoStoreError> {
808        self.inner.store.delete_outgoing_secret_requests(&info.request_id).await
809    }
810
811    /// Mark the outgoing request as sent.
812    pub async fn mark_outgoing_request_as_sent(
813        &self,
814        id: &TransactionId,
815    ) -> Result<(), CryptoStoreError> {
816        let info = self.inner.store.get_outgoing_secret_requests(id).await?;
817
818        if let Some(mut info) = info {
819            trace!(
820                recipient = ?info.request_recipient,
821                request_type = info.request_type(),
822                request_id = ?info.request_id,
823                "Marking outgoing secret request as sent"
824            );
825            info.sent_out = true;
826            self.save_outgoing_key_info(info).await?;
827        }
828
829        self.inner.outgoing_requests.write().remove(id);
830
831        Ok(())
832    }
833
834    /// Mark the given outgoing key info as done.
835    ///
836    /// This will queue up a request cancellation.
837    async fn mark_as_done(&self, key_info: &GossipRequest) -> Result<(), CryptoStoreError> {
838        trace!(
839            recipient = ?key_info.request_recipient,
840            request_type = key_info.request_type(),
841            request_id = ?key_info.request_id,
842            "Successfully received a secret, removing the request"
843        );
844
845        self.inner.outgoing_requests.write().remove(&key_info.request_id);
846        // TODO return the key info instead of deleting it so the sync handler
847        // can delete it in one transaction.
848        self.delete_key_info(key_info).await?;
849
850        let request = key_info.to_cancellation(self.device_id());
851        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
852
853        Ok(())
854    }
855
856    async fn accept_secret(
857        &self,
858        secret: GossippedSecret,
859        changes: &mut Changes,
860    ) -> Result<(), CryptoStoreError> {
861        if secret.secret_name != SecretName::RecoveryKey {
862            match self.inner.store.import_secret(&secret).await {
863                Ok(_) => self.mark_as_done(&secret.gossip_request).await?,
864                // If this is a store error propagate it up the call stack.
865                Err(SecretImportError::Store(e)) => return Err(e),
866                // Otherwise warn that there was something wrong with the
867                // secret.
868                Err(e) => {
869                    warn!(
870                        secret_name = ?secret.secret_name,
871                        error = ?e,
872                        "Error while importing a secret"
873                    );
874                }
875            }
876        } else {
877            // We would need to fire out a request to figure out if this backup decryption
878            // key is the one that is used for the current backup and if the
879            // backup is trusted.
880            //
881            // So we put the secret into our inbox. Later users can inspect the contents of
882            // the inbox and decide if they want to activate the backup.
883            info!("Received a backup decryption key, storing it into the secret inbox.");
884            changes.secrets.push(secret);
885        }
886
887        Ok(())
888    }
889
890    async fn receive_secret(
891        &self,
892        cache: &StoreCache,
893        sender_key: Curve25519PublicKey,
894        secret: GossippedSecret,
895        changes: &mut Changes,
896    ) -> Result<(), CryptoStoreError> {
897        debug!("Received a m.secret.send event with a matching request");
898
899        if let Some(device) =
900            self.inner.store.get_device_from_curve_key(&secret.event.sender, sender_key).await?
901        {
902            // Only accept secrets from one of our own trusted devices.
903            if device.user_id() == self.user_id() && device.is_verified() {
904                self.accept_secret(secret, changes).await?;
905            } else {
906                warn!("Received a m.secret.send event from another user or from unverified device");
907            }
908        } else {
909            warn!("Received a m.secret.send event from an unknown device");
910
911            self.identity_manager()
912                .key_query_manager
913                .synced(cache)
914                .await?
915                .mark_user_as_changed(&secret.event.sender)
916                .await?;
917        }
918
919        Ok(())
920    }
921
922    #[instrument(skip_all, fields(sender_key, sender = ?event.sender, request_id = ?event.content.request_id, secret_name))]
923    pub async fn receive_secret_event(
924        &self,
925        cache: &StoreCache,
926        sender_key: Curve25519PublicKey,
927        event: &DecryptedSecretSendEvent,
928        changes: &mut Changes,
929    ) -> Result<Option<SecretName>, CryptoStoreError> {
930        debug!("Received a m.secret.send event");
931
932        let request_id = &event.content.request_id;
933
934        let name = if let Some(request) =
935            self.inner.store.get_outgoing_secret_requests(request_id).await?
936        {
937            match &request.info {
938                SecretInfo::KeyRequest(_) => {
939                    warn!("Received a m.secret.send event but the request was for a room key");
940
941                    None
942                }
943                SecretInfo::SecretRequest(secret_name) => {
944                    Span::current().record("secret_name", debug(secret_name));
945
946                    let secret_name = secret_name.to_owned();
947
948                    let secret = GossippedSecret {
949                        secret_name: secret_name.to_owned(),
950                        event: event.to_owned(),
951                        gossip_request: request,
952                    };
953
954                    self.receive_secret(cache, sender_key, secret, changes).await?;
955
956                    Some(secret_name)
957                }
958            }
959        } else {
960            warn!("Received a m.secret.send event, but no matching request was found");
961            None
962        };
963
964        Ok(name)
965    }
966
967    #[tracing::instrument(skip_all)]
968    async fn accept_forwarded_room_key(
969        &self,
970        info: &GossipRequest,
971        sender_key: Curve25519PublicKey,
972        event: &DecryptedForwardedRoomKeyEvent,
973    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
974        match InboundGroupSession::try_from(event) {
975            Ok(session) => {
976                let new_session = self.inner.store.merge_received_group_session(session).await?;
977                if new_session.is_some() {
978                    self.mark_as_done(info).await?;
979                }
980                Ok(new_session)
981            }
982            Err(e) => {
983                warn!(?sender_key, "Couldn't create a group session from a received room key");
984                Err(e.into())
985            }
986        }
987    }
988
989    async fn should_accept_forward(
990        &self,
991        info: &GossipRequest,
992        sender_key: Curve25519PublicKey,
993    ) -> Result<bool, CryptoStoreError> {
994        let device =
995            self.inner.store.get_device_from_curve_key(&info.request_recipient, sender_key).await?;
996
997        if let Some(device) = device {
998            Ok(device.user_id() == self.user_id() && device.is_verified())
999        } else {
1000            Ok(false)
1001        }
1002    }
1003
1004    /// Receive a forwarded room key event that was sent using any of our
1005    /// supported content types.
1006    async fn receive_supported_keys(
1007        &self,
1008        sender_key: Curve25519PublicKey,
1009        event: &DecryptedForwardedRoomKeyEvent,
1010    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1011        let Some(info) = event.room_key_info() else {
1012            warn!(
1013                sender_key = sender_key.to_base64(),
1014                algorithm = ?event.content.algorithm(),
1015                "Received a forwarded room key with an unsupported algorithm",
1016            );
1017            return Ok(None);
1018        };
1019
1020        let Some(request) =
1021            self.inner.store.get_secret_request_by_info(&info.clone().into()).await?
1022        else {
1023            warn!(
1024                sender_key = ?sender_key,
1025                room_id = ?info.room_id(),
1026                session_id = info.session_id(),
1027                sender_key = ?sender_key,
1028                algorithm = ?info.algorithm(),
1029                "Received a forwarded room key that we didn't request",
1030            );
1031            return Ok(None);
1032        };
1033
1034        if self.should_accept_forward(&request, sender_key).await? {
1035            self.accept_forwarded_room_key(&request, sender_key, event).await
1036        } else {
1037            warn!(
1038                ?sender_key,
1039                room_id = ?info.room_id(),
1040                session_id = info.session_id(),
1041                "Received a forwarded room key from an unknown device, or \
1042                 from a device that the key request recipient doesn't own",
1043            );
1044
1045            Ok(None)
1046        }
1047    }
1048
1049    /// Receive a forwarded room key event.
1050    pub async fn receive_forwarded_room_key(
1051        &self,
1052        sender_key: Curve25519PublicKey,
1053        event: &DecryptedForwardedRoomKeyEvent,
1054    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1055        match event.content {
1056            ForwardedRoomKeyContent::MegolmV1AesSha2(_) => {
1057                self.receive_supported_keys(sender_key, event).await
1058            }
1059            #[cfg(feature = "experimental-algorithms")]
1060            ForwardedRoomKeyContent::MegolmV2AesSha2(_) => {
1061                self.receive_supported_keys(sender_key, event).await
1062            }
1063            ForwardedRoomKeyContent::Unknown(_) => {
1064                warn!(
1065                    sender = event.sender.as_str(),
1066                    sender_key = sender_key.to_base64(),
1067                    algorithm = ?event.content.algorithm(),
1068                    "Received a forwarded room key with an unsupported algorithm",
1069                );
1070
1071                Ok(None)
1072            }
1073        }
1074    }
1075}
1076
1077#[cfg(test)]
1078mod tests {
1079    use std::sync::Arc;
1080
1081    #[cfg(feature = "automatic-room-key-forwarding")]
1082    use assert_matches::assert_matches;
1083    use matrix_sdk_test::{async_test, message_like_event_content};
1084    use ruma::{
1085        device_id, event_id,
1086        events::{
1087            secret::request::{RequestAction, SecretName, ToDeviceSecretRequestEventContent},
1088            ToDeviceEvent as RumaToDeviceEvent,
1089        },
1090        room_id,
1091        serde::Raw,
1092        user_id, DeviceId, RoomId, UserId,
1093    };
1094    use tokio::sync::Mutex;
1095
1096    use super::GossipMachine;
1097    #[cfg(feature = "automatic-room-key-forwarding")]
1098    use crate::{
1099        gossiping::KeyForwardDecision,
1100        olm::OutboundGroupSession,
1101        store::{types::DeviceChanges, CryptoStore},
1102        types::requests::AnyOutgoingRequest,
1103        types::{
1104            events::{
1105                forwarded_room_key::ForwardedRoomKeyContent, olm_v1::AnyDecryptedOlmEvent,
1106                olm_v1::DecryptedOlmV1Event,
1107            },
1108            EventEncryptionAlgorithm,
1109        },
1110        EncryptionSettings,
1111    };
1112    use crate::{
1113        identities::{DeviceData, IdentityManager, LocalTrust},
1114        olm::{Account, PrivateCrossSigningIdentity},
1115        session_manager::GroupSessionCache,
1116        store::{
1117            types::{Changes, PendingChanges},
1118            CryptoStoreWrapper, MemoryStore, Store,
1119        },
1120        types::events::room::encrypted::{
1121            EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
1122        },
1123        verification::VerificationMachine,
1124        DecryptionSettings, TrustRequirement,
1125    };
1126
1127    fn alice_id() -> &'static UserId {
1128        user_id!("@alice:example.org")
1129    }
1130
1131    fn alice_device_id() -> &'static DeviceId {
1132        device_id!("JLAFKJWSCS")
1133    }
1134
1135    fn bob_id() -> &'static UserId {
1136        user_id!("@bob:example.org")
1137    }
1138
1139    fn bob_device_id() -> &'static DeviceId {
1140        device_id!("ILMLKASTES")
1141    }
1142
1143    fn alice2_device_id() -> &'static DeviceId {
1144        device_id!("ILMLKASTES")
1145    }
1146
1147    fn room_id() -> &'static RoomId {
1148        room_id!("!test:example.org")
1149    }
1150
1151    fn account() -> Account {
1152        Account::with_device_id(alice_id(), alice_device_id())
1153    }
1154
1155    fn bob_account() -> Account {
1156        Account::with_device_id(bob_id(), bob_device_id())
1157    }
1158
1159    fn alice_2_account() -> Account {
1160        Account::with_device_id(alice_id(), alice2_device_id())
1161    }
1162
1163    #[cfg(feature = "automatic-room-key-forwarding")]
1164    async fn gossip_machine_test_helper(user_id: &UserId) -> GossipMachine {
1165        let user_id = user_id.to_owned();
1166        let device_id = DeviceId::new();
1167
1168        let store = Arc::new(store_with_account_helper(&user_id, &device_id).await);
1169        let static_data = store.load_account().await.unwrap().unwrap().static_data;
1170        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1171        let verification =
1172            VerificationMachine::new(static_data.clone(), identity.clone(), store.clone());
1173        let store = Store::new(static_data, identity, store, verification);
1174
1175        let session_cache = GroupSessionCache::new(store.clone());
1176        let identity_manager = IdentityManager::new(store.clone());
1177
1178        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1179    }
1180
1181    #[cfg(feature = "automatic-room-key-forwarding")]
1182    async fn store_with_account_helper(
1183        user_id: &UserId,
1184        device_id: &DeviceId,
1185    ) -> CryptoStoreWrapper {
1186        // Properly create the store by first saving the own device and then the account
1187        // data.
1188        let account = Account::with_device_id(user_id, device_id);
1189        let device = DeviceData::from_account(&account);
1190        device.set_trust_state(LocalTrust::Verified);
1191
1192        let changes = Changes {
1193            devices: DeviceChanges { new: vec![device], ..Default::default() },
1194            ..Default::default()
1195        };
1196        let mem_store = MemoryStore::new();
1197        mem_store.save_changes(changes).await.unwrap();
1198        mem_store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1199
1200        CryptoStoreWrapper::new(user_id, device_id, mem_store)
1201    }
1202
1203    async fn get_machine_test_helper() -> GossipMachine {
1204        let user_id = alice_id().to_owned();
1205        let account = Account::with_device_id(&user_id, alice_device_id());
1206        let device = DeviceData::from_account(&account);
1207        let another_device =
1208            DeviceData::from_account(&Account::with_device_id(&user_id, alice2_device_id()));
1209
1210        let store =
1211            Arc::new(CryptoStoreWrapper::new(&user_id, account.device_id(), MemoryStore::new()));
1212        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1213        let verification =
1214            VerificationMachine::new(account.static_data.clone(), identity.clone(), store.clone());
1215
1216        let store = Store::new(account.static_data().clone(), identity, store, verification);
1217        store.save_device_data(&[device, another_device]).await.unwrap();
1218        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1219        let session_cache = GroupSessionCache::new(store.clone());
1220
1221        let identity_manager = IdentityManager::new(store.clone());
1222
1223        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1224    }
1225
1226    #[cfg(feature = "automatic-room-key-forwarding")]
1227    async fn machines_for_key_share_test_helper(
1228        other_machine_owner: &UserId,
1229        create_sessions: bool,
1230        algorithm: EventEncryptionAlgorithm,
1231    ) -> (GossipMachine, OutboundGroupSession, GossipMachine) {
1232        use crate::olm::SenderData;
1233
1234        let alice_machine = get_machine_test_helper().await;
1235        let alice_device = DeviceData::from_account(
1236            &alice_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
1237        );
1238
1239        let bob_machine = gossip_machine_test_helper(other_machine_owner).await;
1240
1241        let bob_device = DeviceData::from_account(
1242            #[allow(clippy::explicit_auto_deref)] // clippy's wrong
1243            &*bob_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
1244        );
1245
1246        // We need a trusted device, otherwise we won't request keys
1247        let second_device = DeviceData::from_account(&alice_2_account());
1248        second_device.set_trust_state(LocalTrust::Verified);
1249        bob_device.set_trust_state(LocalTrust::Verified);
1250        alice_machine.inner.store.save_device_data(&[bob_device, second_device]).await.unwrap();
1251        let devices = std::slice::from_ref(&alice_device);
1252        bob_machine.inner.store.save_device_data(devices).await.unwrap();
1253
1254        if create_sessions {
1255            // Create Olm sessions for our two accounts.
1256            let (alice_session, bob_session) = alice_machine
1257                .inner
1258                .store
1259                .with_transaction(|mut atr| async {
1260                    let sessions = bob_machine
1261                        .inner
1262                        .store
1263                        .with_transaction(|mut btr| async {
1264                            let alice_account = atr.account().await?;
1265                            let bob_account = btr.account().await?;
1266                            let sessions =
1267                                alice_account.create_session_for_test_helper(bob_account).await;
1268                            Ok((btr, sessions))
1269                        })
1270                        .await?;
1271                    Ok((atr, sessions))
1272                })
1273                .await
1274                .unwrap();
1275
1276            // Populate our stores with Olm sessions and a Megolm session.
1277
1278            alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
1279            bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
1280        }
1281
1282        let settings = EncryptionSettings { algorithm, ..Default::default() };
1283        let (group_session, inbound_group_session) = bob_machine
1284            .inner
1285            .store
1286            .static_account()
1287            .create_group_session_pair(room_id(), settings, SenderData::unknown())
1288            .await
1289            .unwrap();
1290
1291        bob_machine
1292            .inner
1293            .store
1294            .save_inbound_group_sessions(&[inbound_group_session])
1295            .await
1296            .unwrap();
1297
1298        let content = group_session.encrypt("m.dummy", &message_like_event_content!({})).await;
1299        let event = wrap_encrypted_content(bob_machine.user_id(), content);
1300
1301        // Alice wants to request the outbound group session from bob.
1302        assert!(
1303            alice_machine.create_outgoing_key_request(room_id(), &event,).await.unwrap(),
1304            "We should request a room key"
1305        );
1306
1307        group_session
1308            .mark_shared_with(
1309                alice_device.user_id(),
1310                alice_device.device_id(),
1311                alice_device.curve25519_key().unwrap(),
1312            )
1313            .await;
1314
1315        // Put the outbound session into bobs store.
1316        bob_machine.inner.outbound_group_sessions.insert(group_session.clone());
1317
1318        (alice_machine, group_session, bob_machine)
1319    }
1320
1321    fn extract_content<'a>(
1322        recipient: &UserId,
1323        request: &'a crate::types::requests::OutgoingRequest,
1324    ) -> &'a Raw<ruma::events::AnyToDeviceEventContent> {
1325        request
1326            .request()
1327            .to_device()
1328            .expect("The request should be always a to-device request")
1329            .messages
1330            .get(recipient)
1331            .unwrap()
1332            .values()
1333            .next()
1334            .unwrap()
1335    }
1336
1337    fn wrap_encrypted_content(
1338        sender: &UserId,
1339        content: Raw<RoomEncryptedEventContent>,
1340    ) -> EncryptedEvent {
1341        let content = content.deserialize().unwrap();
1342
1343        EncryptedEvent {
1344            sender: sender.to_owned(),
1345            event_id: event_id!("$143273582443PhrSn:example.org").to_owned(),
1346            #[cfg(feature = "experimental-encrypted-state-events")]
1347            state_key: None,
1348            content,
1349            origin_server_ts: ruma::MilliSecondsSinceUnixEpoch::now(),
1350            unsigned: Default::default(),
1351            other: Default::default(),
1352        }
1353    }
1354
1355    fn request_to_event<C>(
1356        recipient: &UserId,
1357        sender: &UserId,
1358        request: &crate::types::requests::OutgoingRequest,
1359    ) -> crate::types::events::ToDeviceEvent<C>
1360    where
1361        C: crate::types::events::EventType
1362            + serde::de::DeserializeOwned
1363            + serde::ser::Serialize
1364            + std::fmt::Debug,
1365    {
1366        let content = extract_content(recipient, request);
1367        let content: C = content.deserialize_as_unchecked().unwrap_or_else(|_| {
1368            panic!("We can always deserialize the to-device event content {content:?}")
1369        });
1370
1371        crate::types::events::ToDeviceEvent::new(sender.to_owned(), content)
1372    }
1373
1374    #[async_test]
1375    async fn test_create_machine() {
1376        let machine = get_machine_test_helper().await;
1377
1378        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1379    }
1380
1381    #[async_test]
1382    async fn test_re_request_keys() {
1383        let machine = get_machine_test_helper().await;
1384        let account = account();
1385
1386        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1387
1388        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1389        let event = wrap_encrypted_content(machine.user_id(), content);
1390
1391        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1392        let (cancel, request) = machine.request_key(session.room_id(), &event).await.unwrap();
1393
1394        assert!(cancel.is_none());
1395
1396        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1397
1398        let (cancel, _) = machine.request_key(session.room_id(), &event).await.unwrap();
1399
1400        assert!(cancel.is_some());
1401    }
1402
1403    #[async_test]
1404    #[cfg(feature = "automatic-room-key-forwarding")]
1405    async fn test_create_key_request() {
1406        let machine = get_machine_test_helper().await;
1407        let account = account();
1408        let second_account = alice_2_account();
1409        let alice_device = DeviceData::from_account(&second_account);
1410
1411        // We need a trusted device, otherwise we won't request keys
1412        alice_device.set_trust_state(LocalTrust::Verified);
1413        machine.inner.store.save_device_data(&[alice_device]).await.unwrap();
1414
1415        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1416        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1417        let event = wrap_encrypted_content(machine.user_id(), content);
1418
1419        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1420        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1421        assert!(!machine.outgoing_to_device_requests().await.unwrap().is_empty());
1422        assert_eq!(machine.outgoing_to_device_requests().await.unwrap().len(), 1);
1423
1424        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1425
1426        let requests = machine.outgoing_to_device_requests().await.unwrap();
1427        assert_eq!(requests.len(), 1);
1428
1429        let request = &requests[0];
1430
1431        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1432        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1433    }
1434
1435    /// We should *not* request keys if that has been disabled
1436    #[async_test]
1437    #[cfg(feature = "automatic-room-key-forwarding")]
1438    async fn test_create_key_request_requests_disabled() {
1439        let machine = get_machine_test_helper().await;
1440        let account = account();
1441        let second_account = alice_2_account();
1442        let alice_device = DeviceData::from_account(&second_account);
1443
1444        // We need a trusted device, otherwise we won't request keys
1445        alice_device.set_trust_state(LocalTrust::Verified);
1446        machine.inner.store.save_device_data(&[alice_device]).await.unwrap();
1447
1448        // Disable key requests
1449        assert!(machine.are_room_key_requests_enabled());
1450        machine.set_room_key_requests_enabled(false);
1451        assert!(!machine.are_room_key_requests_enabled());
1452
1453        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1454        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1455        let event = wrap_encrypted_content(machine.user_id(), content);
1456
1457        // The outgoing to-device requests should be empty before and after
1458        // `create_outgoing_key_request`.
1459        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1460        machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
1461        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1462    }
1463
1464    #[async_test]
1465    #[cfg(feature = "automatic-room-key-forwarding")]
1466    async fn test_receive_forwarded_key() {
1467        let machine = get_machine_test_helper().await;
1468        let account = account();
1469
1470        let second_account = alice_2_account();
1471        let alice_device = DeviceData::from_account(&second_account);
1472
1473        // We need a trusted device, otherwise we won't request keys
1474        alice_device.set_trust_state(LocalTrust::Verified);
1475        let devices = std::slice::from_ref(&alice_device);
1476        machine.inner.store.save_device_data(devices).await.unwrap();
1477
1478        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1479        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1480        let room_event = wrap_encrypted_content(machine.user_id(), content);
1481
1482        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();
1483
1484        let requests = machine.outgoing_to_device_requests().await.unwrap();
1485        let request = &requests[0];
1486        let id = &request.request_id;
1487
1488        machine.mark_outgoing_request_as_sent(id).await.unwrap();
1489
1490        let export = session.export_at_index(10).await;
1491
1492        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1493
1494        let event = DecryptedOlmV1Event::new(
1495            alice_id(),
1496            alice_id(),
1497            alice_device.ed25519_key().unwrap(),
1498            None,
1499            content,
1500        );
1501
1502        assert!(machine
1503            .inner
1504            .store
1505            .get_inbound_group_session(session.room_id(), session.session_id(),)
1506            .await
1507            .unwrap()
1508            .is_none());
1509
1510        let first_session = machine
1511            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1512            .await
1513            .unwrap();
1514        let first_session = first_session.unwrap();
1515
1516        assert_eq!(first_session.first_known_index(), 10);
1517
1518        let sessions = std::slice::from_ref(&first_session);
1519        machine.inner.store.save_inbound_group_sessions(sessions).await.unwrap();
1520
1521        // Get the cancel request.
1522        let id = machine
1523            .inner
1524            .outgoing_requests
1525            .read()
1526            .first_key_value()
1527            .map(|(_, r)| r.request_id.clone())
1528            .unwrap();
1529        machine.mark_outgoing_request_as_sent(&id).await.unwrap();
1530
1531        machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();
1532
1533        let requests = machine.outgoing_to_device_requests().await.unwrap();
1534        let request = &requests[0];
1535
1536        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1537
1538        let export = session.export_at_index(15).await;
1539
1540        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1541
1542        let event = DecryptedOlmV1Event::new(
1543            alice_id(),
1544            alice_id(),
1545            alice_device.ed25519_key().unwrap(),
1546            None,
1547            content,
1548        );
1549
1550        let second_session = machine
1551            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1552            .await
1553            .unwrap();
1554
1555        assert!(second_session.is_none());
1556
1557        let export = session.export_at_index(0).await;
1558
1559        let content: ForwardedRoomKeyContent = export.try_into().unwrap();
1560
1561        let event = DecryptedOlmV1Event::new(
1562            alice_id(),
1563            alice_id(),
1564            alice_device.ed25519_key().unwrap(),
1565            None,
1566            content,
1567        );
1568
1569        let second_session = machine
1570            .receive_forwarded_room_key(alice_device.curve25519_key().unwrap(), &event)
1571            .await
1572            .unwrap();
1573
1574        assert_eq!(second_session.unwrap().first_known_index(), 0);
1575    }
1576
1577    #[async_test]
1578    #[cfg(feature = "automatic-room-key-forwarding")]
1579    async fn test_should_share_key() {
1580        let machine = get_machine_test_helper().await;
1581        let account = account();
1582
1583        let own_device =
1584            machine.inner.store.get_device(alice_id(), alice2_device_id()).await.unwrap().unwrap();
1585
1586        let (outbound, inbound) = account.create_group_session_pair_with_defaults(room_id()).await;
1587
1588        // We don't share keys with untrusted devices.
1589        assert_matches!(
1590            machine.should_share_key(&own_device, &inbound).await,
1591            Err(KeyForwardDecision::UntrustedDevice)
1592        );
1593        own_device.set_trust_state(LocalTrust::Verified);
1594        // Now we do want to share the keys.
1595        machine.should_share_key(&own_device, &inbound).await.unwrap();
1596
1597        let bob_device = DeviceData::from_account(&bob_account());
1598        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1599
1600        let bob_device =
1601            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1602
1603        // We don't share sessions with other user's devices if no outbound
1604        // session was provided.
1605        assert_matches!(
1606            machine.should_share_key(&bob_device, &inbound).await,
1607            Err(KeyForwardDecision::MissingOutboundSession)
1608        );
1609
1610        let mut changes = Changes::default();
1611
1612        changes.outbound_group_sessions.push(outbound.clone());
1613        changes.inbound_group_sessions.push(inbound.clone());
1614        machine.inner.store.save_changes(changes).await.unwrap();
1615        machine.inner.outbound_group_sessions.insert(outbound.clone());
1616
1617        // We don't share sessions with other user's devices if the session
1618        // wasn't shared in the first place.
1619        assert_matches!(
1620            machine.should_share_key(&bob_device, &inbound).await,
1621            Err(KeyForwardDecision::OutboundSessionNotShared)
1622        );
1623
1624        bob_device.set_trust_state(LocalTrust::Verified);
1625
1626        // We don't share sessions with other user's devices if the session
1627        // wasn't shared in the first place even if the device is trusted.
1628        assert_matches!(
1629            machine.should_share_key(&bob_device, &inbound).await,
1630            Err(KeyForwardDecision::OutboundSessionNotShared)
1631        );
1632
1633        // We now share the session, since it was shared before.
1634        outbound
1635            .mark_shared_with(
1636                bob_device.user_id(),
1637                bob_device.device_id(),
1638                bob_device.curve25519_key().unwrap(),
1639            )
1640            .await;
1641        machine.should_share_key(&bob_device, &inbound).await.unwrap();
1642
1643        let (other_outbound, other_inbound) =
1644            account.create_group_session_pair_with_defaults(room_id()).await;
1645
1646        // But we don't share some other session that doesn't match our outbound
1647        // session.
1648        assert_matches!(
1649            machine.should_share_key(&bob_device, &other_inbound).await,
1650            Err(KeyForwardDecision::MissingOutboundSession)
1651        );
1652
1653        // Finally, let's ensure we don't share the session with a device that rotated
1654        // its curve25519 key.
1655        let bob_device = DeviceData::from_account(&bob_account());
1656        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1657
1658        let bob_device =
1659            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1660        assert_matches!(
1661            machine.should_share_key(&bob_device, &inbound).await,
1662            Err(KeyForwardDecision::ChangedSenderKey)
1663        );
1664
1665        // Now let's encrypt some messages in another session to increment the message
1666        // index and then share it with our own untrusted device.
1667        own_device.set_trust_state(LocalTrust::Unset);
1668
1669        for _ in 1..=3 {
1670            other_outbound.encrypt_helper("foo".to_owned()).await;
1671        }
1672        other_outbound
1673            .mark_shared_with(
1674                own_device.user_id(),
1675                own_device.device_id(),
1676                own_device.curve25519_key().unwrap(),
1677            )
1678            .await;
1679
1680        machine.inner.outbound_group_sessions.insert(other_outbound.clone());
1681
1682        // Since our device is untrusted, we should share the session starting only from
1683        // the current index (at which the message was marked as shared). This
1684        // should be 3 since we encrypted 3 messages.
1685        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(Some(3)));
1686
1687        own_device.set_trust_state(LocalTrust::Verified);
1688
1689        // However once our device is trusted, we share the entire session.
1690        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(None));
1691    }
1692
1693    #[cfg(feature = "automatic-room-key-forwarding")]
1694    async fn test_key_share_cycle(algorithm: EventEncryptionAlgorithm) {
1695        let (alice_machine, group_session, bob_machine) =
1696            machines_for_key_share_test_helper(alice_id(), true, algorithm).await;
1697
1698        // Get the request and convert it into a event.
1699        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1700        let request = &requests[0];
1701        let event = request_to_event(alice_id(), alice_id(), request);
1702
1703        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1704
1705        // Bob doesn't have any outgoing requests.
1706        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1707
1708        // Receive the room key request from alice.
1709        bob_machine.receive_incoming_key_request(&event);
1710
1711        {
1712            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1713            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1714        }
1715        // Now bob does have an outgoing request.
1716        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1717
1718        // Get the request and convert it to a encrypted to-device event.
1719        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1720        let request = &requests[0];
1721
1722        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
1723        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1724
1725        // Check that alice doesn't have the session.
1726        assert!(alice_machine
1727            .inner
1728            .store
1729            .get_inbound_group_session(room_id(), group_session.session_id())
1730            .await
1731            .unwrap()
1732            .is_none());
1733
1734        let decrypted = alice_machine
1735            .inner
1736            .store
1737            .with_transaction(|mut tr| async {
1738                let res = tr
1739                    .account()
1740                    .await?
1741                    .decrypt_to_device_event(
1742                        &alice_machine.inner.store,
1743                        &event,
1744                        &DecryptionSettings {
1745                            sender_device_trust_requirement: TrustRequirement::Untrusted,
1746                        },
1747                    )
1748                    .await?;
1749                Ok((tr, res))
1750            })
1751            .await
1752            .unwrap();
1753
1754        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1755            panic!("Invalid decrypted event type");
1756        };
1757
1758        let session = alice_machine
1759            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1760            .await
1761            .unwrap();
1762        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
1763
1764        // Check that alice now does have the session.
1765        let session = alice_machine
1766            .inner
1767            .store
1768            .get_inbound_group_session(room_id(), group_session.session_id())
1769            .await
1770            .unwrap()
1771            .unwrap();
1772
1773        assert_eq!(session.session_id(), group_session.session_id())
1774    }
1775
1776    #[async_test]
1777    #[cfg(feature = "automatic-room-key-forwarding")]
1778    async fn test_reject_forward_from_another_user() {
1779        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
1780            bob_id(),
1781            true,
1782            EventEncryptionAlgorithm::MegolmV1AesSha2,
1783        )
1784        .await;
1785
1786        // Get the request and convert it into a event.
1787        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1788        let request = &requests[0];
1789        let event = request_to_event(alice_id(), alice_id(), request);
1790
1791        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1792
1793        // Bob doesn't have any outgoing requests.
1794        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1795
1796        // Receive the room key request from alice.
1797        bob_machine.receive_incoming_key_request(&event);
1798        {
1799            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1800            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1801        }
1802        // Now bob does have an outgoing request.
1803        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1804
1805        // Get the request and convert it to a encrypted to-device event.
1806        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1807        let request = &requests[0];
1808
1809        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), bob_id(), request);
1810        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1811
1812        // Check that alice doesn't have the session.
1813        assert!(alice_machine
1814            .inner
1815            .store
1816            .get_inbound_group_session(room_id(), group_session.session_id())
1817            .await
1818            .unwrap()
1819            .is_none());
1820
1821        let decrypted = alice_machine
1822            .inner
1823            .store
1824            .with_transaction(|mut tr| async {
1825                let res = tr
1826                    .account()
1827                    .await?
1828                    .decrypt_to_device_event(
1829                        &alice_machine.inner.store,
1830                        &event,
1831                        &DecryptionSettings {
1832                            sender_device_trust_requirement: TrustRequirement::Untrusted,
1833                        },
1834                    )
1835                    .await?;
1836                Ok((tr, res))
1837            })
1838            .await
1839            .unwrap();
1840        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1841            panic!("Invalid decrypted event type");
1842        };
1843
1844        let session = alice_machine
1845            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1846            .await
1847            .unwrap();
1848
1849        assert!(session.is_none(), "We should not receive a room key from another user");
1850    }
1851
1852    #[async_test]
1853    #[cfg(feature = "automatic-room-key-forwarding")]
1854    async fn test_key_share_cycle_megolm_v1() {
1855        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV1AesSha2).await;
1856    }
1857
1858    #[async_test]
1859    #[cfg(all(feature = "experimental-algorithms", feature = "automatic-room-key-forwarding"))]
1860    async fn test_key_share_cycle_megolm_v2() {
1861        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV2AesSha2).await;
1862    }
1863
1864    #[async_test]
1865    async fn test_secret_share_cycle() {
1866        let alice_machine = get_machine_test_helper().await;
1867
1868        let mut second_account = alice_2_account();
1869        let alice_device = DeviceData::from_account(&second_account);
1870
1871        let bob_account = bob_account();
1872        let bob_device = DeviceData::from_account(&bob_account);
1873
1874        let devices = std::slice::from_ref(&alice_device);
1875        alice_machine.inner.store.save_device_data(devices).await.unwrap();
1876
1877        // Create Olm sessions for our two accounts.
1878        let alice_session = alice_machine
1879            .inner
1880            .store
1881            .with_transaction(|mut tr| async {
1882                let alice_account = tr.account().await?;
1883                let (alice_session, _) =
1884                    alice_account.create_session_for_test_helper(&mut second_account).await;
1885                Ok((tr, alice_session))
1886            })
1887            .await
1888            .unwrap();
1889
1890        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
1891
1892        let event = RumaToDeviceEvent::new(
1893            bob_account.user_id().to_owned(),
1894            ToDeviceSecretRequestEventContent::new(
1895                RequestAction::Request(SecretName::CrossSigningMasterKey),
1896                bob_account.device_id().to_owned(),
1897                "request_id".into(),
1898            ),
1899        );
1900
1901        // No secret found
1902        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1903        alice_machine.receive_incoming_secret_request(&event);
1904        {
1905            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1906            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1907        }
1908        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1909
1910        // No device found
1911        alice_machine.inner.store.reset_cross_signing_identity().await;
1912        alice_machine.receive_incoming_secret_request(&event);
1913        {
1914            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1915            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1916        }
1917        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1918
1919        alice_machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1920
1921        // The device doesn't belong to us
1922        alice_machine.inner.store.reset_cross_signing_identity().await;
1923        alice_machine.receive_incoming_secret_request(&event);
1924        {
1925            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1926            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1927        }
1928        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1929
1930        let event = RumaToDeviceEvent::new(
1931            alice_id().to_owned(),
1932            ToDeviceSecretRequestEventContent::new(
1933                RequestAction::Request(SecretName::CrossSigningMasterKey),
1934                second_account.device_id().into(),
1935                "request_id".into(),
1936            ),
1937        );
1938
1939        // The device isn't trusted
1940        alice_machine.receive_incoming_secret_request(&event);
1941        {
1942            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1943            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1944        }
1945        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1946
1947        // We need a trusted device, otherwise we won't serve secrets
1948        alice_device.set_trust_state(LocalTrust::Verified);
1949        let devices = std::slice::from_ref(&alice_device);
1950        alice_machine.inner.store.save_device_data(devices).await.unwrap();
1951
1952        alice_machine.receive_incoming_secret_request(&event);
1953        {
1954            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1955            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1956        }
1957        assert!(!alice_machine.inner.outgoing_requests.read().is_empty());
1958    }
1959
1960    #[async_test]
1961    async fn test_secret_broadcasting() {
1962        use futures_util::{pin_mut, FutureExt};
1963        use ruma::api::client::to_device::send_event_to_device::v3::Response as ToDeviceResponse;
1964        use serde_json::value::to_raw_value;
1965        use tokio_stream::StreamExt;
1966
1967        use crate::{
1968            machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper,
1969            EncryptionSyncChanges,
1970        };
1971
1972        let alice_id = user_id!("@alice:localhost");
1973
1974        let (alice_machine, bob_machine) =
1975            get_machine_pair_with_setup_sessions_test_helper(alice_id, alice_id, false).await;
1976
1977        let key_requests = GossipMachine::request_missing_secrets(
1978            bob_machine.user_id(),
1979            vec![SecretName::RecoveryKey],
1980        );
1981        let mut changes = Changes::default();
1982        let request_id = key_requests[0].request_id.to_owned();
1983        changes.key_requests = key_requests;
1984        bob_machine.store().save_changes(changes).await.unwrap();
1985        for request in bob_machine.outgoing_requests().await.unwrap() {
1986            bob_machine
1987                .mark_request_as_sent(request.request_id(), &ToDeviceResponse::new())
1988                .await
1989                .unwrap();
1990        }
1991
1992        let event = RumaToDeviceEvent::new(
1993            alice_machine.user_id().to_owned(),
1994            ToDeviceSecretRequestEventContent::new(
1995                RequestAction::Request(SecretName::RecoveryKey),
1996                bob_machine.device_id().to_owned(),
1997                request_id,
1998            ),
1999        );
2000
2001        let bob_device = alice_machine
2002            .get_device(alice_id, bob_machine.device_id(), None)
2003            .await
2004            .unwrap()
2005            .unwrap();
2006        let alice_device = bob_machine
2007            .get_device(alice_id, alice_machine.device_id(), None)
2008            .await
2009            .unwrap()
2010            .unwrap();
2011
2012        // We need a trusted device, otherwise we won't serve nor accept secrets.
2013        bob_device.set_trust_state(LocalTrust::Verified);
2014        alice_device.set_trust_state(LocalTrust::Verified);
2015        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2016        bob_machine.store().save_device_data(&[alice_device.inner]).await.unwrap();
2017
2018        let decryption_key = crate::store::types::BackupDecryptionKey::new().unwrap();
2019        alice_machine
2020            .backup_machine()
2021            .save_decryption_key(Some(decryption_key), None)
2022            .await
2023            .unwrap();
2024        alice_machine.inner.key_request_machine.receive_incoming_secret_request(&event);
2025        {
2026            let alice_cache = alice_machine.store().cache().await.unwrap();
2027            alice_machine
2028                .inner
2029                .key_request_machine
2030                .collect_incoming_key_requests(&alice_cache)
2031                .await
2032                .unwrap();
2033        }
2034
2035        let requests =
2036            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2037
2038        assert_eq!(requests.len(), 1);
2039        let request = requests.first().expect("We should have an outgoing to-device request");
2040
2041        let event: EncryptedToDeviceEvent =
2042            request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
2043        let event = Raw::from_json(to_raw_value(&event).unwrap());
2044
2045        let stream = bob_machine.store().secrets_stream();
2046        pin_mut!(stream);
2047
2048        let decryption_settings =
2049            DecryptionSettings { sender_device_trust_requirement: TrustRequirement::Untrusted };
2050
2051        bob_machine
2052            .receive_sync_changes(
2053                EncryptionSyncChanges {
2054                    to_device_events: vec![event],
2055                    changed_devices: &Default::default(),
2056                    one_time_keys_counts: &Default::default(),
2057                    unused_fallback_keys: None,
2058                    next_batch_token: None,
2059                },
2060                &decryption_settings,
2061            )
2062            .await
2063            .unwrap();
2064
2065        stream.next().now_or_never().expect("The broadcaster should have sent out the secret");
2066    }
2067
2068    #[async_test]
2069    #[cfg(feature = "automatic-room-key-forwarding")]
2070    async fn test_key_share_cycle_without_session() {
2071        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
2072            alice_id(),
2073            false,
2074            EventEncryptionAlgorithm::MegolmV1AesSha2,
2075        )
2076        .await;
2077
2078        // Get the request and convert it into a event.
2079        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
2080        let request = &requests[0];
2081        let event = request_to_event(alice_id(), alice_id(), request);
2082
2083        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2084
2085        // Bob doesn't have any outgoing requests.
2086        assert!(bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2087        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2088        assert!(bob_machine.inner.wait_queue.is_empty());
2089
2090        // Receive the room key request from alice.
2091        bob_machine.receive_incoming_key_request(&event);
2092        {
2093            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2094            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2095        }
2096        // Bob only has a keys claim request, since we're lacking a session
2097        assert_eq!(bob_machine.outgoing_to_device_requests().await.unwrap().len(), 1);
2098        assert_matches!(
2099            bob_machine.outgoing_to_device_requests().await.unwrap()[0].request(),
2100            AnyOutgoingRequest::KeysClaim(_)
2101        );
2102        assert!(!bob_machine.inner.users_for_key_claim.read().is_empty());
2103        assert!(!bob_machine.inner.wait_queue.is_empty());
2104
2105        let (alice_session, bob_session) = alice_machine
2106            .inner
2107            .store
2108            .with_transaction(|mut atr| async {
2109                let res = bob_machine
2110                    .inner
2111                    .store
2112                    .with_transaction(|mut btr| async {
2113                        let alice_account = atr.account().await?;
2114                        let bob_account = btr.account().await?;
2115                        let sessions =
2116                            alice_account.create_session_for_test_helper(bob_account).await;
2117                        Ok((btr, sessions))
2118                    })
2119                    .await?;
2120                Ok((atr, res))
2121            })
2122            .await
2123            .unwrap();
2124
2125        // We create a session now.
2126        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
2127        bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
2128
2129        bob_machine.retry_keyshare(alice_id(), alice_device_id());
2130        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2131        {
2132            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2133            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2134        }
2135        // Bob now has an outgoing requests.
2136        assert!(!bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2137        assert!(bob_machine.inner.wait_queue.is_empty());
2138
2139        // Get the request and convert it to a encrypted to-device event.
2140        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
2141        let request = &requests[0];
2142
2143        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
2144        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2145
2146        // Check that alice doesn't have the session.
2147        assert!(alice_machine
2148            .inner
2149            .store
2150            .get_inbound_group_session(room_id(), group_session.session_id())
2151            .await
2152            .unwrap()
2153            .is_none());
2154
2155        let decrypted = alice_machine
2156            .inner
2157            .store
2158            .with_transaction(|mut tr| async {
2159                let res = tr
2160                    .account()
2161                    .await?
2162                    .decrypt_to_device_event(
2163                        &alice_machine.inner.store,
2164                        &event,
2165                        &DecryptionSettings {
2166                            sender_device_trust_requirement: TrustRequirement::Untrusted,
2167                        },
2168                    )
2169                    .await?;
2170                Ok((tr, res))
2171            })
2172            .await
2173            .unwrap();
2174
2175        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
2176            panic!("Invalid decrypted event type");
2177        };
2178
2179        let session = alice_machine
2180            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
2181            .await
2182            .unwrap();
2183        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
2184
2185        // Check that alice now does have the session.
2186        let session = alice_machine
2187            .inner
2188            .store
2189            .get_inbound_group_session(room_id(), group_session.session_id())
2190            .await
2191            .unwrap()
2192            .unwrap();
2193
2194        assert_eq!(session.session_id(), group_session.session_id())
2195    }
2196}