matrix_sdk_crypto/gossiping/
machine.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// TODO
16//
// Handle the case where we can't create a session with a device by clearing
// our stale key share requests that we'll never be able to handle.
//
// If we don't trust the device, store an object that remembers the request
// and let the users introspect that object.
22
23use std::{
24    collections::{btree_map::Entry, BTreeMap, BTreeSet},
25    mem,
26    sync::{
27        atomic::{AtomicBool, Ordering},
28        Arc,
29    },
30};
31
32use matrix_sdk_common::locks::RwLock as StdRwLock;
33use ruma::{
34    api::client::keys::claim_keys::v3::Request as KeysClaimRequest,
35    events::secret::request::{
36        RequestAction, SecretName, ToDeviceSecretRequestEvent as SecretRequestEvent,
37    },
38    DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId, RoomId,
39    TransactionId, UserId,
40};
41use tracing::{debug, field::debug, info, instrument, trace, warn, Span};
42use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
43
44use super::{GossipRequest, GossippedSecret, RequestEvent, RequestInfo, SecretInfo, WaitQueue};
45use crate::{
46    error::{EventError, OlmError, OlmResult},
47    identities::IdentityManager,
48    olm::{InboundGroupSession, Session},
49    session_manager::GroupSessionCache,
50    store::{Changes, CryptoStoreError, SecretImportError, Store, StoreCache},
51    types::{
52        events::{
53            forwarded_room_key::ForwardedRoomKeyContent,
54            olm_v1::{DecryptedForwardedRoomKeyEvent, DecryptedSecretSendEvent},
55            room::encrypted::EncryptedEvent,
56            room_key_request::RoomKeyRequestEvent,
57            secret_send::SecretSendContent,
58            EventType,
59        },
60        requests::{OutgoingRequest, ToDeviceRequest},
61    },
62    Device, MegolmError,
63};
64
65#[derive(Clone, Debug)]
66pub(crate) struct GossipMachine {
67    inner: Arc<GossipMachineInner>,
68}
69
70#[derive(Debug)]
71pub(crate) struct GossipMachineInner {
72    store: Store,
73    #[cfg(feature = "automatic-room-key-forwarding")]
74    outbound_group_sessions: GroupSessionCache,
75    outgoing_requests: StdRwLock<BTreeMap<OwnedTransactionId, OutgoingRequest>>,
76    incoming_key_requests: StdRwLock<BTreeMap<RequestInfo, RequestEvent>>,
77    wait_queue: WaitQueue,
78    users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
79
80    /// Whether we should respond to incoming `m.room_key_request` messages.
81    room_key_forwarding_enabled: AtomicBool,
82
83    /// Whether we should send out `m.room_key_request` messages.
84    room_key_requests_enabled: AtomicBool,
85
86    identity_manager: IdentityManager,
87}
88
89impl GossipMachine {
90    pub fn new(
91        store: Store,
92        identity_manager: IdentityManager,
93        #[allow(unused)] outbound_group_sessions: GroupSessionCache,
94        users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
95    ) -> Self {
96        let room_key_forwarding_enabled =
97            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
98
99        let room_key_requests_enabled =
100            AtomicBool::new(cfg!(feature = "automatic-room-key-forwarding"));
101
102        Self {
103            inner: Arc::new(GossipMachineInner {
104                store,
105                #[cfg(feature = "automatic-room-key-forwarding")]
106                outbound_group_sessions,
107                outgoing_requests: Default::default(),
108                incoming_key_requests: Default::default(),
109                wait_queue: WaitQueue::new(),
110                users_for_key_claim,
111                room_key_forwarding_enabled,
112                room_key_requests_enabled,
113                identity_manager,
114            }),
115        }
116    }
117
118    pub(crate) fn identity_manager(&self) -> &IdentityManager {
119        &self.inner.identity_manager
120    }
121
/// Configure whether we should respond to incoming `m.room_key_request`
/// messages.
#[cfg(feature = "automatic-room-key-forwarding")]
pub fn set_room_key_forwarding_enabled(&self, enabled: bool) {
    let flag = &self.inner.room_key_forwarding_enabled;
    flag.store(enabled, Ordering::SeqCst);
}
126
127    pub fn is_room_key_forwarding_enabled(&self) -> bool {
128        self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst)
129    }
130
/// Turn the sending of outgoing `m.room_key_request`s on decryption
/// failure on or off.
#[cfg(feature = "automatic-room-key-forwarding")]
pub fn set_room_key_requests_enabled(&self, enabled: bool) {
    let flag = &self.inner.room_key_requests_enabled;
    flag.store(enabled, Ordering::SeqCst);
}
137
138    /// Query whether we should send outgoing `m.room_key_request`s on
139    /// decryption failure.
140    pub fn are_room_key_requests_enabled(&self) -> bool {
141        self.inner.room_key_requests_enabled.load(Ordering::SeqCst)
142    }
143
144    /// Load stored outgoing requests that were not yet sent out.
145    async fn load_outgoing_requests(&self) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
146        Ok(self
147            .inner
148            .store
149            .get_unsent_secret_requests()
150            .await?
151            .into_iter()
152            .filter(|i| !i.sent_out)
153            .map(|info| info.to_request(self.device_id()))
154            .collect())
155    }
156
157    /// Our own user id.
158    pub fn user_id(&self) -> &UserId {
159        &self.inner.store.static_account().user_id
160    }
161
162    /// Our own device ID.
163    pub fn device_id(&self) -> &DeviceId {
164        &self.inner.store.static_account().device_id
165    }
166
167    pub async fn outgoing_to_device_requests(
168        &self,
169    ) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
170        let mut key_requests = self.load_outgoing_requests().await?;
171        let key_forwards: Vec<OutgoingRequest> =
172            self.inner.outgoing_requests.read().values().cloned().collect();
173        key_requests.extend(key_forwards);
174
175        let users_for_key_claim: BTreeMap<_, _> = self
176            .inner
177            .users_for_key_claim
178            .read()
179            .iter()
180            .map(|(key, value)| {
181                let device_map = value
182                    .iter()
183                    .map(|d| (d.to_owned(), OneTimeKeyAlgorithm::SignedCurve25519))
184                    .collect();
185
186                (key.to_owned(), device_map)
187            })
188            .collect();
189
190        if !users_for_key_claim.is_empty() {
191            let key_claim_request = KeysClaimRequest::new(users_for_key_claim);
192            key_requests.push(OutgoingRequest {
193                request_id: TransactionId::new(),
194                request: Arc::new(key_claim_request.into()),
195            });
196        }
197
198        Ok(key_requests)
199    }
200
201    /// Receive a room key request event.
202    pub fn receive_incoming_key_request(&self, event: &RoomKeyRequestEvent) {
203        self.receive_event(event.clone().into())
204    }
205
206    fn receive_event(&self, event: RequestEvent) {
207        // Some servers might send to-device events to ourselves if we send one
208        // out using a wildcard instead of a specific device as a recipient.
209        //
210        // Check if we're the sender of this request event and ignore it if
211        // so.
212        if event.sender() == self.user_id() && event.requesting_device_id() == self.device_id() {
213            trace!("Received a secret request event from ourselves, ignoring")
214        } else {
215            let request_info = event.to_request_info();
216            self.inner.incoming_key_requests.write().insert(request_info, event);
217        }
218    }
219
220    pub fn receive_incoming_secret_request(&self, event: &SecretRequestEvent) {
221        self.receive_event(event.clone().into())
222    }
223
224    /// Handle all the incoming key requests that are queued up and empty our
225    /// key request queue.
226    pub async fn collect_incoming_key_requests(
227        &self,
228        cache: &StoreCache,
229    ) -> OlmResult<Vec<Session>> {
230        let mut changed_sessions = Vec::new();
231
232        let incoming_key_requests = mem::take(&mut *self.inner.incoming_key_requests.write());
233
234        for event in incoming_key_requests.values() {
235            if let Some(s) = match event {
236                #[cfg(feature = "automatic-room-key-forwarding")]
237                RequestEvent::KeyShare(e) => Box::pin(self.handle_key_request(cache, e)).await?,
238                RequestEvent::Secret(e) => Box::pin(self.handle_secret_request(cache, e)).await?,
239                #[cfg(not(feature = "automatic-room-key-forwarding"))]
240                _ => None,
241            } {
242                changed_sessions.push(s);
243            }
244        }
245
246        Ok(changed_sessions)
247    }
248
249    /// Store the key share request for later, once we get an Olm session with
250    /// the given device [`retry_keyshare`](#method.retry_keyshare) should be
251    /// called.
252    fn handle_key_share_without_session(&self, device: Device, event: RequestEvent) {
253        self.inner
254            .users_for_key_claim
255            .write()
256            .entry(device.user_id().to_owned())
257            .or_default()
258            .insert(device.device_id().into());
259        self.inner.wait_queue.insert(&device, event);
260    }
261
262    /// Retry keyshares for a device that previously didn't have an Olm session
263    /// with us.
264    ///
265    /// This should be only called if the given user/device got a new Olm
266    /// session.
267    ///
268    /// # Arguments
269    ///
270    /// * `user_id` - The user id of the device that we created the Olm session
271    ///   with.
272    ///
273    /// * `device_id` - The device ID of the device that got the Olm session.
274    pub fn retry_keyshare(&self, user_id: &UserId, device_id: &DeviceId) {
275        if let Entry::Occupied(mut e) =
276            self.inner.users_for_key_claim.write().entry(user_id.to_owned())
277        {
278            e.get_mut().remove(device_id);
279
280            if e.get().is_empty() {
281                e.remove();
282            }
283        }
284
285        let mut incoming_key_requests = self.inner.incoming_key_requests.write();
286        for (key, event) in self.inner.wait_queue.remove(user_id, device_id) {
287            incoming_key_requests.entry(key).or_insert(event);
288        }
289    }
290
291    async fn handle_secret_request(
292        &self,
293        cache: &StoreCache,
294        event: &SecretRequestEvent,
295    ) -> OlmResult<Option<Session>> {
296        let secret_name = match &event.content.action {
297            RequestAction::Request(s) => s,
298            // We ignore cancellations here since there's nothing to serve.
299            RequestAction::RequestCancellation => return Ok(None),
300            action => {
301                warn!(?action, "Unknown secret request action");
302                return Ok(None);
303            }
304        };
305
306        let content = if let Some(secret) = self.inner.store.export_secret(secret_name).await? {
307            SecretSendContent::new(event.content.request_id.to_owned(), secret)
308        } else {
309            info!(?secret_name, "Can't serve a secret request, secret isn't found");
310            return Ok(None);
311        };
312
313        let device =
314            self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;
315
316        if let Some(device) = device {
317            if device.user_id() == self.user_id() {
318                if device.is_verified() {
319                    info!(
320                        user_id = ?device.user_id(),
321                        device_id = ?device.device_id(),
322                        ?secret_name,
323                        "Sharing a secret with a device",
324                    );
325
326                    match self.share_secret(&device, content).await {
327                        Ok(s) => Ok(Some(s)),
328                        Err(OlmError::MissingSession) => {
329                            info!(
330                                user_id = ?device.user_id(),
331                                device_id = ?device.device_id(),
332                                ?secret_name,
333                                "Secret request is missing an Olm session, \
334                                putting the request in the wait queue",
335                            );
336                            self.handle_key_share_without_session(device, event.clone().into());
337
338                            Ok(None)
339                        }
340                        Err(e) => Err(e),
341                    }
342                } else {
343                    info!(
344                        user_id = ?device.user_id(),
345                        device_id = ?device.device_id(),
346                        ?secret_name,
347                        "Received a secret request that we won't serve, the device isn't trusted",
348                    );
349
350                    Ok(None)
351                }
352            } else {
353                info!(
354                    user_id = ?device.user_id(),
355                    device_id = ?device.device_id(),
356                    ?secret_name,
357                    "Received a secret request that we won't serve, the device doesn't belong to us",
358                );
359
360                Ok(None)
361            }
362        } else {
363            warn!(
364                user_id = ?event.sender,
365                device_id = ?event.content.requesting_device_id,
366                ?secret_name,
367                "Received a secret request from an unknown device",
368            );
369            self.inner
370                .identity_manager
371                .key_query_manager
372                .synced(cache)
373                .await?
374                .mark_user_as_changed(&event.sender)
375                .await?;
376
377            Ok(None)
378        }
379    }
380
/// Try to encrypt the given `InboundGroupSession` for the given `Device` as
/// a forwarded room key.
///
/// If we don't share a 1-to-1 Olm session with the device, the request is
/// parked: a `/keys/claim` request is queued up and the key request is
/// retried once the session has been established. If the session can't be
/// exported into a forwarded room key, the request is dropped with a
/// warning. Any other error is passed on to the caller.
///
/// Returns the Olm session used for the encryption, or `None` if nothing
/// was sent.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn try_to_forward_room_key(
    &self,
    event: &RoomKeyRequestEvent,
    device: Device,
    session: &InboundGroupSession,
    message_index: Option<u32>,
) -> OlmResult<Option<Session>> {
    info!(?message_index, "Serving a room key request",);

    let failure = match self.forward_room_key(session, &device, message_index).await {
        Ok(used_session) => return Ok(Some(used_session)),
        Err(failure) => failure,
    };

    match failure {
        OlmError::MissingSession => {
            info!(
                "Key request is missing an Olm session, putting the request in the wait queue",
            );
            self.handle_key_share_without_session(device, event.to_owned().into());

            Ok(None)
        }
        OlmError::SessionExport(e) => {
            warn!(
                "Can't serve a room key request, the session \
                 can't be exported into a forwarded room key: {e:?}",
            );
            Ok(None)
        }
        other => Err(other),
    }
}
418
/// Answer a room key request once the matching `InboundGroupSession` has
/// been found.
///
/// If the requesting device is unknown, its user is marked as changed and
/// nothing is sent. Otherwise [`Self::should_share_key`] decides whether,
/// and from which message index, the session gets forwarded.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn answer_room_key_request(
    &self,
    cache: &StoreCache,
    event: &RoomKeyRequestEvent,
    session: &InboundGroupSession,
) -> OlmResult<Option<Session>> {
    use super::KeyForwardDecision;

    let lookup =
        self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;

    let device = match lookup {
        Some(device) => device,
        None => {
            warn!("Received a key request from an unknown device");
            self.identity_manager()
                .key_query_manager
                .synced(cache)
                .await?
                .mark_user_as_changed(&event.sender)
                .await?;

            return Ok(None);
        }
    };

    let message_index = match self.should_share_key(&device, session).await {
        Ok(index) => index,
        Err(KeyForwardDecision::ChangedSenderKey) => {
            warn!(
                "Received a key request from a device that changed \
                 their Curve25519 sender key"
            );
            return Ok(None);
        }
        Err(reason) => {
            debug!(
                reason = ?reason,
                "Received a key request that we won't serve",
            );
            return Ok(None);
        }
    };

    self.try_to_forward_room_key(event, device, session, message_index).await
}
466
/// Handle a room key request for an algorithm we support.
///
/// Looks up the requested inbound group session in the store and, if it is
/// found, tries to answer the request with it. Requests for sessions we
/// don't have are ignored.
#[cfg(feature = "automatic-room-key-forwarding")]
#[tracing::instrument(
    skip_all,
    fields(
        user_id = ?event.sender,
        device_id = ?event.content.requesting_device_id,
        ?room_id,
        session_id
    )
)]
async fn handle_supported_key_request(
    &self,
    cache: &StoreCache,
    event: &RoomKeyRequestEvent,
    room_id: &RoomId,
    session_id: &str,
) -> OlmResult<Option<Session>> {
    let lookup = self.inner.store.get_inbound_group_session(room_id, session_id).await?;

    let Some(inbound) = lookup else {
        debug!("Received a room key request for an unknown inbound group session",);

        return Ok(None);
    };

    self.answer_room_key_request(cache, event, &inbound).await
}
494
/// Handle a single incoming room key request.
///
/// Nothing is served if room key forwarding has been turned off, if the
/// event is a cancellation, or if the key was requested for an algorithm we
/// don't support.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn handle_key_request(
    &self,
    cache: &StoreCache,
    event: &RoomKeyRequestEvent,
) -> OlmResult<Option<Session>> {
    use crate::types::events::room_key_request::{Action, RequestedKeyInfo};

    if !self.inner.room_key_forwarding_enabled.load(Ordering::SeqCst) {
        debug!(
            sender = ?event.sender,
            "Received a room key request, but room key forwarding has been turned off"
        );
        return Ok(None);
    }

    let info = match &event.content.action {
        Action::Request(info) => info,
        // We ignore cancellations here since there's nothing to serve.
        Action::Cancellation => return Ok(None),
    };

    // Every supported algorithm identifies the session the same way.
    let (room_id, session_id) = match info {
        RequestedKeyInfo::MegolmV1AesSha2(i) => (&i.room_id, &i.session_id),
        #[cfg(feature = "experimental-algorithms")]
        RequestedKeyInfo::MegolmV2AesSha2(i) => (&i.room_id, &i.session_id),
        RequestedKeyInfo::Unknown(i) => {
            debug!(
                sender = ?event.sender,
                algorithm = ?i.algorithm,
                "Received a room key request for a unsupported algorithm"
            );
            return Ok(None);
        }
    };

    self.handle_supported_key_request(cache, event, room_id, session_id).await
}
536
537    async fn share_secret(
538        &self,
539        device: &Device,
540        content: SecretSendContent,
541    ) -> OlmResult<Session> {
542        let event_type = content.event_type().to_owned();
543        let (used_session, content) = device.encrypt(&event_type, content).await?;
544
545        let encrypted_event_type = content.event_type().to_owned();
546
547        let request = ToDeviceRequest::new(
548            device.user_id(),
549            device.device_id().to_owned(),
550            &encrypted_event_type,
551            content.cast(),
552        );
553
554        let request = OutgoingRequest {
555            request_id: request.txn_id.clone(),
556            request: Arc::new(request.into()),
557        };
558        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
559
560        Ok(used_session)
561    }
562
/// Encrypt the given inbound group session for the given device as a
/// forwarded room key and queue up the resulting to-device request.
///
/// # Arguments
///
/// * `session` - The session that should be forwarded.
///
/// * `device` - The device that should receive the session.
///
/// * `message_index` - The index passed on to the encryption of the
///   forwarded key; see [`Self::should_share_key`] for how it is chosen.
///
/// Returns the Olm session that was used for the encryption.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn forward_room_key(
    &self,
    session: &InboundGroupSession,
    device: &Device,
    message_index: Option<u32>,
) -> OlmResult<Session> {
    let (used_session, encrypted) =
        device.encrypt_room_key_for_forwarding(session.clone(), message_index).await?;

    let encrypted_type = encrypted.event_type().to_owned();

    let to_device = ToDeviceRequest::new(
        device.user_id(),
        device.device_id().to_owned(),
        &encrypted_type,
        encrypted.cast(),
    );

    // The outgoing request is tracked under the to-device transaction ID.
    let txn_id = to_device.txn_id.clone();
    let outgoing =
        OutgoingRequest { request_id: txn_id.clone(), request: Arc::new(to_device.into()) };
    self.inner.outgoing_requests.write().insert(txn_id, outgoing);

    Ok(used_session)
}
590
/// Decide whether a session may be shared with the given device.
///
/// The rules are, in order:
///
/// * If the requesting device is our own and verified, the session is
///   shared in full, starting from the earliest known index.
///
/// * Otherwise, if we still have the outbound session matching the
///   requested session, the session is shared only if it was originally
///   shared with that device, and only from the message index recorded at
///   that time.
///
/// * In all other cases sharing is refused.
///
/// # Arguments
///
/// * `device` - The device that is requesting a session from us.
///
/// * `session` - The session that was requested to be shared.
///
/// # Return value
///
/// - `Ok(None)`: Share the entire session, starting with the earliest
///   known index.
/// - `Ok(Some(i))`: Share the session, but only starting from index `i`.
/// - `Err(x)`: *Refuse* to share the session; `x` is the reason for the
///   refusal.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn should_share_key(
    &self,
    device: &Device,
    session: &InboundGroupSession,
) -> Result<Option<u32>, super::KeyForwardDecision> {
    use super::KeyForwardDecision;
    use crate::olm::ShareState;

    // Only an outbound session with the same session ID as the requested
    // session carries usable sharing information.
    let outbound_session = self
        .inner
        .outbound_group_sessions
        .get_or_load(session.room_id())
        .await
        .filter(|candidate| candidate.session_id() == session.session_id());

    let is_own_device = device.user_id() == self.user_id();

    // Our own, verified devices get the whole session.
    if is_own_device && device.is_verified() {
        return Ok(None);
    }

    // Without the outbound session there's not enough info to decide if we
    // can safely share the session.
    let Some(outbound) = outbound_session else {
        return Err(if is_own_device {
            KeyForwardDecision::UntrustedDevice
        } else {
            KeyForwardDecision::MissingOutboundSession
        });
    };

    // Reshare only what the records say we already shared with this device.
    match outbound.sharing_view().get_share_state(&device.inner) {
        ShareState::Shared { message_index, olm_wedging_index: _ } => Ok(Some(message_index)),
        ShareState::SharedButChangedSenderKey => Err(KeyForwardDecision::ChangedSenderKey),
        ShareState::NotShared => Err(KeyForwardDecision::OutboundSessionNotShared),
    }
}
663
/// Decide whether it makes sense to automatically request a key from our
/// other devices.
///
/// A key is requested only if room key requests are enabled, no request
/// for the same key is stored yet, and at least one of our own devices is
/// verified.
///
/// # Arguments
///
/// * `key_info` - The info of our key request containing information about
///   the key we wish to request.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn should_request_key(&self, key_info: &SecretInfo) -> Result<bool, CryptoStoreError> {
    if !self.inner.room_key_requests_enabled.load(Ordering::SeqCst) {
        return Ok(false);
    }

    // Don't send out duplicate requests, users can re-request them if they
    // think a second request might succeed.
    let existing = self.inner.store.get_secret_request_by_info(key_info).await?;
    if existing.is_some() {
        return Ok(false);
    }

    // Devices will only respond to key requests if the devices are
    // verified, if the device isn't verified by us it's unlikely that
    // we're verified by them either. Don't request keys if there isn't
    // at least one verified device.
    let own_devices = self.inner.store.get_user_devices(self.user_id()).await?;

    Ok(own_devices.is_any_verified())
}
693
694    /// Create a new outgoing key request for the key with the given session id.
695    ///
696    /// This will queue up a new to-device request and store the key info so
697    /// once we receive a forwarded room key we can check that it matches the
698    /// key we requested.
699    ///
700    /// This method will return a cancel request and a new key request if the
701    /// key was already requested, otherwise it will return just the key
702    /// request.
703    ///
704    /// # Arguments
705    ///
706    /// * `room_id` - The id of the room where the key is used in.
707    ///
708    /// * `event` - The event for which we would like to request the room key.
709    pub async fn request_key(
710        &self,
711        room_id: &RoomId,
712        event: &EncryptedEvent,
713    ) -> Result<(Option<OutgoingRequest>, OutgoingRequest), MegolmError> {
714        let secret_info =
715            event.room_key_info(room_id).ok_or(EventError::UnsupportedAlgorithm)?.into();
716
717        let request = self.inner.store.get_secret_request_by_info(&secret_info).await?;
718
719        if let Some(request) = request {
720            let cancel = request.to_cancellation(self.device_id());
721            let request = request.to_request(self.device_id());
722
723            Ok((Some(cancel), request))
724        } else {
725            let request = self.request_key_helper(secret_info).await?;
726
727            Ok((None, request))
728        }
729    }
730
731    /// Create outgoing secret requests for the given
732    pub fn request_missing_secrets(
733        own_user_id: &UserId,
734        secret_names: Vec<SecretName>,
735    ) -> Vec<GossipRequest> {
736        if !secret_names.is_empty() {
737            info!(?secret_names, "Creating new outgoing secret requests");
738
739            secret_names
740                .into_iter()
741                .map(|n| GossipRequest::from_secret_name(own_user_id.to_owned(), n))
742                .collect()
743        } else {
744            trace!("No secrets are missing from our store, not requesting them");
745            vec![]
746        }
747    }
748
749    async fn request_key_helper(
750        &self,
751        key_info: SecretInfo,
752    ) -> Result<OutgoingRequest, CryptoStoreError> {
753        let request = GossipRequest {
754            request_recipient: self.user_id().to_owned(),
755            request_id: TransactionId::new(),
756            info: key_info,
757            sent_out: false,
758        };
759
760        let outgoing_request = request.to_request(self.device_id());
761        self.save_outgoing_key_info(request).await?;
762
763        Ok(outgoing_request)
764    }
765
/// Create and store a new outgoing key request for the room key of the
/// given event, if requesting it makes sense.
///
/// The key info is stored so that a forwarded room key we receive later
/// can be checked against the key we requested. Nothing happens if no key
/// info can be derived from the event or if
/// [`Self::should_request_key`] advises against a request, e.g. because
/// one has already been created for this key.
///
/// Returns `true` if a new request was created, `false` otherwise.
///
/// # Arguments
///
/// * `room_id` - The id of the room where the key is used in.
///
/// * `event` - The event for which we would like to request the room key.
#[cfg(feature = "automatic-room-key-forwarding")]
pub async fn create_outgoing_key_request(
    &self,
    room_id: &RoomId,
    event: &EncryptedEvent,
) -> Result<bool, CryptoStoreError> {
    let Some(key_info) = event.room_key_info(room_id) else {
        return Ok(false);
    };
    let info: SecretInfo = key_info.into();

    if !self.should_request_key(&info).await? {
        return Ok(false);
    }

    // The future is boxed so its size doesn't weigh on this async fn; it
    // is likely enough that this point isn't reached.
    Box::pin(self.request_key_helper(info)).await?;

    Ok(true)
}
796
797    /// Save an outgoing key info.
798    async fn save_outgoing_key_info(&self, info: GossipRequest) -> Result<(), CryptoStoreError> {
799        let mut changes = Changes::default();
800        changes.key_requests.push(info);
801        self.inner.store.save_changes(changes).await?;
802
803        Ok(())
804    }
805
806    /// Delete the given outgoing key info.
807    async fn delete_key_info(&self, info: &GossipRequest) -> Result<(), CryptoStoreError> {
808        self.inner.store.delete_outgoing_secret_requests(&info.request_id).await
809    }
810
811    /// Mark the outgoing request as sent.
812    pub async fn mark_outgoing_request_as_sent(
813        &self,
814        id: &TransactionId,
815    ) -> Result<(), CryptoStoreError> {
816        let info = self.inner.store.get_outgoing_secret_requests(id).await?;
817
818        if let Some(mut info) = info {
819            trace!(
820                recipient = ?info.request_recipient,
821                request_type = info.request_type(),
822                request_id = ?info.request_id,
823                "Marking outgoing secret request as sent"
824            );
825            info.sent_out = true;
826            self.save_outgoing_key_info(info).await?;
827        }
828
829        self.inner.outgoing_requests.write().remove(id);
830
831        Ok(())
832    }
833
834    /// Mark the given outgoing key info as done.
835    ///
836    /// This will queue up a request cancellation.
837    async fn mark_as_done(&self, key_info: &GossipRequest) -> Result<(), CryptoStoreError> {
838        trace!(
839            recipient = ?key_info.request_recipient,
840            request_type = key_info.request_type(),
841            request_id = ?key_info.request_id,
842            "Successfully received a secret, removing the request"
843        );
844
845        self.inner.outgoing_requests.write().remove(&key_info.request_id);
846        // TODO return the key info instead of deleting it so the sync handler
847        // can delete it in one transaction.
848        self.delete_key_info(key_info).await?;
849
850        let request = key_info.to_cancellation(self.device_id());
851        self.inner.outgoing_requests.write().insert(request.request_id.clone(), request);
852
853        Ok(())
854    }
855
856    async fn accept_secret(
857        &self,
858        secret: GossippedSecret,
859        changes: &mut Changes,
860    ) -> Result<(), CryptoStoreError> {
861        if secret.secret_name != SecretName::RecoveryKey {
862            match self.inner.store.import_secret(&secret).await {
863                Ok(_) => self.mark_as_done(&secret.gossip_request).await?,
864                // If this is a store error propagate it up the call stack.
865                Err(SecretImportError::Store(e)) => return Err(e),
866                // Otherwise warn that there was something wrong with the
867                // secret.
868                Err(e) => {
869                    warn!(
870                        secret_name = ?secret.secret_name,
871                        error = ?e,
872                        "Error while importing a secret"
873                    );
874                }
875            }
876        } else {
877            // We would need to fire out a request to figure out if this backup decryption
878            // key is the one that is used for the current backup and if the
879            // backup is trusted.
880            //
881            // So we put the secret into our inbox. Later users can inspect the contents of
882            // the inbox and decide if they want to activate the backup.
883            info!("Received a backup decryption key, storing it into the secret inbox.");
884            changes.secrets.push(secret);
885        }
886
887        Ok(())
888    }
889
890    async fn receive_secret(
891        &self,
892        cache: &StoreCache,
893        sender_key: Curve25519PublicKey,
894        secret: GossippedSecret,
895        changes: &mut Changes,
896    ) -> Result<(), CryptoStoreError> {
897        debug!("Received a m.secret.send event with a matching request");
898
899        if let Some(device) =
900            self.inner.store.get_device_from_curve_key(&secret.event.sender, sender_key).await?
901        {
902            // Only accept secrets from one of our own trusted devices.
903            if device.user_id() == self.user_id() && device.is_verified() {
904                self.accept_secret(secret, changes).await?;
905            } else {
906                warn!("Received a m.secret.send event from another user or from unverified device");
907            }
908        } else {
909            warn!("Received a m.secret.send event from an unknown device");
910
911            self.identity_manager()
912                .key_query_manager
913                .synced(cache)
914                .await?
915                .mark_user_as_changed(&secret.event.sender)
916                .await?;
917        }
918
919        Ok(())
920    }
921
922    #[instrument(skip_all, fields(sender_key, sender = ?event.sender, request_id = ?event.content.request_id, secret_name))]
923    pub async fn receive_secret_event(
924        &self,
925        cache: &StoreCache,
926        sender_key: Curve25519PublicKey,
927        event: &DecryptedSecretSendEvent,
928        changes: &mut Changes,
929    ) -> Result<Option<SecretName>, CryptoStoreError> {
930        debug!("Received a m.secret.send event");
931
932        let request_id = &event.content.request_id;
933
934        let name = if let Some(request) =
935            self.inner.store.get_outgoing_secret_requests(request_id).await?
936        {
937            match &request.info {
938                SecretInfo::KeyRequest(_) => {
939                    warn!("Received a m.secret.send event but the request was for a room key");
940
941                    None
942                }
943                SecretInfo::SecretRequest(secret_name) => {
944                    Span::current().record("secret_name", debug(secret_name));
945
946                    let secret_name = secret_name.to_owned();
947
948                    let secret = GossippedSecret {
949                        secret_name: secret_name.to_owned(),
950                        event: event.to_owned(),
951                        gossip_request: request,
952                    };
953
954                    self.receive_secret(cache, sender_key, secret, changes).await?;
955
956                    Some(secret_name)
957                }
958            }
959        } else {
960            warn!("Received a m.secret.send event, but no matching request was found");
961            None
962        };
963
964        Ok(name)
965    }
966
967    async fn accept_forwarded_room_key(
968        &self,
969        info: &GossipRequest,
970        sender_key: Curve25519PublicKey,
971        event: &DecryptedForwardedRoomKeyEvent,
972    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
973        match InboundGroupSession::try_from(event) {
974            Ok(session) => {
975                if self.inner.store.compare_group_session(&session).await?
976                    == SessionOrdering::Better
977                {
978                    self.mark_as_done(info).await?;
979
980                    info!(
981                        ?sender_key,
982                        claimed_sender_key = ?session.sender_key(),
983                        room_id = ?session.room_id(),
984                        session_id = session.session_id(),
985                        algorithm = ?session.algorithm(),
986                        "Received a forwarded room key",
987                    );
988
989                    Ok(Some(session))
990                } else {
991                    info!(
992                        ?sender_key,
993                        claimed_sender_key = ?session.sender_key(),
994                        room_id = ?session.room_id(),
995                        session_id = session.session_id(),
996                        algorithm = ?session.algorithm(),
997                        "Received a forwarded room key but we already have a better version of it",
998                    );
999
1000                    Ok(None)
1001                }
1002            }
1003            Err(e) => {
1004                warn!(?sender_key, "Couldn't create a group session from a received room key");
1005                Err(e.into())
1006            }
1007        }
1008    }
1009
1010    async fn should_accept_forward(
1011        &self,
1012        info: &GossipRequest,
1013        sender_key: Curve25519PublicKey,
1014    ) -> Result<bool, CryptoStoreError> {
1015        let device =
1016            self.inner.store.get_device_from_curve_key(&info.request_recipient, sender_key).await?;
1017
1018        if let Some(device) = device {
1019            Ok(device.user_id() == self.user_id() && device.is_verified())
1020        } else {
1021            Ok(false)
1022        }
1023    }
1024
1025    /// Receive a forwarded room key event that was sent using any of our
1026    /// supported content types.
1027    async fn receive_supported_keys(
1028        &self,
1029        sender_key: Curve25519PublicKey,
1030        event: &DecryptedForwardedRoomKeyEvent,
1031    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1032        let Some(info) = event.room_key_info() else {
1033            warn!(
1034                sender_key = sender_key.to_base64(),
1035                algorithm = ?event.content.algorithm(),
1036                "Received a forwarded room key with an unsupported algorithm",
1037            );
1038            return Ok(None);
1039        };
1040
1041        let Some(request) =
1042            self.inner.store.get_secret_request_by_info(&info.clone().into()).await?
1043        else {
1044            warn!(
1045                sender_key = ?sender_key,
1046                room_id = ?info.room_id(),
1047                session_id = info.session_id(),
1048                sender_key = ?sender_key,
1049                algorithm = ?info.algorithm(),
1050                "Received a forwarded room key that we didn't request",
1051            );
1052            return Ok(None);
1053        };
1054
1055        if self.should_accept_forward(&request, sender_key).await? {
1056            self.accept_forwarded_room_key(&request, sender_key, event).await
1057        } else {
1058            warn!(
1059                ?sender_key,
1060                room_id = ?info.room_id(),
1061                session_id = info.session_id(),
1062                "Received a forwarded room key from an unknown device, or \
1063                 from a device that the key request recipient doesn't own",
1064            );
1065
1066            Ok(None)
1067        }
1068    }
1069
1070    /// Receive a forwarded room key event.
1071    pub async fn receive_forwarded_room_key(
1072        &self,
1073        sender_key: Curve25519PublicKey,
1074        event: &DecryptedForwardedRoomKeyEvent,
1075    ) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
1076        match event.content {
1077            ForwardedRoomKeyContent::MegolmV1AesSha2(_) => {
1078                self.receive_supported_keys(sender_key, event).await
1079            }
1080            #[cfg(feature = "experimental-algorithms")]
1081            ForwardedRoomKeyContent::MegolmV2AesSha2(_) => {
1082                self.receive_supported_keys(sender_key, event).await
1083            }
1084            ForwardedRoomKeyContent::Unknown(_) => {
1085                warn!(
1086                    sender = event.sender.as_str(),
1087                    sender_key = sender_key.to_base64(),
1088                    algorithm = ?event.content.algorithm(),
1089                    "Received a forwarded room key with an unsupported algorithm",
1090                );
1091
1092                Ok(None)
1093            }
1094        }
1095    }
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100    use std::sync::Arc;
1101
1102    #[cfg(feature = "automatic-room-key-forwarding")]
1103    use assert_matches::assert_matches;
1104    use matrix_sdk_test::{async_test, message_like_event_content};
1105    use ruma::{
1106        device_id, event_id,
1107        events::{
1108            secret::request::{RequestAction, SecretName, ToDeviceSecretRequestEventContent},
1109            ToDeviceEvent as RumaToDeviceEvent,
1110        },
1111        room_id,
1112        serde::Raw,
1113        user_id, DeviceId, RoomId, UserId,
1114    };
1115    use tokio::sync::Mutex;
1116
1117    use super::GossipMachine;
1118    #[cfg(feature = "automatic-room-key-forwarding")]
1119    use crate::{
1120        gossiping::KeyForwardDecision,
1121        olm::OutboundGroupSession,
1122        store::{CryptoStore, DeviceChanges},
1123        types::requests::AnyOutgoingRequest,
1124        types::{
1125            events::{
1126                forwarded_room_key::ForwardedRoomKeyContent, olm_v1::AnyDecryptedOlmEvent,
1127                olm_v1::DecryptedOlmV1Event,
1128            },
1129            EventEncryptionAlgorithm,
1130        },
1131        EncryptionSettings,
1132    };
1133    use crate::{
1134        identities::{DeviceData, IdentityManager, LocalTrust},
1135        olm::{Account, PrivateCrossSigningIdentity},
1136        session_manager::GroupSessionCache,
1137        store::{Changes, CryptoStoreWrapper, MemoryStore, PendingChanges, Store},
1138        types::events::room::encrypted::{
1139            EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
1140        },
1141        verification::VerificationMachine,
1142    };
1143
1144    fn alice_id() -> &'static UserId {
1145        user_id!("@alice:example.org")
1146    }
1147
1148    fn alice_device_id() -> &'static DeviceId {
1149        device_id!("JLAFKJWSCS")
1150    }
1151
1152    fn bob_id() -> &'static UserId {
1153        user_id!("@bob:example.org")
1154    }
1155
1156    fn bob_device_id() -> &'static DeviceId {
1157        device_id!("ILMLKASTES")
1158    }
1159
1160    fn alice2_device_id() -> &'static DeviceId {
1161        device_id!("ILMLKASTES")
1162    }
1163
1164    fn room_id() -> &'static RoomId {
1165        room_id!("!test:example.org")
1166    }
1167
1168    fn account() -> Account {
1169        Account::with_device_id(alice_id(), alice_device_id())
1170    }
1171
1172    fn bob_account() -> Account {
1173        Account::with_device_id(bob_id(), bob_device_id())
1174    }
1175
1176    fn alice_2_account() -> Account {
1177        Account::with_device_id(alice_id(), alice2_device_id())
1178    }
1179
/// Build a `GossipMachine` for the given user with a freshly generated
/// device ID, backed by the store from `store_with_account_helper`.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn gossip_machine_test_helper(user_id: &UserId) -> GossipMachine {
    let device_id = DeviceId::new();

    let crypto_store = Arc::new(store_with_account_helper(user_id, &device_id).await);
    let static_data = crypto_store.load_account().await.unwrap().unwrap().static_data;

    // NOTE(review): the cross-signing identity is always created for Alice,
    // even when `user_id` is a different user — confirm this is intended.
    let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
    let verification =
        VerificationMachine::new(static_data.clone(), identity.clone(), crypto_store.clone());
    let store = Store::new(static_data, identity, crypto_store, verification);

    let session_cache = GroupSessionCache::new(store.clone());
    let identity_manager = IdentityManager::new(store.clone());

    GossipMachine::new(store, identity_manager, session_cache, Default::default())
}
1197
/// Create a memory-backed `CryptoStoreWrapper` that already contains a new
/// account for the given user and device, plus that account's own device
/// marked as locally verified.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn store_with_account_helper(
    user_id: &UserId,
    device_id: &DeviceId,
) -> CryptoStoreWrapper {
    let account = Account::with_device_id(user_id, device_id);
    let own_device = DeviceData::from_account(&account);
    own_device.set_trust_state(LocalTrust::Verified);

    let mem_store = MemoryStore::new();

    // Properly create the store by first saving the own device and then the
    // account data.
    let device_changes = DeviceChanges { new: vec![own_device], ..Default::default() };
    mem_store
        .save_changes(Changes { devices: device_changes, ..Default::default() })
        .await
        .unwrap();
    mem_store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();

    CryptoStoreWrapper::new(user_id, device_id, mem_store)
}
1219
1220    async fn get_machine_test_helper() -> GossipMachine {
1221        let user_id = alice_id().to_owned();
1222        let account = Account::with_device_id(&user_id, alice_device_id());
1223        let device = DeviceData::from_account(&account);
1224        let another_device =
1225            DeviceData::from_account(&Account::with_device_id(&user_id, alice2_device_id()));
1226
1227        let store =
1228            Arc::new(CryptoStoreWrapper::new(&user_id, account.device_id(), MemoryStore::new()));
1229        let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
1230        let verification =
1231            VerificationMachine::new(account.static_data.clone(), identity.clone(), store.clone());
1232
1233        let store = Store::new(account.static_data().clone(), identity, store, verification);
1234        store.save_device_data(&[device, another_device]).await.unwrap();
1235        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1236        let session_cache = GroupSessionCache::new(store.clone());
1237
1238        let identity_manager = IdentityManager::new(store.clone());
1239
1240        GossipMachine::new(store, identity_manager, session_cache, Default::default())
1241    }
1242
/// Set up two gossip machines for key sharing tests.
///
/// The first machine belongs to Alice, the second one to
/// `other_machine_owner`. The second machine creates a group session pair
/// using `algorithm`, Alice requests the key for an event encrypted with
/// it, and the outbound session is marked as shared with Alice's device.
///
/// # Arguments
///
/// * `other_machine_owner` - The user that owns the second machine.
///
/// * `create_sessions` - Whether Olm sessions between the two accounts
///   should be created and saved in both stores.
///
/// * `algorithm` - The algorithm used for the encryption settings of the
///   group session.
///
/// # Returns
///
/// Alice's machine, the outbound group session, and the second machine.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn machines_for_key_share_test_helper(
    other_machine_owner: &UserId,
    create_sessions: bool,
    algorithm: EventEncryptionAlgorithm,
) -> (GossipMachine, OutboundGroupSession, GossipMachine) {
    use crate::olm::SenderData;

    let alice_machine = get_machine_test_helper().await;
    let alice_device = DeviceData::from_account(
        &alice_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
    );

    let bob_machine = gossip_machine_test_helper(other_machine_owner).await;

    let bob_device = DeviceData::from_account(
        #[allow(clippy::explicit_auto_deref)] // clippy's wrong
        &*bob_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
    );

    // We need a trusted device, otherwise we won't request keys
    let second_device = DeviceData::from_account(&alice_2_account());
    second_device.set_trust_state(LocalTrust::Verified);
    bob_device.set_trust_state(LocalTrust::Verified);

    // Each machine needs to know about the devices of the other side.
    alice_machine.inner.store.save_device_data(&[bob_device, second_device]).await.unwrap();
    bob_machine
        .inner
        .store
        .save_device_data(std::slice::from_ref(&alice_device))
        .await
        .unwrap();

    if create_sessions {
        // Create Olm sessions for our two accounts.
        let (alice_session, bob_session) = alice_machine
            .inner
            .store
            .with_transaction(|mut atr| async {
                let sessions = bob_machine
                    .inner
                    .store
                    .with_transaction(|mut btr| async {
                        let alice_account = atr.account().await?;
                        let bob_account = btr.account().await?;
                        let sessions =
                            alice_account.create_session_for_test_helper(bob_account).await;
                        Ok((btr, sessions))
                    })
                    .await?;
                Ok((atr, sessions))
            })
            .await
            .unwrap();

        // Populate our stores with the Olm sessions.
        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
        bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
    }

    // Create the Megolm session pair on bob's side.
    let settings = EncryptionSettings { algorithm, ..Default::default() };
    let (outbound, inbound) = bob_machine
        .inner
        .store
        .static_account()
        .create_group_session_pair(room_id(), settings, SenderData::unknown())
        .await
        .unwrap();

    bob_machine.inner.store.save_inbound_group_sessions(&[inbound]).await.unwrap();

    let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let event = wrap_encrypted_content(bob_machine.user_id(), content);

    // Alice wants to request the outbound group session from bob.
    let requested = alice_machine.create_outgoing_key_request(room_id(), &event).await.unwrap();
    assert!(requested, "We should request a room key");

    outbound
        .mark_shared_with(
            alice_device.user_id(),
            alice_device.device_id(),
            alice_device.curve25519_key().unwrap(),
        )
        .await;

    // Put the outbound session into bobs store.
    bob_machine.inner.outbound_group_sessions.insert(outbound.clone());

    (alice_machine, outbound, bob_machine)
}
1337
1338    fn extract_content<'a>(
1339        recipient: &UserId,
1340        request: &'a crate::types::requests::OutgoingRequest,
1341    ) -> &'a Raw<ruma::events::AnyToDeviceEventContent> {
1342        request
1343            .request()
1344            .to_device()
1345            .expect("The request should be always a to-device request")
1346            .messages
1347            .get(recipient)
1348            .unwrap()
1349            .values()
1350            .next()
1351            .unwrap()
1352    }
1353
1354    fn wrap_encrypted_content(
1355        sender: &UserId,
1356        content: Raw<RoomEncryptedEventContent>,
1357    ) -> EncryptedEvent {
1358        let content = content.deserialize().unwrap();
1359
1360        EncryptedEvent {
1361            sender: sender.to_owned(),
1362            event_id: event_id!("$143273582443PhrSn:example.org").to_owned(),
1363            content,
1364            origin_server_ts: ruma::MilliSecondsSinceUnixEpoch::now(),
1365            unsigned: Default::default(),
1366            other: Default::default(),
1367        }
1368    }
1369
1370    fn request_to_event<C>(
1371        recipient: &UserId,
1372        sender: &UserId,
1373        request: &crate::types::requests::OutgoingRequest,
1374    ) -> crate::types::events::ToDeviceEvent<C>
1375    where
1376        C: crate::types::events::EventType
1377            + serde::de::DeserializeOwned
1378            + serde::ser::Serialize
1379            + std::fmt::Debug,
1380    {
1381        let content = extract_content(recipient, request);
1382        let content: C = content.deserialize_as().unwrap_or_else(|_| {
1383            panic!("We can always deserialize the to-device event content {content:?}")
1384        });
1385
1386        crate::types::events::ToDeviceEvent::new(sender.to_owned(), content)
1387    }
1388
1389    #[async_test]
1390    async fn test_create_machine() {
1391        let machine = get_machine_test_helper().await;
1392
1393        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1394    }
1395
1396    #[async_test]
1397    async fn test_re_request_keys() {
1398        let machine = get_machine_test_helper().await;
1399        let account = account();
1400
1401        let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
1402
1403        let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
1404        let event = wrap_encrypted_content(machine.user_id(), content);
1405
1406        assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
1407        let (cancel, request) = machine.request_key(session.room_id(), &event).await.unwrap();
1408
1409        assert!(cancel.is_none());
1410
1411        machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1412
1413        let (cancel, _) = machine.request_key(session.room_id(), &event).await.unwrap();
1414
1415        assert!(cancel.is_some());
1416    }
1417
/// Creating a key request twice for the same event leaves exactly one
/// outgoing to-device request, which disappears once it is marked as sent.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_create_key_request() {
    let machine = get_machine_test_helper().await;

    // We need a trusted device, otherwise we won't request keys
    let trusted_device = DeviceData::from_account(&alice_2_account());
    trusted_device.set_trust_state(LocalTrust::Verified);
    machine.inner.store.save_device_data(&[trusted_device]).await.unwrap();

    let (outbound, session) = account().create_group_session_pair_with_defaults(room_id()).await;
    let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let event = wrap_encrypted_content(machine.user_id(), content);

    assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());

    machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
    let after_first = machine.outgoing_to_device_requests().await.unwrap();
    assert!(!after_first.is_empty());
    assert_eq!(machine.outgoing_to_device_requests().await.unwrap().len(), 1);

    // Asking again for the same key must not add another request.
    machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
    let after_second = machine.outgoing_to_device_requests().await.unwrap();
    assert_eq!(after_second.len(), 1);

    let request_id = &after_second[0].request_id;
    machine.mark_outgoing_request_as_sent(request_id).await.unwrap();

    assert!(machine.outgoing_to_device_requests().await.unwrap().is_empty());
}
1449
/// We should *not* request keys if that has been disabled.
///
/// Room key requests are enabled at first; after disabling them,
/// `create_outgoing_key_request` must not queue any to-device request.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_create_key_request_requests_disabled() {
    let machine = get_machine_test_helper().await;

    // We need a trusted device, otherwise we won't request keys
    let trusted_device = DeviceData::from_account(&alice_2_account());
    trusted_device.set_trust_state(LocalTrust::Verified);
    machine.inner.store.save_device_data(&[trusted_device]).await.unwrap();

    // Disable key requests
    assert!(machine.are_room_key_requests_enabled());
    machine.set_room_key_requests_enabled(false);
    assert!(!machine.are_room_key_requests_enabled());

    let (outbound, session) = account().create_group_session_pair_with_defaults(room_id()).await;
    let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let event = wrap_encrypted_content(machine.user_id(), content);

    // The outgoing to-device requests should be empty before and after
    // `create_outgoing_key_request`.
    let before = machine.outgoing_to_device_requests().await.unwrap();
    assert!(before.is_empty());

    machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();

    let after = machine.outgoing_to_device_requests().await.unwrap();
    assert!(after.is_empty());
}
1478
/// Forwarded room keys are only accepted if they improve on what we have.
///
/// The session is forwarded three times by Alice's verified second device:
/// exported at index 10 (accepted), at index 15 while the index 10 version
/// is already stored (rejected), and at index 0 (accepted).
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_receive_forwarded_key() {
    let machine = get_machine_test_helper().await;
    let account = account();

    // We need a trusted device, otherwise we won't request keys
    let alice_device = DeviceData::from_account(&alice_2_account());
    alice_device.set_trust_state(LocalTrust::Verified);
    machine.inner.store.save_device_data(std::slice::from_ref(&alice_device)).await.unwrap();

    let forwarder_key = alice_device.curve25519_key().unwrap();

    let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
    let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let room_event = wrap_encrypted_content(machine.user_id(), content);

    // Request the key and pretend the request went out.
    machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();

    let requests = machine.outgoing_to_device_requests().await.unwrap();
    machine.mark_outgoing_request_as_sent(&requests[0].request_id).await.unwrap();

    // First forward: the session exported at index 10.
    let content: ForwardedRoomKeyContent = session.export_at_index(10).await.try_into().unwrap();
    let event = DecryptedOlmV1Event::new(
        alice_id(),
        alice_id(),
        alice_device.ed25519_key().unwrap(),
        None,
        content,
    );

    let stored_session = machine
        .inner
        .store
        .get_inbound_group_session(session.room_id(), session.session_id())
        .await
        .unwrap();
    assert!(stored_session.is_none());

    let first_session =
        machine.receive_forwarded_room_key(forwarder_key, &event).await.unwrap().unwrap();

    assert_eq!(first_session.first_known_index(), 10);

    machine
        .inner
        .store
        .save_inbound_group_sessions(std::slice::from_ref(&first_session))
        .await
        .unwrap();

    // Get the cancel request.
    let cancel_id = machine
        .inner
        .outgoing_requests
        .read()
        .first_key_value()
        .map(|(_, r)| r.request_id.clone())
        .unwrap();
    machine.mark_outgoing_request_as_sent(&cancel_id).await.unwrap();

    // Request the key once more and mark that request as sent too.
    machine.create_outgoing_key_request(session.room_id(), &room_event).await.unwrap();

    let requests = machine.outgoing_to_device_requests().await.unwrap();
    machine.mark_outgoing_request_as_sent(&requests[0].request_id).await.unwrap();

    // Second forward: the session exported at index 15.
    let content: ForwardedRoomKeyContent = session.export_at_index(15).await.try_into().unwrap();
    let event = DecryptedOlmV1Event::new(
        alice_id(),
        alice_id(),
        alice_device.ed25519_key().unwrap(),
        None,
        content,
    );

    let second_session = machine.receive_forwarded_room_key(forwarder_key, &event).await.unwrap();

    assert!(second_session.is_none());

    // Third forward: the session exported at index 0.
    let content: ForwardedRoomKeyContent = session.export_at_index(0).await.try_into().unwrap();
    let event = DecryptedOlmV1Event::new(
        alice_id(),
        alice_id(),
        alice_device.ed25519_key().unwrap(),
        None,
        content,
    );

    let third_session = machine.receive_forwarded_room_key(forwarder_key, &event).await.unwrap();

    assert_eq!(third_session.unwrap().first_known_index(), 0);
}
1591
1592    #[async_test]
1593    #[cfg(feature = "automatic-room-key-forwarding")]
1594    async fn test_should_share_key() {
1595        let machine = get_machine_test_helper().await;
1596        let account = account();
1597
1598        let own_device =
1599            machine.inner.store.get_device(alice_id(), alice2_device_id()).await.unwrap().unwrap();
1600
1601        let (outbound, inbound) = account.create_group_session_pair_with_defaults(room_id()).await;
1602
1603        // We don't share keys with untrusted devices.
1604        assert_matches!(
1605            machine.should_share_key(&own_device, &inbound).await,
1606            Err(KeyForwardDecision::UntrustedDevice)
1607        );
1608        own_device.set_trust_state(LocalTrust::Verified);
1609        // Now we do want to share the keys.
1610        machine.should_share_key(&own_device, &inbound).await.unwrap();
1611
1612        let bob_device = DeviceData::from_account(&bob_account());
1613        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1614
1615        let bob_device =
1616            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1617
1618        // We don't share sessions with other user's devices if no outbound
1619        // session was provided.
1620        assert_matches!(
1621            machine.should_share_key(&bob_device, &inbound).await,
1622            Err(KeyForwardDecision::MissingOutboundSession)
1623        );
1624
1625        let mut changes = Changes::default();
1626
1627        changes.outbound_group_sessions.push(outbound.clone());
1628        changes.inbound_group_sessions.push(inbound.clone());
1629        machine.inner.store.save_changes(changes).await.unwrap();
1630        machine.inner.outbound_group_sessions.insert(outbound.clone());
1631
1632        // We don't share sessions with other user's devices if the session
1633        // wasn't shared in the first place.
1634        assert_matches!(
1635            machine.should_share_key(&bob_device, &inbound).await,
1636            Err(KeyForwardDecision::OutboundSessionNotShared)
1637        );
1638
1639        bob_device.set_trust_state(LocalTrust::Verified);
1640
1641        // We don't share sessions with other user's devices if the session
1642        // wasn't shared in the first place even if the device is trusted.
1643        assert_matches!(
1644            machine.should_share_key(&bob_device, &inbound).await,
1645            Err(KeyForwardDecision::OutboundSessionNotShared)
1646        );
1647
1648        // We now share the session, since it was shared before.
1649        outbound
1650            .mark_shared_with(
1651                bob_device.user_id(),
1652                bob_device.device_id(),
1653                bob_device.curve25519_key().unwrap(),
1654            )
1655            .await;
1656        machine.should_share_key(&bob_device, &inbound).await.unwrap();
1657
1658        let (other_outbound, other_inbound) =
1659            account.create_group_session_pair_with_defaults(room_id()).await;
1660
1661        // But we don't share some other session that doesn't match our outbound
1662        // session.
1663        assert_matches!(
1664            machine.should_share_key(&bob_device, &other_inbound).await,
1665            Err(KeyForwardDecision::MissingOutboundSession)
1666        );
1667
1668        // Finally, let's ensure we don't share the session with a device that rotated
1669        // its curve25519 key.
1670        let bob_device = DeviceData::from_account(&bob_account());
1671        machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1672
1673        let bob_device =
1674            machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
1675        assert_matches!(
1676            machine.should_share_key(&bob_device, &inbound).await,
1677            Err(KeyForwardDecision::ChangedSenderKey)
1678        );
1679
1680        // Now let's encrypt some messages in another session to increment the message
1681        // index and then share it with our own untrusted device.
1682        own_device.set_trust_state(LocalTrust::Unset);
1683
1684        for _ in 1..=3 {
1685            other_outbound.encrypt_helper("foo".to_owned()).await;
1686        }
1687        other_outbound
1688            .mark_shared_with(
1689                own_device.user_id(),
1690                own_device.device_id(),
1691                own_device.curve25519_key().unwrap(),
1692            )
1693            .await;
1694
1695        machine.inner.outbound_group_sessions.insert(other_outbound.clone());
1696
1697        // Since our device is untrusted, we should share the session starting only from
1698        // the current index (at which the message was marked as shared). This
1699        // should be 3 since we encrypted 3 messages.
1700        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(Some(3)));
1701
1702        own_device.set_trust_state(LocalTrust::Verified);
1703
1704        // However once our device is trusted, we share the entire session.
1705        assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(None));
1706    }
1707
1708    #[cfg(feature = "automatic-room-key-forwarding")]
1709    async fn test_key_share_cycle(algorithm: EventEncryptionAlgorithm) {
1710        let (alice_machine, group_session, bob_machine) =
1711            machines_for_key_share_test_helper(alice_id(), true, algorithm).await;
1712
1713        // Get the request and convert it into a event.
1714        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1715        let request = &requests[0];
1716        let event = request_to_event(alice_id(), alice_id(), request);
1717
1718        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1719
1720        // Bob doesn't have any outgoing requests.
1721        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1722
1723        // Receive the room key request from alice.
1724        bob_machine.receive_incoming_key_request(&event);
1725
1726        {
1727            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1728            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1729        }
1730        // Now bob does have an outgoing request.
1731        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1732
1733        // Get the request and convert it to a encrypted to-device event.
1734        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1735        let request = &requests[0];
1736
1737        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
1738        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1739
1740        // Check that alice doesn't have the session.
1741        assert!(alice_machine
1742            .inner
1743            .store
1744            .get_inbound_group_session(room_id(), group_session.session_id())
1745            .await
1746            .unwrap()
1747            .is_none());
1748
1749        let decrypted = alice_machine
1750            .inner
1751            .store
1752            .with_transaction(|mut tr| async {
1753                let res = tr
1754                    .account()
1755                    .await?
1756                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
1757                    .await?;
1758                Ok((tr, res))
1759            })
1760            .await
1761            .unwrap();
1762
1763        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1764            panic!("Invalid decrypted event type");
1765        };
1766
1767        let session = alice_machine
1768            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1769            .await
1770            .unwrap();
1771        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
1772
1773        // Check that alice now does have the session.
1774        let session = alice_machine
1775            .inner
1776            .store
1777            .get_inbound_group_session(room_id(), group_session.session_id())
1778            .await
1779            .unwrap()
1780            .unwrap();
1781
1782        assert_eq!(session.session_id(), group_session.session_id())
1783    }
1784
1785    #[async_test]
1786    #[cfg(feature = "automatic-room-key-forwarding")]
1787    async fn test_reject_forward_from_another_user() {
1788        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
1789            bob_id(),
1790            true,
1791            EventEncryptionAlgorithm::MegolmV1AesSha2,
1792        )
1793        .await;
1794
1795        // Get the request and convert it into a event.
1796        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
1797        let request = &requests[0];
1798        let event = request_to_event(alice_id(), alice_id(), request);
1799
1800        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1801
1802        // Bob doesn't have any outgoing requests.
1803        assert!(bob_machine.inner.outgoing_requests.read().is_empty());
1804
1805        // Receive the room key request from alice.
1806        bob_machine.receive_incoming_key_request(&event);
1807        {
1808            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
1809            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
1810        }
1811        // Now bob does have an outgoing request.
1812        assert!(!bob_machine.inner.outgoing_requests.read().is_empty());
1813
1814        // Get the request and convert it to a encrypted to-device event.
1815        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
1816        let request = &requests[0];
1817
1818        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), bob_id(), request);
1819        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
1820
1821        // Check that alice doesn't have the session.
1822        assert!(alice_machine
1823            .inner
1824            .store
1825            .get_inbound_group_session(room_id(), group_session.session_id())
1826            .await
1827            .unwrap()
1828            .is_none());
1829
1830        let decrypted = alice_machine
1831            .inner
1832            .store
1833            .with_transaction(|mut tr| async {
1834                let res = tr
1835                    .account()
1836                    .await?
1837                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
1838                    .await?;
1839                Ok((tr, res))
1840            })
1841            .await
1842            .unwrap();
1843        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
1844            panic!("Invalid decrypted event type");
1845        };
1846
1847        let session = alice_machine
1848            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
1849            .await
1850            .unwrap();
1851
1852        assert!(session.is_none(), "We should not receive a room key from another user");
1853    }
1854
1855    #[async_test]
1856    #[cfg(feature = "automatic-room-key-forwarding")]
1857    async fn test_key_share_cycle_megolm_v1() {
1858        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV1AesSha2).await;
1859    }
1860
1861    #[async_test]
1862    #[cfg(all(feature = "experimental-algorithms", feature = "automatic-room-key-forwarding"))]
1863    async fn test_key_share_cycle_megolm_v2() {
1864        test_key_share_cycle(EventEncryptionAlgorithm::MegolmV2AesSha2).await;
1865    }
1866
1867    #[async_test]
1868    async fn test_secret_share_cycle() {
1869        let alice_machine = get_machine_test_helper().await;
1870
1871        let mut second_account = alice_2_account();
1872        let alice_device = DeviceData::from_account(&second_account);
1873
1874        let bob_account = bob_account();
1875        let bob_device = DeviceData::from_account(&bob_account);
1876
1877        let devices = std::slice::from_ref(&alice_device);
1878        alice_machine.inner.store.save_device_data(devices).await.unwrap();
1879
1880        // Create Olm sessions for our two accounts.
1881        let alice_session = alice_machine
1882            .inner
1883            .store
1884            .with_transaction(|mut tr| async {
1885                let alice_account = tr.account().await?;
1886                let (alice_session, _) =
1887                    alice_account.create_session_for_test_helper(&mut second_account).await;
1888                Ok((tr, alice_session))
1889            })
1890            .await
1891            .unwrap();
1892
1893        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
1894
1895        let event = RumaToDeviceEvent {
1896            sender: bob_account.user_id().to_owned(),
1897            content: ToDeviceSecretRequestEventContent::new(
1898                RequestAction::Request(SecretName::CrossSigningMasterKey),
1899                bob_account.device_id().to_owned(),
1900                "request_id".into(),
1901            ),
1902        };
1903
1904        // No secret found
1905        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1906        alice_machine.receive_incoming_secret_request(&event);
1907        {
1908            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1909            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1910        }
1911        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1912
1913        // No device found
1914        alice_machine.inner.store.reset_cross_signing_identity().await;
1915        alice_machine.receive_incoming_secret_request(&event);
1916        {
1917            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1918            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1919        }
1920        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1921
1922        alice_machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
1923
1924        // The device doesn't belong to us
1925        alice_machine.inner.store.reset_cross_signing_identity().await;
1926        alice_machine.receive_incoming_secret_request(&event);
1927        {
1928            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1929            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1930        }
1931        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1932
1933        let event = RumaToDeviceEvent {
1934            sender: alice_id().to_owned(),
1935            content: ToDeviceSecretRequestEventContent::new(
1936                RequestAction::Request(SecretName::CrossSigningMasterKey),
1937                second_account.device_id().into(),
1938                "request_id".into(),
1939            ),
1940        };
1941
1942        // The device isn't trusted
1943        alice_machine.receive_incoming_secret_request(&event);
1944        {
1945            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1946            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1947        }
1948        assert!(alice_machine.inner.outgoing_requests.read().is_empty());
1949
1950        // We need a trusted device, otherwise we won't serve secrets
1951        alice_device.set_trust_state(LocalTrust::Verified);
1952        let devices = std::slice::from_ref(&alice_device);
1953        alice_machine.inner.store.save_device_data(devices).await.unwrap();
1954
1955        alice_machine.receive_incoming_secret_request(&event);
1956        {
1957            let alice_cache = alice_machine.inner.store.cache().await.unwrap();
1958            alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
1959        }
1960        assert!(!alice_machine.inner.outgoing_requests.read().is_empty());
1961    }
1962
1963    #[async_test]
1964    async fn test_secret_broadcasting() {
1965        use futures_util::{pin_mut, FutureExt};
1966        use ruma::api::client::to_device::send_event_to_device::v3::Response as ToDeviceResponse;
1967        use serde_json::value::to_raw_value;
1968        use tokio_stream::StreamExt;
1969
1970        use crate::{
1971            machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper,
1972            EncryptionSyncChanges,
1973        };
1974
1975        let alice_id = user_id!("@alice:localhost");
1976
1977        let (alice_machine, bob_machine) =
1978            get_machine_pair_with_setup_sessions_test_helper(alice_id, alice_id, false).await;
1979
1980        let key_requests = GossipMachine::request_missing_secrets(
1981            bob_machine.user_id(),
1982            vec![SecretName::RecoveryKey],
1983        );
1984        let mut changes = Changes::default();
1985        let request_id = key_requests[0].request_id.to_owned();
1986        changes.key_requests = key_requests;
1987        bob_machine.store().save_changes(changes).await.unwrap();
1988        for request in bob_machine.outgoing_requests().await.unwrap() {
1989            bob_machine
1990                .mark_request_as_sent(request.request_id(), &ToDeviceResponse::new())
1991                .await
1992                .unwrap();
1993        }
1994
1995        let event = RumaToDeviceEvent {
1996            sender: alice_machine.user_id().to_owned(),
1997            content: ToDeviceSecretRequestEventContent::new(
1998                RequestAction::Request(SecretName::RecoveryKey),
1999                bob_machine.device_id().to_owned(),
2000                request_id,
2001            ),
2002        };
2003
2004        let bob_device = alice_machine
2005            .get_device(alice_id, bob_machine.device_id(), None)
2006            .await
2007            .unwrap()
2008            .unwrap();
2009        let alice_device = bob_machine
2010            .get_device(alice_id, alice_machine.device_id(), None)
2011            .await
2012            .unwrap()
2013            .unwrap();
2014
2015        // We need a trusted device, otherwise we won't serve nor accept secrets.
2016        bob_device.set_trust_state(LocalTrust::Verified);
2017        alice_device.set_trust_state(LocalTrust::Verified);
2018        alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
2019        bob_machine.store().save_device_data(&[alice_device.inner]).await.unwrap();
2020
2021        let decryption_key = crate::store::BackupDecryptionKey::new().unwrap();
2022        alice_machine
2023            .backup_machine()
2024            .save_decryption_key(Some(decryption_key), None)
2025            .await
2026            .unwrap();
2027        alice_machine.inner.key_request_machine.receive_incoming_secret_request(&event);
2028        {
2029            let alice_cache = alice_machine.store().cache().await.unwrap();
2030            alice_machine
2031                .inner
2032                .key_request_machine
2033                .collect_incoming_key_requests(&alice_cache)
2034                .await
2035                .unwrap();
2036        }
2037
2038        let requests =
2039            alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
2040
2041        assert_eq!(requests.len(), 1);
2042        let request = requests.first().expect("We should have an outgoing to-device request");
2043
2044        let event: EncryptedToDeviceEvent =
2045            request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
2046        let event = Raw::from_json(to_raw_value(&event).unwrap());
2047
2048        let stream = bob_machine.store().secrets_stream();
2049        pin_mut!(stream);
2050
2051        bob_machine
2052            .receive_sync_changes(EncryptionSyncChanges {
2053                to_device_events: vec![event],
2054                changed_devices: &Default::default(),
2055                one_time_keys_counts: &Default::default(),
2056                unused_fallback_keys: None,
2057                next_batch_token: None,
2058            })
2059            .await
2060            .unwrap();
2061
2062        stream.next().now_or_never().expect("The broadcaster should have sent out the secret");
2063    }
2064
2065    #[async_test]
2066    #[cfg(feature = "automatic-room-key-forwarding")]
2067    async fn test_key_share_cycle_without_session() {
2068        let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
2069            alice_id(),
2070            false,
2071            EventEncryptionAlgorithm::MegolmV1AesSha2,
2072        )
2073        .await;
2074
2075        // Get the request and convert it into a event.
2076        let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
2077        let request = &requests[0];
2078        let event = request_to_event(alice_id(), alice_id(), request);
2079
2080        alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2081
2082        // Bob doesn't have any outgoing requests.
2083        assert!(bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2084        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2085        assert!(bob_machine.inner.wait_queue.is_empty());
2086
2087        // Receive the room key request from alice.
2088        bob_machine.receive_incoming_key_request(&event);
2089        {
2090            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2091            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2092        }
2093        // Bob only has a keys claim request, since we're lacking a session
2094        assert_eq!(bob_machine.outgoing_to_device_requests().await.unwrap().len(), 1);
2095        assert_matches!(
2096            bob_machine.outgoing_to_device_requests().await.unwrap()[0].request(),
2097            AnyOutgoingRequest::KeysClaim(_)
2098        );
2099        assert!(!bob_machine.inner.users_for_key_claim.read().is_empty());
2100        assert!(!bob_machine.inner.wait_queue.is_empty());
2101
2102        let (alice_session, bob_session) = alice_machine
2103            .inner
2104            .store
2105            .with_transaction(|mut atr| async {
2106                let res = bob_machine
2107                    .inner
2108                    .store
2109                    .with_transaction(|mut btr| async {
2110                        let alice_account = atr.account().await?;
2111                        let bob_account = btr.account().await?;
2112                        let sessions =
2113                            alice_account.create_session_for_test_helper(bob_account).await;
2114                        Ok((btr, sessions))
2115                    })
2116                    .await?;
2117                Ok((atr, res))
2118            })
2119            .await
2120            .unwrap();
2121
2122        // We create a session now.
2123        alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
2124        bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
2125
2126        bob_machine.retry_keyshare(alice_id(), alice_device_id());
2127        assert!(bob_machine.inner.users_for_key_claim.read().is_empty());
2128        {
2129            let bob_cache = bob_machine.inner.store.cache().await.unwrap();
2130            bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
2131        }
2132        // Bob now has an outgoing requests.
2133        assert!(!bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
2134        assert!(bob_machine.inner.wait_queue.is_empty());
2135
2136        // Get the request and convert it to a encrypted to-device event.
2137        let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
2138        let request = &requests[0];
2139
2140        let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
2141        bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
2142
2143        // Check that alice doesn't have the session.
2144        assert!(alice_machine
2145            .inner
2146            .store
2147            .get_inbound_group_session(room_id(), group_session.session_id())
2148            .await
2149            .unwrap()
2150            .is_none());
2151
2152        let decrypted = alice_machine
2153            .inner
2154            .store
2155            .with_transaction(|mut tr| async {
2156                let res = tr
2157                    .account()
2158                    .await?
2159                    .decrypt_to_device_event(&alice_machine.inner.store, &event)
2160                    .await?;
2161                Ok((tr, res))
2162            })
2163            .await
2164            .unwrap();
2165
2166        let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
2167            panic!("Invalid decrypted event type");
2168        };
2169
2170        let session = alice_machine
2171            .receive_forwarded_room_key(decrypted.result.sender_key, ev)
2172            .await
2173            .unwrap();
2174        alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
2175
2176        // Check that alice now does have the session.
2177        let session = alice_machine
2178            .inner
2179            .store
2180            .get_inbound_group_session(room_id(), group_session.session_id())
2181            .await
2182            .unwrap()
2183            .unwrap();
2184
2185        assert_eq!(session.session_id(), group_session.session_id())
2186    }
2187}