matrix_sdk_crypto/identities/
manager.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17    ops::Deref,
18    sync::Arc,
19    time::Duration,
20};
21
22use futures_util::future::join_all;
23use itertools::Itertools;
24use matrix_sdk_common::{executor::spawn, failures_cache::FailuresCache};
25use ruma::{
26    api::client::keys::get_keys::v3::Response as KeysQueryResponse, serde::Raw, OwnedDeviceId,
27    OwnedServerName, OwnedTransactionId, OwnedUserId, ServerName, TransactionId, UserId,
28};
29use tokio::sync::Mutex;
30use tracing::{debug, enabled, info, instrument, trace, warn, Level};
31
32use crate::{
33    error::OlmResult,
34    identities::{DeviceData, OtherUserIdentityData, OwnUserIdentityData, UserIdentityData},
35    olm::{
36        sender_data_finder::SessionDeviceCheckError, InboundGroupSession,
37        PrivateCrossSigningIdentity, SenderDataFinder, SenderDataType,
38    },
39    store::{
40        caches::SequenceNumber, Changes, DeviceChanges, IdentityChanges, KeyQueryManager,
41        Result as StoreResult, Store, StoreCache, StoreCacheGuard, UserKeyQueryResult,
42    },
43    types::{
44        requests::KeysQueryRequest, CrossSigningKey, DeviceKeys, MasterPubkey, SelfSigningPubkey,
45        UserSigningPubkey,
46    },
47    CryptoStoreError, LocalTrust, OwnUserIdentity, SignatureError, UserIdentity,
48};
49
/// The outcome of processing the keys of a single device from a
/// `/keys/query` response.
enum DeviceChange {
    /// The device was not in the store before and has been created.
    New(DeviceData),
    /// The device was already in the store and the update changed it.
    Updated(DeviceData),
    /// Nothing changed, or the received device keys were rejected.
    None,
}
55
/// This enum helps us to distinguish between the changed and unchanged
/// identity case.
/// An unchanged identity means same cross signing keys as well as same
/// set of signatures on the master key.
enum IdentityUpdateResult {
    /// The update reported a change to the identity.
    Updated(UserIdentityData),
    /// The update reported no change to the identity.
    Unchanged(UserIdentityData),
}
64
/// Processes `/keys/query` responses: works out which devices and user
/// identities are new, changed or deleted, and saves the result to the store.
#[derive(Debug, Clone)]
pub(crate) struct IdentityManager {
    /// Servers that have previously appeared in the `failures` section of a
    /// `/keys/query` response.
    ///
    /// See also [`crate::session_manager::SessionManager::failures`].
    failures: FailuresCache<OwnedServerName>,
    /// The store that devices and identities are read from and saved to.
    store: Store,

    /// Used to mark tracked users as up to date once the response to an
    /// in-flight key query has been processed.
    pub(crate) key_query_manager: Arc<KeyQueryManager>,

    /// Details of the current "in-flight" key query request, if any
    keys_query_request_details: Arc<Mutex<Option<KeysQueryRequestDetails>>>,
}
79
/// Details of an in-flight key query request
#[derive(Debug, Clone, Default)]
struct KeysQueryRequestDetails {
    /// The sequence number, to be passed to
    /// `mark_tracked_users_as_up_to_date` once a response to one of the
    /// outstanding requests has been processed.
    sequence_number: SequenceNumber,

    /// A single batch of queries returned by the Store is broken up into one or
    /// more actual KeysQueryRequests, each with their own request id. We
    /// record the outstanding request ids here.
    request_ids: HashSet<OwnedTransactionId>,
}
92
/// Helper type bundling the cross-signing keys found for one user in a
/// `/keys/query` response.
struct KeySetInfo {
    /// The user the keys were listed under in the response.
    user_id: OwnedUserId,
    /// The deserialized public master key.
    master_key: MasterPubkey,
    /// The deserialized public self-signing key.
    self_signing: SelfSigningPubkey,
}
99
100impl IdentityManager {
    /// NOTE(review): presumably the maximum number of users put into a single
    /// `/keys/query` request; the constant is used outside this part of the
    /// file, so confirm against its callers.
    const MAX_KEY_QUERY_USERS: usize = 250;
102
103    pub fn new(store: Store) -> Self {
104        let keys_query_request_details = Mutex::new(None);
105
106        IdentityManager {
107            store,
108            key_query_manager: Default::default(),
109            failures: Default::default(),
110            keys_query_request_details: keys_query_request_details.into(),
111        }
112    }
113
114    fn user_id(&self) -> &UserId {
115        &self.store.static_account().user_id
116    }
117
118    /// Receive a successful `/keys/query` response.
119    ///
120    /// Returns a list of devices newly discovered devices and devices that
121    /// changed.
122    ///
123    /// # Arguments
124    ///
125    /// * `request_id` - The request_id returned by `users_for_key_query` or
126    ///   `build_key_query_for_users`
127    /// * `response` - The response of the `/keys/query` request that the client
128    ///   performed.
129    pub async fn receive_keys_query_response(
130        &self,
131        request_id: &TransactionId,
132        response: &KeysQueryResponse,
133    ) -> OlmResult<(DeviceChanges, IdentityChanges)> {
134        debug!(
135            ?request_id,
136            users = ?response.device_keys.keys().collect::<BTreeSet<_>>(),
137            failures = ?response.failures,
138            "Handling a `/keys/query` response"
139        );
140
141        // Parse the strings into server names and filter out our own server. We should
142        // never get failures from our own server but let's remove it as a
143        // precaution anyways.
144        let failed_servers = response
145            .failures
146            .keys()
147            .filter_map(|k| ServerName::parse(k).ok())
148            .filter(|s| s != self.user_id().server_name());
149        let successful_servers = response.device_keys.keys().map(|u| u.server_name());
150
151        // Append the new failed servers and remove any successful servers. We
152        // need to explicitly remove the successful servers because the cache
153        // doesn't automatically remove entries that elapse. Instead, the effect
154        // is that elapsed servers will be retried and their delays incremented.
155        self.failures.extend(failed_servers);
156        self.failures.remove(successful_servers);
157
158        let devices = self.handle_devices_from_key_query(response.device_keys.clone()).await?;
159        let (identities, cross_signing_identity) = self.handle_cross_signing_keys(response).await?;
160
161        let changes = Changes {
162            identities: identities.clone(),
163            devices: devices.clone(),
164            private_identity: cross_signing_identity,
165            ..Default::default()
166        };
167
168        self.store.save_changes(changes).await?;
169
170        // Update the sender data on any existing inbound group sessions based on the
171        // changes in this response.
172        //
173        // `update_sender_data_from_device_changes` relies on being able to look up the
174        // user identities from the store, so this has to happen *after* the
175        // changes from `handle_cross_signing_keys` are saved.
176        //
177        // Note: it might be possible for this to race against session creation. If a
178        // new session is received at the same time as a `/keys/query` response is being
179        // processed, it could be saved without up-to-date sender data, but it might be
180        // saved too late for it to be picked up by
181        // `update_sender_data_from_device_changes`. However, this should be rare,
182        // since, in general, /sync responses which might create a new session
183        // are not processed at the same time as /keys/query responses (assuming
184        // that the application does not call `OlmMachine::receive_sync_changes`
185        // at the same time as `OlmMachine::mark_request_as_sent`).
186        self.update_sender_data_from_device_changes(&devices).await?;
187
188        // if this request is one of those we expected to be in flight, pass the
189        // sequence number back to the store so that it can mark devices up to
190        // date
191        let sequence_number = {
192            let mut request_details = self.keys_query_request_details.lock().await;
193
194            request_details.as_mut().and_then(|details| {
195                if details.request_ids.remove(request_id) {
196                    Some(details.sequence_number)
197                } else {
198                    None
199                }
200            })
201        };
202
203        if let Some(sequence_number) = sequence_number {
204            let cache = self.store.cache().await?;
205            self.key_query_manager
206                .synced(&cache)
207                .await?
208                .mark_tracked_users_as_up_to_date(
209                    response.device_keys.keys().map(Deref::deref),
210                    sequence_number,
211                )
212                .await?;
213        }
214
215        if enabled!(Level::DEBUG) {
216            debug_log_keys_query_response(&devices, &identities, request_id);
217        }
218
219        Ok((devices, identities))
220    }
221
222    async fn update_or_create_device(
223        store: Store,
224        device_keys: DeviceKeys,
225    ) -> StoreResult<DeviceChange> {
226        let old_device =
227            store.get_device_data(&device_keys.user_id, &device_keys.device_id).await?;
228
229        if let Some(mut device) = old_device {
230            match device.update_device(&device_keys) {
231                Err(e) => {
232                    warn!(
233                        user_id = ?device.user_id(),
234                        device_id = ?device.device_id(),
235                        error = ?e,
236                        "Rejecting device update",
237                    );
238                    Ok(DeviceChange::None)
239                }
240                Ok(true) => Ok(DeviceChange::Updated(device)),
241                Ok(false) => Ok(DeviceChange::None),
242            }
243        } else {
244            match DeviceData::try_from(&device_keys) {
245                Ok(d) => {
246                    // If this is our own device, check that the server isn't
247                    // lying about our keys, also mark the device as locally
248                    // trusted.
249                    if d.user_id() == store.user_id() && d.device_id() == store.device_id() {
250                        let local_device_keys = store.static_account().unsigned_device_keys();
251
252                        if d.keys() == &local_device_keys.keys {
253                            d.set_trust_state(LocalTrust::Verified);
254
255                            trace!(
256                                user_id = ?d.user_id(),
257                                device_id = ?d.device_id(),
258                                keys = ?d.keys(),
259                                "Adding our own device to the device store, \
260                                marking it as locally verified",
261                            );
262
263                            Ok(DeviceChange::New(d))
264                        } else {
265                            Ok(DeviceChange::None)
266                        }
267                    } else {
268                        trace!(
269                            user_id = ?d.user_id(),
270                            device_id = ?d.device_id(),
271                            keys = ?d.keys(),
272                            "Adding a new device to the device store",
273                        );
274
275                        Ok(DeviceChange::New(d))
276                    }
277                }
278                Err(e) => {
279                    warn!(
280                        user_id = ?device_keys.user_id,
281                        device_id = ?device_keys.device_id,
282                        error = ?e,
283                        "Rejecting a previously unseen device",
284                    );
285
286                    Ok(DeviceChange::None)
287                }
288            }
289        }
290    }
291
292    async fn update_user_devices(
293        store: Store,
294        user_id: OwnedUserId,
295        device_map: BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
296    ) -> StoreResult<DeviceChanges> {
297        let own_device_id = store.static_account().device_id().to_owned();
298
299        let mut changes = DeviceChanges::default();
300
301        let current_devices: HashSet<OwnedDeviceId> = device_map.keys().cloned().collect();
302
303        let tasks = device_map.into_iter().filter_map(|(device_id, device_keys)| match device_keys
304            .deserialize_as::<DeviceKeys>(
305        ) {
306            Ok(device_keys) => {
307                if user_id != device_keys.user_id || device_id != device_keys.device_id {
308                    warn!(
309                        ?user_id,
310                        ?device_id,
311                        device_key_user = ?device_keys.user_id,
312                        device_key_device_id = ?device_keys.device_id,
313                        "Mismatch in the device keys payload",
314                    );
315                    None
316                } else {
317                    Some(spawn(Self::update_or_create_device(store.clone(), device_keys)))
318                }
319            }
320            Err(e) => {
321                warn!(
322                    ?user_id, ?device_id, error = ?e,
323                    "Device keys failed to deserialize",
324                );
325                None
326            }
327        });
328
329        let results = join_all(tasks).await;
330
331        for device in results {
332            let device = device.expect("Creating or updating a device panicked")?;
333
334            match device {
335                DeviceChange::New(d) => changes.new.push(d),
336                DeviceChange::Updated(d) => changes.changed.push(d),
337                DeviceChange::None => (),
338            }
339        }
340
341        let current_devices: HashSet<&OwnedDeviceId> = current_devices.iter().collect();
342        let stored_devices = store.get_device_data_for_user(&user_id).await?;
343        let stored_devices_set: HashSet<&OwnedDeviceId> = stored_devices.keys().collect();
344        let deleted_devices_set = stored_devices_set.difference(&current_devices);
345
346        let own_user_id = store.static_account().user_id();
347        for device_id in deleted_devices_set {
348            if user_id == *own_user_id && *device_id == &own_device_id {
349                let identity_keys = store.static_account().identity_keys();
350
351                warn!(
352                    user_id = ?own_user_id,
353                    device_id = ?own_device_id,
354                    curve25519_key = ?identity_keys.curve25519,
355                    ed25519_key = ?identity_keys.ed25519,
356                    "Our own device might have been deleted"
357                );
358            } else if let Some(device) = stored_devices.get(*device_id) {
359                device.mark_as_deleted();
360                changes.deleted.push(device.clone());
361            }
362        }
363
364        Ok(changes)
365    }
366
367    /// Handle the device keys part of a key query response.
368    ///
369    /// # Arguments
370    ///
371    /// * `device_keys_map` - A map holding the device keys of the users for
372    ///   which the key query was done.
373    ///
374    /// Returns a list of devices that changed. Changed here means either
375    /// they are new, one of their properties has changed or they got deleted.
376    async fn handle_devices_from_key_query(
377        &self,
378        device_keys_map: BTreeMap<
379            OwnedUserId,
380            BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
381        >,
382    ) -> StoreResult<DeviceChanges> {
383        let mut changes = DeviceChanges::default();
384
385        let tasks = device_keys_map.into_iter().map(|(user_id, device_keys_map)| {
386            spawn(Self::update_user_devices(self.store.clone(), user_id, device_keys_map))
387        });
388
389        let results = join_all(tasks).await;
390
391        for result in results {
392            let change_fragment = result.expect("Panic while updating user devices")?;
393
394            changes.extend(change_fragment);
395        }
396
397        Ok(changes)
398    }
399
400    /// Check if the given public identity matches our stored private one.
401    ///
402    /// If they don't match, this is an indication that our identity has been
403    /// rotated. In this case we return `Some(cleared_private_identity)`,
404    /// where `cleared_private_identity` is our currently-stored
405    /// private identity with the conflicting keys removed.
406    ///
407    /// Otherwise, assuming we do have a private master cross-signing key, we
408    /// mark the public identity as verified.
409    ///
410    /// # Returns
411    ///
412    /// If the private identity needs updating (because it does not match the
413    /// public keys), the updated private identity (which will need to be
414    /// persisted).
415    ///
416    /// Otherwise, `None`.
417    async fn check_private_identity(
418        &self,
419        identity: &OwnUserIdentityData,
420    ) -> Option<PrivateCrossSigningIdentity> {
421        let private_identity = self.store.private_identity();
422        let private_identity = private_identity.lock().await;
423        let result = private_identity.clear_if_differs(identity).await;
424
425        if result.any_differ() {
426            info!(cleared = ?result, "Removed some or all of our private cross signing keys");
427            Some((*private_identity).clone())
428        } else {
429            // If the master key didn't rotate above (`clear_if_differs`),
430            // then this means that the public part and the private parts of
431            // the master key match. We previously did a signature check, so
432            // this means that the private part of the master key has signed
433            // the identity. We can safely mark the public part of the
434            // identity as verified.
435            if private_identity.has_master_key().await && !identity.is_verified() {
436                trace!("Marked our own identity as verified");
437                identity.mark_as_verified()
438            }
439
440            None
441        }
442    }
443
444    /// Process an identity received in a `/keys/query` response that we
445    /// previously knew about.
446    ///
447    /// If the identity is our own, we will look for a user-signing key; if one
448    /// is not found, an error is returned. Otherwise, we then compare the
449    /// received public identity against our stored private identity;
450    /// if they match, the returned public identity is marked as verified and
451    /// `*changed_private_identity` is set to `None`. If they do *not* match,
452    /// it is an indication that our identity has been rotated, and
453    /// `*changed_private_identity` is set to our currently-stored private
454    /// identity with the conflicting keys removed (which will need to be
455    /// persisted).
456    ///
457    /// Whether the identity is our own or that of another, we check whether
458    /// there has been any change to the cross-signing keys, and classify
459    /// the result into [`IdentityUpdateResult::Updated`] or
460    /// [`IdentityUpdateResult::Unchanged`].
461    ///
462    /// # Arguments
463    ///
464    /// * `response` - The entire `/keys/query` response.
465    /// * `master_key` - The public master cross-signing key from the
466    ///   `/keys/query` response.
467    /// * `self_signing` - The public self-signing key from the `/keys/query`
468    ///   response.
469    /// * `i` - The existing identity for this user.
470    /// * `changed_private_identity` - Output parameter. Unchanged if the
471    ///   identity is that of another user. If it is our own, set to `None` or
472    ///   `Some` depending on whether our stored private identity needs
473    ///   updating. See above for more detail.
474    async fn handle_changed_identity(
475        &self,
476        response: &KeysQueryResponse,
477        maybe_verified_own_identity: Option<&OwnUserIdentity>,
478        master_key: MasterPubkey,
479        self_signing: SelfSigningPubkey,
480        i: UserIdentityData,
481        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
482    ) -> Result<IdentityUpdateResult, SignatureError> {
483        match i {
484            UserIdentityData::Own(mut identity) => {
485                let user_signing = self.get_user_signing_key_from_response(response)?;
486                let has_changed = identity.update(master_key, self_signing, user_signing)?;
487                *changed_private_identity = self.check_private_identity(&identity).await;
488                if has_changed {
489                    Ok(IdentityUpdateResult::Updated(identity.into()))
490                } else {
491                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
492                }
493            }
494            UserIdentityData::Other(mut identity) => {
495                let has_changed = identity.update(
496                    master_key,
497                    self_signing,
498                    maybe_verified_own_identity.map(|o| o.user_signing_key()),
499                )?;
500
501                if has_changed {
502                    Ok(IdentityUpdateResult::Updated(identity.into()))
503                } else {
504                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
505                }
506            }
507        }
508    }
509
510    /// Process an identity received in a `/keys/query` response that we didn't
511    /// previously know about.
512    ///
513    /// If the identity is our own, we will look for a user-signing key, and if
514    /// it is present and correct, all three keys will be returned in the
515    /// `IdentityChange` result; otherwise, an error is returned. We will also
516    /// compare the received public identity against our stored private
517    /// identity; if they match, the returned public identity is marked as
518    /// verified and `*changed_private_identity` is set to `None`. If they do
519    /// *not* match, it is an indication that our identity has been rotated,
520    /// and `*changed_private_identity` is set to our currently-stored
521    /// private identity with the conflicting keys removed (which will need
522    /// to be persisted).
523    ///
524    /// If the identity is that of another user, we just parse the keys into the
525    /// `IdentityChange` result, since all other checks have already been done.
526    ///
527    /// # Arguments
528    ///
529    /// * `response` - The entire `/keys/query` response.
530    /// * `master_key` - The public master cross-signing key from the
531    ///   `/keys/query` response.
532    /// * `self_signing` - The public self-signing key from the `/keys/query`
533    ///   response.
534    /// * `changed_private_identity` - Output parameter. Unchanged if the
535    ///   identity is that of another user. If it is our own, set to `None` or
536    ///   `Some` depending on whether our stored private identity needs
537    ///   updating. See above for more detail.
538    async fn handle_new_identity(
539        &self,
540        response: &KeysQueryResponse,
541        maybe_verified_own_identity: Option<&OwnUserIdentity>,
542        master_key: MasterPubkey,
543        self_signing: SelfSigningPubkey,
544        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
545    ) -> Result<UserIdentityData, SignatureError> {
546        if master_key.user_id() == self.user_id() {
547            // Own identity
548            let user_signing = self.get_user_signing_key_from_response(response)?;
549            let identity = OwnUserIdentityData::new(master_key, self_signing, user_signing)?;
550            *changed_private_identity = self.check_private_identity(&identity).await;
551            Ok(identity.into())
552        } else {
553            // First time seen, create the identity. The current MSK will be pinned.
554            let identity = OtherUserIdentityData::new(master_key, self_signing)?;
555            let is_verified = maybe_verified_own_identity
556                .is_some_and(|own_user_identity| own_user_identity.is_identity_signed(&identity));
557            if is_verified {
558                identity.mark_as_previously_verified();
559            }
560
561            Ok(identity.into())
562        }
563    }
564
565    /// Try to deserialize the master key and self-signing key of an
566    /// identity from a `/keys/query` response.
567    ///
568    /// Each user identity *must* at least contain a master and self-signing
569    /// key, and this function deserializes them. (Our own identity, in addition
570    /// to those two, also contains a user-signing key, but that is not
571    /// extracted here; see
572    /// [`IdentityManager::get_user_signing_key_from_response`])
573    ///
574    /// # Arguments
575    ///
576    ///  * `master_key` - The master key for a particular user from a
577    ///    `/keys/query` response.
578    ///  * `response` - The entire `/keys/query` response.
579    ///
580    /// # Returns
581    ///
582    /// `None` if the self-signing key couldn't be found in the response, or the
583    /// one of the keys couldn't be deserialized. Else, the deserialized
584    /// public keys.
585    fn get_minimal_set_of_keys(
586        master_key: &Raw<CrossSigningKey>,
587        response: &KeysQueryResponse,
588    ) -> Option<(MasterPubkey, SelfSigningPubkey)> {
589        match master_key.deserialize_as::<MasterPubkey>() {
590            Ok(master_key) => {
591                if let Some(self_signing) = response
592                    .self_signing_keys
593                    .get(master_key.user_id())
594                    .and_then(|k| k.deserialize_as::<SelfSigningPubkey>().ok())
595                {
596                    Some((master_key, self_signing))
597                } else {
598                    warn!("A user identity didn't contain a self signing pubkey or the key was invalid");
599                    None
600                }
601            }
602            Err(e) => {
603                warn!(
604                    error = ?e,
605                    "Couldn't update or create new user identity"
606                );
607                None
608            }
609        }
610    }
611
612    /// Try to deserialize the our user-signing key from a `/keys/query`
613    /// response.
614    ///
615    /// If a `/keys/query` response includes our own cross-signing keys, then it
616    /// should include our user-signing key. This method attempts to
617    /// extract, deserialize, and check the key from the response.
618    ///
619    /// # Arguments
620    ///
621    /// * `response` - the entire `/keys/query` response.
622    fn get_user_signing_key_from_response(
623        &self,
624        response: &KeysQueryResponse,
625    ) -> Result<UserSigningPubkey, SignatureError> {
626        let Some(user_signing) = response
627            .user_signing_keys
628            .get(self.user_id())
629            .and_then(|k| k.deserialize_as::<UserSigningPubkey>().ok())
630        else {
631            warn!(
632                "User identity for our own user didn't contain a user signing pubkey or the key \
633                    isn't valid",
634            );
635            return Err(SignatureError::MissingSigningKey);
636        };
637
638        if user_signing.user_id() != self.user_id() {
639            warn!(
640                expected = ?self.user_id(),
641                got = ?user_signing.user_id(),
642                "User ID mismatch in our user-signing key",
643            );
644            return Err(SignatureError::UserIdMismatch);
645        }
646
647        Ok(user_signing)
648    }
649
650    /// Process the cross-signing keys for a particular identity from a
651    /// `/keys/query` response.
652    ///
653    /// Checks that the keys are consistent, verifies the updates, and produces
654    /// a list of changes to be stored.
655    ///
656    /// # Arguments
657    ///
658    /// * `response` - The entire `/keys/query` response.
659    /// * `changes` - The identity results so far, which we will add to.
660    /// * `changed_identity` - Output parameter: Unchanged if the identity is
661    ///   that of another user. If it is our own, set to `None` or `Some`
662    ///   depending on whether our stored private identity needs updating.
663    /// * `maybe_verified_own_identity` - Own verified identity if any to check
664    ///   verification status of updated identity.
665    /// * `key_set_info` - The identity info as returned by the `/keys/query`
666    ///   response.
667    #[instrument(skip_all, fields(user_id))]
668    async fn update_or_create_identity(
669        &self,
670        response: &KeysQueryResponse,
671        changes: &mut IdentityChanges,
672        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
673        maybe_verified_own_identity: Option<&OwnUserIdentity>,
674        key_set_info: KeySetInfo,
675    ) -> StoreResult<()> {
676        let KeySetInfo { user_id, master_key, self_signing } = key_set_info;
677        if master_key.user_id() != user_id || self_signing.user_id() != user_id {
678            warn!(?user_id, "User ID mismatch in one of the cross signing keys");
679        } else if let Some(i) = self.store.get_user_identity(&user_id).await? {
680            // an identity we knew about before, which is being updated
681            match self
682                .handle_changed_identity(
683                    response,
684                    maybe_verified_own_identity,
685                    master_key,
686                    self_signing,
687                    i,
688                    changed_private_identity,
689                )
690                .await
691            {
692                Ok(IdentityUpdateResult::Updated(identity)) => {
693                    trace!(?identity, "Updated a user identity");
694                    changes.changed.push(identity);
695                }
696                Ok(IdentityUpdateResult::Unchanged(identity)) => {
697                    trace!(?identity, "Received an unchanged user identity");
698                    changes.unchanged.push(identity);
699                }
700                Err(e) => {
701                    warn!(error = ?e, "Couldn't update an existing user identity");
702                }
703            }
704        } else {
705            // an identity we did not know about before
706            match self
707                .handle_new_identity(
708                    response,
709                    maybe_verified_own_identity,
710                    master_key,
711                    self_signing,
712                    changed_private_identity,
713                )
714                .await
715            {
716                Ok(identity) => {
717                    trace!(?identity, "Created new user identity");
718                    changes.new.push(identity);
719                }
720                Err(e) => {
721                    warn!(error = ?e, "Couldn't create new user identity");
722                }
723            }
724        };
725
726        Ok(())
727    }
728
729    /// Handle the cross signing keys part of a key query response.
730    ///
731    /// # Arguments
732    ///
733    /// * `response` - The `/keys/query` response.
734    ///
735    /// # Returns
736    ///
737    /// The processed results, to be saved to the datastore, comprising:
738    ///
739    ///  * A list of public identities that were received, categorised as "new",
740    ///    "changed" or "unchanged".
741    ///
742    ///  * If our own identity was updated and did not match our private
743    ///    identity, an update to that private identity. Otherwise, `None`.
744    async fn handle_cross_signing_keys(
745        &self,
746        response: &KeysQueryResponse,
747    ) -> StoreResult<(IdentityChanges, Option<PrivateCrossSigningIdentity>)> {
748        let mut changes = IdentityChanges::default();
749        let mut changed_identity = None;
750
751        // We want to check if the updated/new other identities are trusted by us or
752        // not. This is based on the current verified state of the own identity.
753        let maybe_own_verified_identity = self
754            .store
755            .get_identity(self.user_id())
756            .await?
757            .and_then(UserIdentity::own)
758            .filter(|own| own.is_verified());
759
760        for (user_id, master_key) in &response.master_keys {
761            // Get the master and self-signing key for each identity; those are required for
762            // every user identity type. If we don't have those we skip over.
763            let Some((master_key, self_signing)) =
764                Self::get_minimal_set_of_keys(master_key.cast_ref(), response)
765            else {
766                continue;
767            };
768
769            let key_set_info = KeySetInfo { user_id: user_id.clone(), master_key, self_signing };
770
771            self.update_or_create_identity(
772                response,
773                &mut changes,
774                &mut changed_identity,
775                maybe_own_verified_identity.as_ref(),
776                key_set_info,
777            )
778            .await?;
779        }
780
781        Ok((changes, changed_identity))
782    }
783
784    /// Generate an "out-of-band" key query request for the given set of users.
785    ///
786    /// Unlike the regular key query requests returned by `users_for_key_query`,
787    /// there can be several of these in flight at once. This can be useful
788    /// if we need results to be as up-to-date as possible.
789    ///
790    /// Once the request has been made, the response can be fed back into the
791    /// IdentityManager and store by calling `receive_keys_query_response`.
792    ///
793    /// # Arguments
794    ///
795    /// * `users` - list of users whose keys should be queried
796    ///
797    /// # Returns
798    ///
799    /// A tuple containing the request ID for the request, and the request
800    /// itself.
801    pub(crate) fn build_key_query_for_users<'a>(
802        &self,
803        users: impl IntoIterator<Item = &'a UserId>,
804    ) -> (OwnedTransactionId, KeysQueryRequest) {
805        // Since this is an "out-of-band" request, we just make up a transaction ID and
806        // do not store the details in `self.keys_query_request_details`.
807        //
808        // `receive_keys_query_response` will process the response as normal, except
809        // that it will not mark the users as "up-to-date".
810
811        // We assume that there aren't too many users here; if we find a usecase that
812        // requires lots of users to be up-to-date we may need to rethink this.
813        (TransactionId::new(), KeysQueryRequest::new(users.into_iter().map(|u| u.to_owned())))
814    }
815
816    /// Get a list of key query requests needed.
817    ///
818    /// # Returns
819    ///
820    /// A map of a request ID to the `/keys/query` request.
821    ///
822    /// The response of a successful key query requests needs to be passed to
823    /// the [`OlmMachine`] with the [`receive_keys_query_response`].
824    ///
825    /// [`receive_keys_query_response`]: Self::receive_keys_query_response
826    pub async fn users_for_key_query(
827        &self,
828    ) -> StoreResult<BTreeMap<OwnedTransactionId, KeysQueryRequest>> {
829        // Forget about any previous key queries in flight.
830        *self.keys_query_request_details.lock().await = None;
831
832        // We always want to track our own user, but in case we aren't in an encrypted
833        // room yet, we won't be tracking ourselves yet. This ensures we are always
834        // tracking ourselves.
835        //
836        // The check for emptiness is done first for performance.
837        let (users, sequence_number) = {
838            let cache = self.store.cache().await?;
839            let key_query_manager = self.key_query_manager.synced(&cache).await?;
840
841            let (users, sequence_number) = key_query_manager.users_for_key_query().await;
842
843            if users.is_empty() && !key_query_manager.tracked_users().contains(self.user_id()) {
844                key_query_manager.mark_user_as_changed(self.user_id()).await?;
845                key_query_manager.users_for_key_query().await
846            } else {
847                (users, sequence_number)
848            }
849        };
850
851        if users.is_empty() {
852            Ok(BTreeMap::new())
853        } else {
854            // Let's remove users that are part of the `FailuresCache`. The cache, which is
855            // a TTL cache, remembers users for which a previous `/key/query` request has
856            // failed. We don't retry a `/keys/query` for such users for a
857            // certain amount of time.
858            let users = users.into_iter().filter(|u| !self.failures.contains(u.server_name()));
859
860            // We don't want to create a single `/keys/query` request with an infinite
861            // amount of users. Some servers will likely bail out after a
862            // certain amount of users and the responses will be large. In the
863            // case of a transmission error, we'll have to retransmit the large
864            // response.
865            //
866            // Convert the set of users into multiple /keys/query requests.
867            let requests: BTreeMap<_, _> = users
868                .chunks(Self::MAX_KEY_QUERY_USERS)
869                .into_iter()
870                .map(|user_chunk| {
871                    let request_id = TransactionId::new();
872                    let request = KeysQueryRequest::new(user_chunk);
873
874                    debug!(?request_id, users = ?request.device_keys.keys(), "Created a /keys/query request");
875
876                    (request_id, request)
877                })
878                .collect();
879
880            // Collect the request IDs, these will be used later in the
881            // `receive_keys_query_response()` method to figure out if the user can be
882            // marked as up-to-date/non-dirty.
883            let request_ids = requests.keys().cloned().collect();
884            let request_details = KeysQueryRequestDetails { sequence_number, request_ids };
885
886            *self.keys_query_request_details.lock().await = Some(request_details);
887
888            Ok(requests)
889        }
890    }
891
892    /// Receive the list of users that contained changed devices from the
893    /// `/sync` response.
894    ///
895    /// This will queue up the given user for a key query.
896    ///
897    /// Note: The user already needs to be tracked for it to be queued up for a
898    /// key query.
899    pub async fn receive_device_changes(
900        &self,
901        cache: &StoreCache,
902        users: impl Iterator<Item = &UserId>,
903    ) -> StoreResult<()> {
904        self.key_query_manager.synced(cache).await?.mark_tracked_users_as_changed(users).await
905    }
906
907    /// See the docs for [`OlmMachine::update_tracked_users()`].
908    pub async fn update_tracked_users(
909        &self,
910        users: impl IntoIterator<Item = &UserId>,
911    ) -> StoreResult<()> {
912        let cache = self.store.cache().await?;
913        self.key_query_manager.synced(&cache).await?.update_tracked_users(users.into_iter()).await
914    }
915
916    /// Retrieve a list of a user's current devices, so we can encrypt a message
917    /// to them.
918    ///
919    /// If we have not yet seen any devices for the user, and their device list
920    /// has been marked as outdated, then we wait for the `/keys/query` request
921    /// to complete. This helps ensure that we attempt at least once to fetch a
922    /// user's devices before encrypting to them.
923    pub async fn get_user_devices_for_encryption(
924        &self,
925        users: impl Iterator<Item = &UserId>,
926    ) -> StoreResult<HashMap<OwnedUserId, HashMap<OwnedDeviceId, DeviceData>>> {
927        // How long we wait for /keys/query to complete.
928        const KEYS_QUERY_WAIT_TIME: Duration = Duration::from_secs(5);
929
930        let mut devices_by_user = HashMap::new();
931        let mut users_with_no_devices_on_failed_servers = Vec::new();
932        let mut users_with_no_devices_on_unfailed_servers = Vec::new();
933
934        for user_id in users {
935            // First of all, check the store for this user.
936            let devices = self.store.get_device_data_for_user_filtered(user_id).await?;
937
938            // Now, look for users who have no devices at all.
939            //
940            // If a user has no devices at all, that implies we have never (successfully)
941            // done a `/keys/query` for them; we wait for one to complete if it is
942            // in flight. (Of course, the user might genuinely have no devices, but
943            // that's fine, it just means we redundantly grab the cache guard and
944            // check the pending-query flag.)
945            if !devices.is_empty() {
946                // This user has at least one known device.
947                //
948                // The device list may also be outdated in this case; but in this
949                // situation, we are racing between sending a message and retrieving their
950                // device list. That's an inherently racy situation and there is no real
951                // benefit to waiting for the `/keys/query` request to complete. So we don't
952                // bother.
953                //
954                // We just add their devices to the result and carry on.
955                devices_by_user.insert(user_id.to_owned(), devices);
956                continue;
957            }
958
959            // *However*, if the user's server is currently subject to a backoff due to
960            // previous failures, then `users_for_key_query` won't attempt to query
961            // for the user's devices, so there's no point waiting.
962            //
963            // XXX: this is racy. It's possible that:
964            //  * `failures` included the user's server when `users_for_key_query` was
965            //    called, so the user was not returned in the `KeyQueryRequest`, and:
966            //  * The backoff has now expired.
967            //
968            // In that case, we'll end up waiting for the *next* `users_for_key_query` call,
969            // which might not be for 30 seconds or so. (And by then, it might be `failed`
970            // again.)
971            if self.failures.contains(user_id.server_name()) {
972                users_with_no_devices_on_failed_servers.push(user_id);
973                continue;
974            }
975
976            users_with_no_devices_on_unfailed_servers.push(user_id);
977        }
978
979        if !users_with_no_devices_on_failed_servers.is_empty() {
980            info!(
981                ?users_with_no_devices_on_failed_servers,
982                "Not waiting for `/keys/query` for users whose server has previously failed"
983            );
984        }
985
986        if !users_with_no_devices_on_unfailed_servers.is_empty() {
987            // For each user with no devices, fire off a task to wait for a `/keys/query`
988            // result if one is pending.
989            //
990            // We don't actually update the `devices_by_user` map here since that could
991            // require concurrent access to it. Instead each task returns a
992            // `(OwnedUserId, HashMap)` pair (or rather, an `Option` of one) so that we can
993            // add the results to the map.
994            let results = join_all(
995                users_with_no_devices_on_unfailed_servers
996                    .into_iter()
997                    .map(|user_id| self.get_updated_keys_for_user(KEYS_QUERY_WAIT_TIME, user_id)),
998            )
999            .await;
1000
1001            // Once all the tasks have completed, process the results.
1002            let mut updated_users = Vec::new();
1003            for result in results {
1004                if let Some((user_id, updated_devices)) = result? {
1005                    devices_by_user.insert(user_id.to_owned(), updated_devices);
1006                    updated_users.push(user_id);
1007                }
1008            }
1009
1010            if !updated_users.is_empty() {
1011                info!(
1012                    ?updated_users,
1013                    "Waited for `/keys/query` to complete for users who have no devices"
1014                );
1015            }
1016        }
1017
1018        Ok(devices_by_user)
1019    }
1020
1021    /// Helper for get_user_devices_for_encryption.
1022    ///
1023    /// Waits for any pending `/keys/query` for the given user. If one was
1024    /// pending, reloads the device list and returns `Some(user_id,
1025    /// device_list)`. If no request was pending, returns `None`.
1026    #[allow(clippy::type_complexity)]
1027    #[instrument(skip(self))]
1028    async fn get_updated_keys_for_user<'a>(
1029        &self,
1030        timeout_duration: Duration,
1031        user_id: &'a UserId,
1032    ) -> Result<Option<(&'a UserId, HashMap<OwnedDeviceId, DeviceData>)>, CryptoStoreError> {
1033        let cache = self.store.cache().await?;
1034        match self
1035            .key_query_manager
1036            .wait_if_user_key_query_pending(cache, timeout_duration, user_id)
1037            .await?
1038        {
1039            UserKeyQueryResult::WasPending => {
1040                Ok(Some((user_id, self.store.get_device_data_for_user_filtered(user_id).await?)))
1041            }
1042            _ => Ok(None),
1043        }
1044    }
1045
1046    /// Given a list of changed devices, update any [`InboundGroupSession`]s
1047    /// which were sent from those devices and which do not have complete
1048    /// sender data.
1049    async fn update_sender_data_from_device_changes(
1050        &self,
1051        device_changes: &DeviceChanges,
1052    ) -> Result<(), CryptoStoreError> {
1053        for device in device_changes.new.iter().chain(device_changes.changed.iter()) {
1054            // 1. Look for InboundGroupSessions from the device whose sender_data is
1055            //    UnknownDevice. For such sessions, we now have the device, and can update
1056            //    the sender_data accordingly.
1057            //
1058            // In theory, we only need to do this for new devices. In practice, I'm a bit
1059            // worried about races leading us to getting stuck in the
1060            // UnknownDevice state, so we'll paper over that by doing this check
1061            // on device updates too.
1062            self.update_sender_data_for_sessions_for_device(device, SenderDataType::UnknownDevice)
1063                .await?;
1064
1065            // 2. If, and only if, the device is now correctly cross-signed (ie,
1066            //    device.is_cross_signed_by_owner() is true, and we have the master
1067            //    cross-signing key for the owner), look for InboundGroupSessions from the
1068            //    device whose sender_data is DeviceInfo. We can also update the sender_data
1069            //    for these sessions.
1070            //
1071            // In theory, we can skip a couple of steps of the SenderDataFinder algorithm,
1072            // because we're doing the cross-signing check here. In practice,
1073            // it's *way* easier just to use the same logic.
1074            let device_owner_identity = self.store.get_user_identity(device.user_id()).await?;
1075            if device_owner_identity.is_some_and(|id| device.is_cross_signed_by_owner(&id)) {
1076                self.update_sender_data_for_sessions_for_device(device, SenderDataType::DeviceInfo)
1077                    .await?;
1078            }
1079        }
1080
1081        Ok(())
1082    }
1083
1084    /// Given a device, look for [`InboundGroupSession`]s whose sender data is
1085    /// in the given state, and update it.
1086    #[instrument(skip(self))]
1087    async fn update_sender_data_for_sessions_for_device(
1088        &self,
1089        device: &DeviceData,
1090        sender_data_type: SenderDataType,
1091    ) -> Result<(), CryptoStoreError> {
1092        const IGS_BATCH_SIZE: usize = 50;
1093
1094        let Some(curve_key) = device.curve25519_key() else { return Ok(()) };
1095
1096        let mut last_session_id: Option<String> = None;
1097        loop {
1098            let mut sessions = self
1099                .store
1100                .get_inbound_group_sessions_for_device_batch(
1101                    curve_key,
1102                    sender_data_type,
1103                    last_session_id,
1104                    IGS_BATCH_SIZE,
1105                )
1106                .await?;
1107
1108            if sessions.is_empty() {
1109                // end of the session list
1110                return Ok(());
1111            }
1112
1113            last_session_id = None;
1114            for session in &mut sessions {
1115                last_session_id = Some(session.session_id().to_owned());
1116                self.update_sender_data_for_session(session, device).await?;
1117            }
1118            self.store.save_inbound_group_sessions(&sessions).await?;
1119        }
1120    }
1121
1122    /// Update the sender data on the given inbound group session, using the
1123    /// given device data.
1124    #[instrument(skip(self, device, session), fields(session_id = session.session_id()))]
1125    async fn update_sender_data_for_session(
1126        &self,
1127        session: &mut InboundGroupSession,
1128        device: &DeviceData,
1129    ) -> Result<(), CryptoStoreError> {
1130        match SenderDataFinder::find_using_device_data(&self.store, device.clone(), session).await {
1131            Ok(sender_data) => {
1132                debug!("Updating existing InboundGroupSession with new SenderData {sender_data:?}");
1133                session.sender_data = sender_data;
1134            }
1135            Err(SessionDeviceCheckError::CryptoStoreError(e)) => {
1136                return Err(e);
1137            }
1138            Err(SessionDeviceCheckError::MismatchedIdentityKeys(e)) => {
1139                warn!(
1140                    ?session,
1141                    ?device,
1142                    "cannot update existing InboundGroupSession due to ownership error: {e}",
1143                );
1144            }
1145        };
1146
1147        Ok(())
1148    }
1149
1150    /// Mark all tracked users as dirty.
1151    ///
1152    /// All users *whose device lists we are tracking* are flagged as needing a
1153    /// key query. Users whose devices we are not tracking are ignored.
1154    pub(crate) async fn mark_all_tracked_users_as_dirty(
1155        &self,
1156        store_cache: StoreCacheGuard,
1157    ) -> StoreResult<()> {
1158        let store_wrapper = store_cache.store_wrapper();
1159        let tracked_users = store_wrapper.load_tracked_users().await?;
1160
1161        self.key_query_manager
1162            .synced(&store_cache)
1163            .await?
1164            .mark_tracked_users_as_changed(
1165                tracked_users.iter().map(|tracked_user| tracked_user.user_id.as_ref()),
1166            )
1167            .await?;
1168
1169        Ok(())
1170    }
1171}
1172
1173/// Log information about what changed after processing a /keys/query response.
1174/// Only does anything if the DEBUG log level is enabled.
1175fn debug_log_keys_query_response(
1176    devices: &DeviceChanges,
1177    identities: &IdentityChanges,
1178    request_id: &TransactionId,
1179) {
1180    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1181    let changed_devices = devices.changed.iter().fold(BTreeMap::new(), |mut acc, d| {
1182        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1183        acc
1184    });
1185
1186    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1187    let new_devices = devices.new.iter().fold(BTreeMap::new(), |mut acc, d| {
1188        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1189        acc
1190    });
1191
1192    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1193    let deleted_devices = devices.deleted.iter().fold(BTreeMap::new(), |mut acc, d| {
1194        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1195        acc
1196    });
1197
1198    let new_identities = identities.new.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1199    let changed_identities =
1200        identities.changed.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1201
1202    debug!(
1203        ?request_id,
1204        ?new_devices,
1205        ?changed_devices,
1206        ?deleted_devices,
1207        ?new_identities,
1208        ?changed_identities,
1209        "Finished handling of the `/keys/query` response"
1210    );
1211}
1212
1213#[cfg(any(test, feature = "testing"))]
1214#[allow(dead_code)]
1215pub(crate) mod testing {
1216    use std::sync::Arc;
1217
1218    use matrix_sdk_test::ruma_response_from_json;
1219    use ruma::{
1220        api::client::keys::get_keys::v3::Response as KeyQueryResponse, device_id, user_id,
1221        DeviceId, UserId,
1222    };
1223    use serde_json::json;
1224    use tokio::sync::Mutex;
1225
1226    use crate::{
1227        identities::IdentityManager,
1228        olm::{Account, PrivateCrossSigningIdentity},
1229        store::{CryptoStoreWrapper, MemoryStore, PendingChanges, Store},
1230        types::{requests::UploadSigningKeysRequest, DeviceKeys},
1231        verification::VerificationMachine,
1232    };
1233
    /// The fixed user ID `@example:localhost`.
    pub fn user_id() -> &'static UserId {
        user_id!("@example:localhost")
    }
1237
    /// The fixed user ID `@example2:localhost`.
    pub fn other_user_id() -> &'static UserId {
        user_id!("@example2:localhost")
    }
1241
    /// The fixed device ID `WSKKLTJZCL`.
    pub fn device_id() -> &'static DeviceId {
        device_id!("WSKKLTJZCL")
    }
1245
1246    pub(crate) async fn manager_test_helper(
1247        user_id: &UserId,
1248        device_id: &DeviceId,
1249    ) -> IdentityManager {
1250        let identity = PrivateCrossSigningIdentity::new(user_id.into());
1251        let identity = Arc::new(Mutex::new(identity));
1252        let user_id = user_id.to_owned();
1253        let account = Account::with_device_id(&user_id, device_id);
1254        let static_account = account.static_data().clone();
1255        let store = Arc::new(CryptoStoreWrapper::new(&user_id, device_id, MemoryStore::new()));
1256        let verification =
1257            VerificationMachine::new(static_account.clone(), identity.clone(), store.clone());
1258        let store = Store::new(static_account, identity, store, verification);
1259        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1260        IdentityManager::new(store)
1261    }
1262
    /// Mocked response to a `/keys/query` request for `@example2:localhost`.
    ///
    /// The response contains a single device (`SKISMLNIMH`), a master key and
    /// a self-signing key. There are no failures and no user-signing keys.
    pub fn other_key_query() -> KeyQueryResponse {
        let data = &json!({
            "device_keys": {
                "@example2:localhost": {
                    "SKISMLNIMH": {
                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
                        "device_id": "SKISMLNIMH",
                        "keys": {
                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
                        },
                        "signatures": {
                            "@example2:localhost": {
                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
                            }
                        },
                        "user_id": "@example2:localhost",
                        "unsigned": {
                            "device_display_name": "Riot Desktop (Linux)"
                        }
                    }
                }
            },
            "failures": {},
            "master_keys": {
                "@example2:localhost": {
                    "user_id": "@example2:localhost",
                    "usage": ["master"],
                    "keys": {
                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
                    },
                    "signatures": {
                        "@example2:localhost": {
                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
                        }
                    }
                }
            },
            "self_signing_keys": {
                "@example2:localhost": {
                    "user_id": "@example2:localhost",
                    "usage": ["self_signing"],
                    "keys": {
                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
                    },
                    "signatures": {
                        "@example2:localhost": {
                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
                        }
                    }
                }
            },
            "user_signing_keys": {}
        });
        ruma_response_from_json(data)
    }
1320
    /// An updated version of `other_key_query` featuring an additional
    /// signature on the master key, made by `@alice:localhost`.
    ///
    /// *Note*: The added signature is actually not valid, but a valid
    /// signature is not required for our test.
    pub fn other_key_query_cross_signed() -> KeyQueryResponse {
        let data = json!({
            "device_keys": {
                "@example2:localhost": {
                    "SKISMLNIMH": {
                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
                        "device_id": "SKISMLNIMH",
                        "keys": {
                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
                        },
                        "signatures": {
                            "@example2:localhost": {
                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
                            }
                        },
                        "user_id": "@example2:localhost",
                        "unsigned": {
                            "device_display_name": "Riot Desktop (Linux)"
                        }
                    }
                }
            },
            "failures": {},
            "master_keys": {
                "@example2:localhost": {
                    "user_id": "@example2:localhost",
                    "usage": ["master"],
                    "keys": {
                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
                    },
                    "signatures": {
                        "@example2:localhost": {
                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
                        },
                        // This is the added signature from alice USK compared to `other_key_query`. Note that actual signature is not valid.
                        "@alice:localhost": {
                            "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "NotAValidSignature+GNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
                        }
                    }
                }
            },
            "self_signing_keys": {
                "@example2:localhost": {
                    "user_id": "@example2:localhost",
                    "usage": ["self_signing"],
                    "keys": {
                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
                    },
                    "signatures": {
                        "@example2:localhost": {
                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
                        }
                    }
                }
            },
            "user_signing_keys": {}
        });
        ruma_response_from_json(&data)
    }
1385
1386    /// Mocked response to a /keys/query request.
1387    pub fn own_key_query_with_user_id(user_id: &UserId) -> KeyQueryResponse {
1388        let data = json!({
1389          "device_keys": {
1390            user_id: {
1391              "WSKKLTJZCL": {
1392                "algorithms": [
1393                  "m.olm.v1.curve25519-aes-sha2",
1394                  "m.megolm.v1.aes-sha2"
1395                ],
1396                "device_id": "WSKKLTJZCL",
1397                "keys": {
1398                  "curve25519:WSKKLTJZCL": "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1399                  "ed25519:WSKKLTJZCL": "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1400                },
1401                "signatures": {
1402                  user_id: {
1403                    "ed25519:WSKKLTJZCL": "SKpIUnq7QK0xleav0PrIQyKjVm+TgZr7Yi8cKjLeZDtkgyToE2d4/e3Aj79dqOlLB92jFVE4d1cM/Ry04wFwCA",
1404                    "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "9UGu1iC5YhFCdELGfB29YaV+QE0t/X5UDSsPf4QcdZyXIwyp9zBbHX2lh9vWudNQ+akZpaq7ZRaaM+4TCnw/Ag"
1405                  }
1406                },
1407                "user_id": user_id,
1408                "unsigned": {
1409                  "device_display_name": "Cross signing capable"
1410                }
1411              },
1412              "LVWOVGOXME": {
1413                "algorithms": [
1414                  "m.olm.v1.curve25519-aes-sha2",
1415                  "m.megolm.v1.aes-sha2"
1416                ],
1417                "device_id": "LVWOVGOXME",
1418                "keys": {
1419                  "curve25519:LVWOVGOXME": "KMfWKUhnDW1D11hNzATs/Ax1FQRsJxKCWzq0NyGtIiI",
1420                  "ed25519:LVWOVGOXME": "k+NC3L7CBD6fBClcHBrKLOkqCyGNSKhWXiH5Q2STRnA"
1421                },
1422                "signatures": {
1423                  user_id: {
1424                    "ed25519:LVWOVGOXME": "39Ir5Bttpc5+bQwzLj7rkjm5E5/cp/JTbMJ/t0enj6J5w9MXVBFOUqqM2hpaRaRwILMMpwYbJ8IOGjl0Y/MGAw"
1425                  }
1426                },
1427                "user_id": user_id,
1428                "unsigned": {
1429                  "device_display_name": "Non-cross signing"
1430                }
1431              }
1432            }
1433          },
1434          "failures": {},
1435          "master_keys": {
1436            user_id: {
1437              "user_id": user_id,
1438              "usage": [
1439                "master"
1440              ],
1441              "keys": {
1442                "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0"
1443              },
1444              "signatures": {
1445                user_id: {
1446                  "ed25519:WSKKLTJZCL": "ZzJp1wtmRdykXAUEItEjNiFlBrxx8L6/Vaen9am8AuGwlxxJtOkuY4m+4MPLvDPOgavKHLsrRuNLAfCeakMlCQ"
1447                }
1448              }
1449            }
1450          },
1451          "self_signing_keys": {
1452            user_id: {
1453              "user_id": user_id,
1454              "usage": [
1455                "self_signing"
1456              ],
1457              "keys": {
1458                "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210"
1459              },
1460              "signatures": {
1461                user_id: {
1462                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "AC7oDUW4rUhtInwb4lAoBJ0wAuu4a5k+8e34B5+NKsDB8HXRwgVwUWN/MRWc/sJgtSbVlhzqS9THEmQQ1C51Bw"
1463                }
1464              }
1465            }
1466          },
1467          "user_signing_keys": {
1468            user_id: {
1469              "user_id": user_id,
1470              "usage": [
1471                "user_signing"
1472              ],
1473              "keys": {
1474                "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo"
1475              },
1476              "signatures": {
1477                user_id: {
1478                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "C4L2sx9frGqj8w41KyynHGqwUbbwBYRZpYCB+6QWnvQFA5Oi/1PJj8w5anwzEsoO0TWmLYmf7FXuAGewanOWDg"
1479                }
1480              }
1481            }
1482          }
1483        });
1484        ruma_response_from_json(&data)
1485    }
1486
1487    pub fn own_key_query() -> KeyQueryResponse {
1488        own_key_query_with_user_id(user_id())
1489    }
1490
1491    pub fn key_query(
1492        identity: UploadSigningKeysRequest,
1493        device_keys: DeviceKeys,
1494    ) -> KeyQueryResponse {
1495        let json = json!({
1496            "device_keys": {
1497                "@example:localhost": {
1498                    device_keys.device_id.to_string(): device_keys
1499                }
1500            },
1501            "failures": {},
1502            "master_keys": {
1503                "@example:localhost": identity.master_key
1504            },
1505            "self_signing_keys": {
1506                "@example:localhost": identity.self_signing_key
1507            },
1508            "user_signing_keys": {
1509                "@example:localhost": identity.user_signing_key
1510            },
1511          }
1512        );
1513
1514        ruma_response_from_json(&json)
1515    }
1516}
1517
1518#[cfg(test)]
1519pub(crate) mod tests {
1520    use std::ops::Deref;
1521
1522    use futures_util::pin_mut;
1523    use matrix_sdk_test::{async_test, ruma_response_from_json, test_json};
1524    use ruma::{
1525        api::client::keys::get_keys::v3::Response as KeysQueryResponse, device_id, user_id,
1526        TransactionId,
1527    };
1528    use serde_json::json;
1529    use stream_assert::{assert_closed, assert_pending, assert_ready};
1530
1531    use super::testing::{
1532        device_id, key_query, manager_test_helper, other_key_query, other_user_id, user_id,
1533    };
1534    use crate::{
1535        identities::manager::testing::{other_key_query_cross_signed, own_key_query},
1536        olm::PrivateCrossSigningIdentity,
1537        CrossSigningKeyExport, OlmMachine,
1538    };
1539
1540    fn key_query_with_failures() -> KeysQueryResponse {
1541        let response = json!({
1542            "device_keys": {
1543            },
1544            "failures": {
1545                "example.org": {
1546                    "errcode": "M_RESOURCE_LIMIT_EXCEEDED",
1547                    "error": "Not yet ready to retry",
1548                }
1549            }
1550        });
1551
1552        ruma_response_from_json(&response)
1553    }
1554
1555    #[async_test]
1556    async fn test_tracked_users() {
1557        let manager = manager_test_helper(user_id(), device_id()).await;
1558        let alice = user_id!("@alice:example.org");
1559
1560        let cache = manager.store.cache().await.unwrap();
1561        let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1562
1563        assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1564
1565        manager.receive_device_changes(&cache, [alice].iter().map(Deref::deref)).await.unwrap();
1566
1567        assert!(
1568            !key_query_manager.tracked_users().contains(alice),
1569            "Receiving a device changes update for a user we don't track does nothing"
1570        );
1571
1572        assert!(
1573            !key_query_manager.users_for_key_query().await.0.contains(alice),
1574            "The user we don't track doesn't end up in the `/keys/query` request"
1575        );
1576    }
1577
1578    #[async_test]
1579    async fn test_manager_creation() {
1580        let manager = manager_test_helper(user_id(), device_id()).await;
1581        let cache = manager.store.cache().await.unwrap();
1582        assert!(manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty())
1583    }
1584
1585    #[async_test]
1586    async fn test_manager_key_query_response() {
1587        let manager = manager_test_helper(user_id(), device_id()).await;
1588        let other_user = other_user_id();
1589        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1590        assert_eq!(devices.devices().count(), 0);
1591
1592        manager
1593            .receive_keys_query_response(&TransactionId::new(), &other_key_query())
1594            .await
1595            .unwrap();
1596
1597        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1598        assert_eq!(devices.devices().count(), 1);
1599
1600        let device = manager
1601            .store
1602            .get_device_data(other_user, device_id!("SKISMLNIMH"))
1603            .await
1604            .unwrap()
1605            .unwrap();
1606        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
1607        let identity = identity.other().unwrap();
1608
1609        assert!(identity.is_device_signed(&device));
1610    }
1611
1612    #[async_test]
1613    async fn test_manager_own_key_query_response() {
1614        let manager = manager_test_helper(user_id(), device_id()).await;
1615        let our_user = user_id();
1616        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1617        assert_eq!(devices.devices().count(), 0);
1618
1619        let private_identity = manager.store.private_identity();
1620        let private_identity = private_identity.lock().await;
1621        let identity_request = private_identity.as_upload_request().await;
1622        drop(private_identity);
1623
1624        let device_keys =
1625            manager.store.cache().await.unwrap().account().await.unwrap().device_keys();
1626        manager
1627            .receive_keys_query_response(
1628                &TransactionId::new(),
1629                &key_query(identity_request, device_keys),
1630            )
1631            .await
1632            .unwrap();
1633
1634        let identity = manager
1635            .store
1636            .get_user_identity(our_user)
1637            .await
1638            .unwrap()
1639            .expect("missing user identity");
1640        let identity = identity.own().expect("missing own identity");
1641        assert!(identity.is_verified());
1642
1643        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1644        assert_eq!(devices.devices().count(), 1);
1645
1646        let device =
1647            manager.store.get_device_data(our_user, device_id!(device_id())).await.unwrap();
1648
1649        assert!(device.is_some());
1650    }
1651
1652    #[async_test]
1653    async fn test_private_identity_invalidation_after_public_keys_change() {
1654        let user_id = user_id!("@example1:localhost");
1655        let manager = manager_test_helper(user_id, "DEVICEID".into()).await;
1656
1657        let identity_request = {
1658            let private_identity = manager.store.private_identity();
1659            let private_identity = private_identity.lock().await;
1660            private_identity.as_upload_request().await
1661        };
1662        let device_keys = manager.store.static_account().unsigned_device_keys();
1663
1664        let response = json!({
1665            "device_keys": {
1666                user_id: {
1667                    device_keys.device_id.to_string(): device_keys
1668                }
1669            },
1670            "master_keys": {
1671                user_id: identity_request.master_key,
1672            },
1673            "self_signing_keys": {
1674                user_id: identity_request.self_signing_key,
1675            },
1676            "user_signing_keys": {
1677                user_id: identity_request.user_signing_key,
1678            }
1679        });
1680
1681        let response = ruma_response_from_json(&response);
1682        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
1683
1684        let identity = manager.store.get_user_identity(user_id).await.unwrap().unwrap();
1685        let identity = identity.own().unwrap();
1686        assert!(identity.is_verified());
1687
1688        let identity_request = {
1689            let private_identity = PrivateCrossSigningIdentity::new(user_id.into());
1690            private_identity.as_upload_request().await
1691        };
1692
1693        let response = json!({
1694            "master_keys": {
1695                user_id: identity_request.master_key,
1696                "@example2:localhost": {
1697                    "user_id": "@example2:localhost",
1698                    "usage": ["master"],
1699                    "keys": {
1700                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1701                    },
1702                    "signatures": {
1703                        "@example2:localhost": {
1704                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1705                        }
1706                    }
1707                },
1708            },
1709            "self_signing_keys": {
1710                user_id: identity_request.self_signing_key,
1711                "@example2:localhost": {
1712                    "user_id": "@example2:localhost",
1713                    "usage": ["self_signing"],
1714                    "keys": {
1715                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1716                    },
1717                    "signatures": {
1718                        "@example2:localhost": {
1719                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1720                        }
1721                    }
1722                }
1723            },
1724            "user_signing_keys": {
1725                user_id: identity_request.user_signing_key,
1726            }
1727        });
1728
1729        let response = ruma_response_from_json(&response);
1730        let (_, private_identity) = manager.handle_cross_signing_keys(&response).await.unwrap();
1731
1732        assert!(private_identity.is_some());
1733        let private_identity = manager.store.private_identity();
1734        assert!(private_identity.lock().await.is_empty().await);
1735    }
1736
1737    #[async_test]
1738    async fn test_no_tracked_users_key_query_request() {
1739        let manager = manager_test_helper(user_id(), device_id()).await;
1740
1741        let cache = manager.store.cache().await.unwrap();
1742        assert!(
1743            manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty(),
1744            "No users are initially tracked"
1745        );
1746
1747        let requests = manager.users_for_key_query().await.unwrap();
1748        assert!(!requests.is_empty(), "We query the keys for our own user");
1749
1750        assert!(
1751            manager
1752                .key_query_manager
1753                .synced(&cache)
1754                .await
1755                .unwrap()
1756                .tracked_users()
1757                .contains(manager.user_id()),
1758            "Our own user is now tracked"
1759        );
1760    }
1761
1762    /// If a user is invalidated while a /keys/query request is in flight, that
1763    /// user is not removed from the list of outdated users when the
1764    /// response is received
1765    #[async_test]
1766    async fn test_invalidation_race_handling() {
1767        let manager = manager_test_helper(user_id(), device_id()).await;
1768        let alice = other_user_id();
1769        manager.update_tracked_users([alice]).await.unwrap();
1770
1771        // alice should be in the list of key queries
1772        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1773        assert!(req.device_keys.contains_key(alice));
1774
1775        // another invalidation turns up
1776        {
1777            let cache = manager.store.cache().await.unwrap();
1778            manager.receive_device_changes(&cache, [alice].into_iter()).await.unwrap();
1779        }
1780
1781        // the response from the query arrives
1782        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1783
1784        // alice should *still* be in the list of key queries
1785        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1786        assert!(req.device_keys.contains_key(alice));
1787
1788        // another key query response
1789        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1790
1791        // finally alice should not be in the list
1792        let queries = manager.users_for_key_query().await.unwrap();
1793        assert!(!queries.iter().any(|(_, r)| r.device_keys.contains_key(alice)));
1794    }
1795
1796    #[async_test]
1797    async fn test_failure_handling() {
1798        let manager = manager_test_helper(user_id(), device_id()).await;
1799        let alice = user_id!("@alice:example.org");
1800
1801        {
1802            let cache = manager.store.cache().await.unwrap();
1803            let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1804            assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1805
1806            key_query_manager.mark_user_as_changed(alice).await.unwrap();
1807
1808            assert!(
1809                key_query_manager.tracked_users().contains(alice),
1810                "Alice is tracked after being marked as tracked"
1811            );
1812        }
1813
1814        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1815        assert!(req.device_keys.contains_key(alice));
1816
1817        // a failure should stop us querying for the user's keys.
1818        let response = key_query_with_failures();
1819        manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1820        assert!(manager.failures.contains(alice.server_name()));
1821        assert!(!manager
1822            .users_for_key_query()
1823            .await
1824            .unwrap()
1825            .iter()
1826            .any(|(_, r)| r.device_keys.contains_key(alice)));
1827
1828        // clearing the failure flag should make the user reappear in the query list.
1829        manager.failures.remove([alice.server_name().to_owned()].iter());
1830        assert!(manager
1831            .users_for_key_query()
1832            .await
1833            .unwrap()
1834            .iter()
1835            .any(|(_, r)| r.device_keys.contains_key(alice)));
1836    }
1837
1838    #[async_test]
1839    async fn test_out_of_band_key_query() {
1840        // build the request
1841        let manager = manager_test_helper(user_id(), device_id()).await;
1842        let (reqid, req) = manager.build_key_query_for_users(vec![user_id()]);
1843        assert!(req.device_keys.contains_key(user_id()));
1844
1845        // make up a response and check it is processed
1846        let (device_changes, identity_changes) =
1847            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1848        assert_eq!(device_changes.new.len(), 1);
1849        assert_eq!(device_changes.new[0].device_id(), "LVWOVGOXME");
1850        assert_eq!(identity_changes.new.len(), 1);
1851        assert_eq!(identity_changes.new[0].user_id(), user_id());
1852
1853        let devices = manager.store.get_user_devices(user_id()).await.unwrap();
1854        assert_eq!(devices.devices().count(), 1);
1855        assert_eq!(devices.devices().next().unwrap().device_id(), "LVWOVGOXME");
1856    }
1857
1858    #[async_test]
1859    async fn test_invalid_key_response() {
1860        let my_user_id = user_id();
1861        let my_device_id = device_id();
1862        let manager = manager_test_helper(my_user_id, my_device_id).await;
1863
1864        // First of all, populate the store with good data
1865        let (reqid, _) = manager.build_key_query_for_users(vec![user_id()]);
1866        let (device_changes, identity_changes) =
1867            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1868        assert_eq!(device_changes.new.len(), 1);
1869        let test_device_id = device_changes.new.first().unwrap().device_id().to_owned();
1870        use crate::store::Changes;
1871        let changes =
1872            Changes { devices: device_changes, identities: identity_changes, ..Changes::default() };
1873        manager.store.save_changes(changes).await.unwrap();
1874
1875        // Now provide an invalid update
1876        let (reqid, _) = manager.build_key_query_for_users(vec![my_user_id]);
1877        let response = ruma_response_from_json(&json!({
1878            "device_keys": {
1879                my_user_id: {
1880                    test_device_id.as_str(): {
1881                        "algorithms": [
1882                            "m.olm.v1.curve25519-aes-sha2",
1883                        ],
1884                        "device_id": test_device_id.as_str(),
1885                        "keys": {
1886                            format!("curve25519:{}", test_device_id): "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1887                            format!("ed25519:{}", test_device_id): "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1888                        },
1889                        "signatures": {
1890                            my_user_id: {
1891                                // Not a valid signature.
1892                                format!("ed25519:{}", test_device_id): "imadethisup",
1893                            }
1894                        },
1895                        "user_id": my_user_id,
1896                    }
1897                }
1898            }
1899        }));
1900
1901        let (device_changes, identity_changes) =
1902            manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1903
1904        // The result should be empty
1905        assert_eq!(device_changes.new.len(), 0);
1906        assert_eq!(device_changes.changed.len(), 0);
1907        assert_eq!(device_changes.deleted.len(), 0);
1908        assert_eq!(identity_changes.new.len(), 0);
1909
1910        // And the device should not have been updated.
1911        let device =
1912            manager.store.get_user_devices(my_user_id).await.unwrap().get(&test_device_id).unwrap();
1913        assert_eq!(device.algorithms().len(), 2);
1914    }
1915
1916    #[async_test]
1917    async fn test_devices_stream() {
1918        let manager = manager_test_helper(user_id(), device_id()).await;
1919        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1920
1921        let stream = manager.store.devices_stream();
1922        pin_mut!(stream);
1923
1924        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1925
1926        let update = assert_ready!(stream);
1927        assert!(!update.new.is_empty(), "The device update should contain some devices");
1928    }
1929
1930    #[async_test]
1931    async fn test_identities_stream() {
1932        let manager = manager_test_helper(user_id(), device_id()).await;
1933        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1934
1935        let stream = manager.store.user_identities_stream();
1936        pin_mut!(stream);
1937
1938        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1939
1940        let update = assert_ready!(stream);
1941        assert!(!update.new.is_empty(), "The identities update should contain some identities");
1942    }
1943
1944    #[async_test]
1945    async fn test_identities_stream_raw() {
1946        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1947        let (request_id, _) = manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1948
1949        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
1950        pin_mut!(stream);
1951
1952        manager
1953            .as_ref()
1954            .unwrap()
1955            .receive_keys_query_response(&request_id, &own_key_query())
1956            .await
1957            .unwrap();
1958
1959        let (identity_update, _) = assert_ready!(stream);
1960        assert_eq!(identity_update.new.len(), 1);
1961        assert_eq!(identity_update.changed.len(), 0);
1962        assert_eq!(identity_update.unchanged.len(), 0);
1963        assert_eq!(identity_update.new[0].user_id(), user_id());
1964
1965        assert_pending!(stream);
1966
1967        let (new_request_id, _) =
1968            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1969
1970        // A second `/keys/query` response with the same result shouldn't fire a change
1971        // notification: the identity and device should be unchanged.
1972        manager
1973            .as_ref()
1974            .unwrap()
1975            .receive_keys_query_response(&new_request_id, &own_key_query())
1976            .await
1977            .unwrap();
1978
1979        assert_pending!(stream);
1980
1981        // dropping the manager (and hence dropping the store) should close the stream
1982        manager.take();
1983        assert_closed!(stream);
1984    }
1985
1986    #[async_test]
1987    async fn test_identities_stream_raw_signature_update() {
1988        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1989        let (request_id, _) =
1990            manager.as_ref().unwrap().build_key_query_for_users(vec![other_user_id()]);
1991
1992        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
1993        pin_mut!(stream);
1994
1995        manager
1996            .as_ref()
1997            .unwrap()
1998            .receive_keys_query_response(&request_id, &other_key_query())
1999            .await
2000            .unwrap();
2001
2002        let (identity_update, _) = assert_ready!(stream);
2003        assert_eq!(identity_update.new.len(), 1);
2004        assert_eq!(identity_update.changed.len(), 0);
2005        assert_eq!(identity_update.unchanged.len(), 0);
2006        assert_eq!(identity_update.new[0].user_id(), other_user_id());
2007
2008        let initial_msk = identity_update.new[0].master_key().clone();
2009
2010        let (new_request_id, _) =
2011            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
2012        // There is a new signature on the msk, should trigger a change
2013        manager
2014            .as_ref()
2015            .unwrap()
2016            .receive_keys_query_response(&new_request_id, &other_key_query_cross_signed())
2017            .await
2018            .unwrap();
2019
2020        let (identity_update_2, _) = assert_ready!(stream);
2021        assert_eq!(identity_update_2.new.len(), 0);
2022        assert_eq!(identity_update_2.changed.len(), 1);
2023        assert_eq!(identity_update_2.unchanged.len(), 0);
2024
2025        let updated_msk = identity_update_2.changed[0].master_key().clone();
2026
2027        // Identity has a change (new signature) but it's the same msk
2028        assert_eq!(initial_msk, updated_msk);
2029
2030        assert_pending!(stream);
2031
2032        manager.take();
2033    }
2034
2035    #[async_test]
2036    async fn test_key_query_with_unknown_properties() {
2037        let manager = manager_test_helper(user_id(), device_id()).await;
2038        let other_user = user_id!("@example:localhost");
2039        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2040        assert_eq!(devices.devices().count(), 0);
2041
2042        let response = json!({
2043            "device_keys": {
2044                "@example:localhost": {
2045                    "OBEBOSKTBE": {
2046                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
2047                        "user_id": "@example:localhost",
2048                        "device_id": "OBEBOSKTBE",
2049                        "extra_property": "somevalue",
2050                        "keys": {
2051                            "curve25519:OBEBOSKTBE": "ECrdZebl0DskwbkxoztsiKPb6ivu7M2qQ70BFWwre3w",
2052                            "ed25519:OBEBOSKTBE": "hFWo+pG6TVWNzq/ZubUQVL5Ardu9rqHxpKkCbf1/KiA"
2053                        },
2054                        "signatures": {
2055                            "@example:localhost": {
2056                                "ed25519:OBEBOSKTBE": "6vyYUgX+IoT1x6Mvf0g/GEPVb2UI3brfL7WZ75WZ81sH4FBFgAzkkuGpw9suGLKXnlEdLH0suBzaT4esVhFDCw",
2057                            },
2058                        },
2059                    },
2060                },
2061            },
2062        });
2063
2064        let response = ruma_response_from_json(&response);
2065        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
2066
2067        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2068        assert_eq!(devices.devices().count(), 1);
2069
2070        manager.store.get_device_data(other_user, device_id!("OBEBOSKTBE")).await.unwrap().unwrap();
2071    }
2072
2073    #[async_test]
2074    async fn test_manager_identity_updates() {
2075        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2076
2077        let manager = manager_test_helper(user_id(), device_id()).await;
2078        let other_user = DataSet::user_id();
2079        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2080        assert_eq!(devices.devices().count(), 0);
2081
2082        let identity = manager.store.get_user_identity(other_user).await.unwrap();
2083        assert!(identity.is_none());
2084
2085        manager
2086            .receive_keys_query_response(
2087                &TransactionId::new(),
2088                &DataSet::key_query_with_identity_a(),
2089            )
2090            .await
2091            .unwrap();
2092
2093        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2094        let other_identity = identity.other().unwrap();
2095
2096        // We should now have an identity for the user but no pin violation
2097        // (pinned master key is the current one)
2098        assert!(!other_identity.has_pin_violation());
2099        let first_device =
2100            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2101        assert!(first_device.is_cross_signed_by_owner(&identity));
2102
2103        // We receive a new keys update for that user, with a new identity
2104        manager
2105            .receive_keys_query_response(
2106                &TransactionId::new(),
2107                &DataSet::key_query_with_identity_b(),
2108            )
2109            .await
2110            .unwrap();
2111
2112        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2113        let other_identity = identity.other().unwrap();
2114
2115        // The previous known identity has been replaced, there should be a pin
2116        // violation
2117        assert!(other_identity.has_pin_violation());
2118
2119        let second_device =
2120            manager.store.get_device_data(other_user, DataSet::device_b()).await.unwrap().unwrap();
2121
2122        // There is a new device signed by the new identity
2123        assert!(second_device.is_cross_signed_by_owner(&identity));
2124
2125        // The first device should not be signed by the new identity
2126        let first_device =
2127            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2128        assert!(!first_device.is_cross_signed_by_owner(&identity));
2129
2130        let remember_previous_identity = other_identity.clone();
2131        // We receive updated keys for that user, with no identity anymore.
2132        // Notice that there is no server API to delete identity, but we want to
2133        // test here that a home server cannot clear the identity and
2134        // subsequently serve a new one which would get automatically approved.
2135        manager
2136            .receive_keys_query_response(
2137                &TransactionId::new(),
2138                &DataSet::key_query_with_identity_no_identity(),
2139            )
2140            .await
2141            .unwrap();
2142
2143        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2144        let other_identity = identity.other().unwrap();
2145
2146        assert_eq!(other_identity, &remember_previous_identity);
2147        assert!(other_identity.has_pin_violation());
2148    }
2149
2150    #[async_test]
2151    async fn test_manager_resolve_identity_pin_violation() {
2152        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2153
2154        let manager = manager_test_helper(user_id(), device_id()).await;
2155        let other_user = DataSet::user_id();
2156
2157        manager
2158            .receive_keys_query_response(
2159                &TransactionId::new(),
2160                &DataSet::key_query_with_identity_a(),
2161            )
2162            .await
2163            .unwrap();
2164
2165        // We receive a new keys update for that user, with a new identity
2166        manager
2167            .receive_keys_query_response(
2168                &TransactionId::new(),
2169                &DataSet::key_query_with_identity_b(),
2170            )
2171            .await
2172            .unwrap();
2173
2174        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2175        let other_identity = identity.other().unwrap();
2176
2177        // We have a new identity now, so there should be a pin violation
2178        assert!(other_identity.has_pin_violation());
2179
2180        // Resolve the violation by pinning the new identity
2181        other_identity.pin();
2182
2183        assert!(!other_identity.has_pin_violation());
2184    }
2185
2186    // Set up a machine do initial own key query and import cross-signing secret to
2187    // make the current session verified.
2188    async fn common_verified_identity_changes_machine_setup() -> OlmMachine {
2189        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2190
2191        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2192
2193        let keys_query = DataSet::own_keys_query_response_1();
2194        let txn_id = TransactionId::new();
2195        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2196
2197        machine
2198            .import_cross_signing_keys(CrossSigningKeyExport {
2199                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2200                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2201                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2202            })
2203            .await
2204            .unwrap();
2205        machine
2206    }
2207    #[async_test]
2208    async fn test_manager_verified_latch_setup_on_new_identities() {
2209        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2210
2211        let machine = common_verified_identity_changes_machine_setup().await;
2212
2213        // ######
2214        // First test: Assert that the latch is properly set on new identities
2215        // ######
2216        let keys_query = DataSet::bob_keys_query_response_signed();
2217        let txn_id = TransactionId::new();
2218        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2219
2220        let own_identity =
2221            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2222        // For sanity check that own identity is trusted
2223        assert!(own_identity.is_verified());
2224
2225        let bob_identity =
2226            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2227        // The verified latch should be true
2228        assert!(bob_identity.was_previously_verified());
2229        // And bob is verified
2230        assert!(bob_identity.is_verified());
2231
2232        // ######
2233        // Second test: Assert that the local latch stays on if the identity is rotated
2234        // ######
2235        let keys_query = DataSet::bob_keys_query_response_rotated();
2236        let txn_id = TransactionId::new();
2237        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2238
2239        let bob_identity =
2240            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2241        // Bob is not verified anymore
2242        assert!(!bob_identity.is_verified());
2243        // The verified latch should still be true
2244        assert!(bob_identity.was_previously_verified());
2245        // Bob device_2 is self-signed even if there is this verification latch
2246        // violation
2247        let bob_device = machine
2248            .get_device(DataSet::bob_id(), DataSet::bob_device_2_id(), None)
2249            .await
2250            .unwrap()
2251            .unwrap();
2252        assert!(bob_identity.is_device_signed(&bob_device));
2253        // there is also a pin violation
2254        assert!(bob_identity.has_pin_violation());
2255        // Fixing the pin violation won't fix the verification latch violation
2256        bob_identity.pin_current_master_key().await.unwrap();
2257        assert!(!bob_identity.has_pin_violation());
2258        let has_latch_violation =
2259            bob_identity.was_previously_verified() && !bob_identity.is_verified();
2260        assert!(has_latch_violation);
2261    }
2262
2263    #[async_test]
2264    async fn test_manager_verified_identity_changes_setup_on_updated_identities() {
2265        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2266
2267        let machine = common_verified_identity_changes_machine_setup().await;
2268
2269        // ######
2270        // Get the Carol identity for the first time
2271        // ######
2272        let keys_query = DataSet::carol_keys_query_response_unsigned();
2273        let txn_id = TransactionId::new();
2274        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2275
2276        let carol_identity =
2277            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2278        // The identity is not verified
2279        assert!(!carol_identity.is_verified());
2280        // The verified latch is off
2281        assert!(!carol_identity.was_previously_verified());
2282
2283        // Carol is verified, likely from another session. Ensure the latch is updated
2284        // when the key query response is processed
2285        let keys_query = DataSet::carol_keys_query_response_signed();
2286        let txn_id = TransactionId::new();
2287        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2288
2289        let carol_identity = machine
2290            .get_identity(DataSet::carol_id(), None)
2291            .await
2292            .unwrap()
2293            .unwrap()
2294            .other()
2295            .unwrap();
2296        assert!(carol_identity.is_verified());
2297        // This should have updated the latch
2298        assert!(carol_identity.was_previously_verified());
2299        // It is the same identity, it's just signed now so no pin violation
2300        assert!(!carol_identity.has_pin_violation());
2301    }
2302
2303    // Set up a machine do initial own key query.
2304    // The cross signing secrets are not yet uploaded.
2305    // Then query keys for carol and bob (both signed by own identity)
2306    async fn common_verified_identity_changes_own_trust_change_machine_setup() -> OlmMachine {
2307        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2308
2309        // Start on a non-verified session
2310        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2311
2312        let keys_query = DataSet::own_keys_query_response_1();
2313        let txn_id = TransactionId::new();
2314        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2315
2316        // For sanity check that own identity is not trusted
2317        let own_identity =
2318            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2319        assert!(!own_identity.is_verified());
2320
2321        let keys_query = DataSet::own_keys_query_response_1();
2322        let txn_id = TransactionId::new();
2323        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2324
2325        // Get Bob and Carol already signed
2326        let keys_query = DataSet::bob_keys_query_response_signed();
2327        let txn_id = TransactionId::new();
2328        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2329
2330        let keys_query = DataSet::carol_keys_query_response_signed();
2331        let txn_id = TransactionId::new();
2332        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2333
2334        machine.update_tracked_users(vec![DataSet::bob_id(), DataSet::carol_id()]).await.unwrap();
2335
2336        machine
2337    }
2338
2339    #[async_test]
2340    async fn test_manager_verified_identity_changes_setup_on_own_identity_trust_change() {
2341        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2342        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2343
2344        let own_identity =
2345            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2346
2347        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2348        // Bob is verified by our identity but our own identity is not yet trusted
2349        assert!(!bob_identity.was_previously_verified());
2350        assert!(own_identity.is_identity_signed(&bob_identity.other().unwrap()));
2351
2352        let carol_identity =
2353            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2354        // Carol is verified by our identity but our own identity is not yet trusted
2355        assert!(!carol_identity.was_previously_verified());
2356        assert!(own_identity.is_identity_signed(&carol_identity.other().unwrap()));
2357
2358        // Marking our own identity as trusted should update the existing identities
2359        let _ = own_identity.verify().await;
2360
2361        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2362        assert!(own_identity.is_verified());
2363
2364        let carol_identity =
2365            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2366        assert!(carol_identity.is_verified());
2367        // The latch should be set now
2368        assert!(carol_identity.was_previously_verified());
2369
2370        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2371        assert!(bob_identity.is_verified());
2372        // The latch should be set now
2373        assert!(bob_identity.was_previously_verified());
2374    }
2375
2376    #[async_test]
2377    async fn test_manager_verified_identity_change_setup_on_import_secrets() {
2378        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2379        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2380
2381        let own_identity =
2382            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2383
2384        let bob_identity =
2385            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2386        // Carol is verified by our identity but our own identity is not yet trusted
2387        assert!(own_identity.is_identity_signed(&bob_identity));
2388        assert!(!bob_identity.was_previously_verified());
2389
2390        let carol_identity = machine
2391            .get_identity(DataSet::carol_id(), None)
2392            .await
2393            .unwrap()
2394            .unwrap()
2395            .other()
2396            .unwrap();
2397        // Carol is verified by our identity but our own identity is not yet trusted
2398        assert!(own_identity.is_identity_signed(&carol_identity));
2399        assert!(!carol_identity.was_previously_verified());
2400
2401        // Marking our own identity as trusted should update the existing identities
2402        machine
2403            .import_cross_signing_keys(CrossSigningKeyExport {
2404                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2405                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2406                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2407            })
2408            .await
2409            .unwrap();
2410
2411        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2412        assert!(own_identity.is_verified());
2413
2414        let carol_identity =
2415            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2416        assert!(carol_identity.is_verified());
2417        // The latch should be set now
2418        assert!(carol_identity.was_previously_verified());
2419
2420        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2421        assert!(bob_identity.is_verified());
2422        // The latch should be set now
2423        assert!(bob_identity.was_previously_verified());
2424    }
2425
2426    mod update_sender_data {
2427        use assert_matches::assert_matches;
2428        use matrix_sdk_test::async_test;
2429        use ruma::room_id;
2430
2431        use super::{device_id, manager_test_helper};
2432        use crate::{
2433            identities::manager::testing::{other_user_id, user_id},
2434            olm::{InboundGroupSession, SenderData},
2435            store::{Changes, DeviceChanges},
2436            Account, DeviceData, EncryptionSettings,
2437        };
2438
2439        #[async_test]
2440        async fn test_adds_device_info_to_existing_sessions() {
2441            let manager = manager_test_helper(user_id(), device_id()).await;
2442
2443            // Given that we have lots of sessions in the store, from each of two devices
2444            let account1 = Account::new(user_id());
2445            let account2 = Account::new(other_user_id());
2446
2447            let mut account1_sessions = Vec::new();
2448            for _ in 0..60 {
2449                account1_sessions.push(create_inbound_group_session(&account1).await);
2450            }
2451            let mut account2_sessions = Vec::new();
2452            for _ in 0..60 {
2453                account2_sessions.push(create_inbound_group_session(&account2).await);
2454            }
2455            manager
2456                .store
2457                .save_changes(Changes {
2458                    inbound_group_sessions: [account1_sessions.clone(), account2_sessions.clone()]
2459                        .concat(),
2460                    ..Default::default()
2461                })
2462                .await
2463                .unwrap();
2464
2465            // When we get an update for one device
2466            let device_data = DeviceData::from_account(&account1);
2467            manager
2468                .update_sender_data_from_device_changes(&DeviceChanges {
2469                    changed: vec![device_data],
2470                    ..Default::default()
2471                })
2472                .await
2473                .unwrap();
2474
2475            // Then those sessions should be updated
2476            for session in account1_sessions {
2477                let updated = manager
2478                    .store
2479                    .get_inbound_group_session(session.room_id(), session.session_id())
2480                    .await
2481                    .unwrap()
2482                    .expect("Could not find session after update");
2483                assert_matches!(
2484                    updated.sender_data,
2485                    SenderData::DeviceInfo { .. },
2486                    "incorrect sender data for session {}",
2487                    session.session_id()
2488                );
2489            }
2490
2491            // ... and those from the other account should not
2492            for session in account2_sessions {
2493                let updated = manager
2494                    .store
2495                    .get_inbound_group_session(session.room_id(), session.session_id())
2496                    .await
2497                    .unwrap()
2498                    .expect("Could not find session after update");
2499                assert_matches!(updated.sender_data, SenderData::UnknownDevice { .. });
2500            }
2501        }
2502
2503        /// Create an InboundGroupSession sent from the given account
2504        async fn create_inbound_group_session(account: &Account) -> InboundGroupSession {
2505            let (_, igs) = account
2506                .create_group_session_pair(
2507                    room_id!("!test:room"),
2508                    EncryptionSettings::default(),
2509                    SenderData::unknown(),
2510                )
2511                .await
2512                .unwrap();
2513            igs
2514        }
2515    }
2516}