matrix_sdk_crypto/identities/
manager.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17    ops::Deref,
18    sync::Arc,
19    time::Duration,
20};
21
22use futures_util::future::join_all;
23use itertools::Itertools;
24use matrix_sdk_common::{executor::spawn, failures_cache::FailuresCache};
25use ruma::{
26    api::client::keys::get_keys::v3::Response as KeysQueryResponse, serde::Raw, OwnedDeviceId,
27    OwnedServerName, OwnedTransactionId, OwnedUserId, ServerName, TransactionId, UserId,
28};
29use tokio::sync::Mutex;
30use tracing::{debug, enabled, info, instrument, trace, warn, Level};
31
32use crate::{
33    error::OlmResult,
34    identities::{DeviceData, OtherUserIdentityData, OwnUserIdentityData, UserIdentityData},
35    olm::{
36        sender_data_finder::SessionDeviceCheckError, InboundGroupSession,
37        PrivateCrossSigningIdentity, SenderDataFinder, SenderDataType,
38    },
39    store::{
40        caches::{SequenceNumber, StoreCache, StoreCacheGuard},
41        types::{Changes, DeviceChanges, IdentityChanges, UserKeyQueryResult},
42        KeyQueryManager, Result as StoreResult, Store,
43    },
44    types::{
45        requests::KeysQueryRequest, CrossSigningKey, DeviceKeys, MasterPubkey, SelfSigningPubkey,
46        UserSigningPubkey,
47    },
48    CryptoStoreError, LocalTrust, OwnUserIdentity, SignatureError, UserIdentity,
49};
50
/// The outcome of processing the keys of a single device received in a
/// `/keys/query` response.
enum DeviceChange {
    /// The device was not in the store before and has been newly created.
    New(DeviceData),
    /// The device was already in the store and its data changed as a result
    /// of the update.
    Updated(DeviceData),
    /// Nothing needs to be recorded: either the device did not change, or the
    /// received keys were rejected.
    None,
}
56
/// This enum helps us to distinguish between the changed and unchanged
/// identity case.
///
/// An unchanged identity means same cross signing keys as well as same
/// set of signatures on the master key.
enum IdentityUpdateResult {
    /// Updating the known identity with the received keys reported a change.
    /// Carries the identity after the update.
    Updated(UserIdentityData),
    /// Updating the known identity with the received keys reported no change.
    /// Carries the identity after the (no-op) update.
    Unchanged(UserIdentityData),
}
65
/// Processes `/keys/query` responses, turning them into changes to the
/// devices and cross-signing identities that are kept in the store.
#[derive(Debug, Clone)]
pub(crate) struct IdentityManager {
    /// Servers that have previously appeared in the `failures` section of a
    /// `/keys/query` response.
    ///
    /// See also [`crate::session_manager::SessionManager::failures`].
    failures: FailuresCache<OwnedServerName>,

    /// The store that known devices and identities are read from, and that
    /// the changes found in a `/keys/query` response are saved to.
    store: Store,

    /// Used to mark tracked users as up to date once the response to an
    /// expected key query request has been processed.
    pub(crate) key_query_manager: Arc<KeyQueryManager>,

    /// Details of the current "in-flight" key query request, if any
    keys_query_request_details: Arc<Mutex<Option<KeysQueryRequestDetails>>>,
}
80
/// Details of an in-flight key query request.
#[derive(Debug, Clone, Default)]
struct KeysQueryRequestDetails {
    /// The sequence number, to be passed to
    /// `Store.mark_tracked_users_as_up_to_date`.
    sequence_number: SequenceNumber,

    /// A single batch of queries returned by the Store is broken up into one or
    /// more actual KeysQueryRequests, each with their own request id. We
    /// record the outstanding request ids here.
    ///
    /// A request id is removed from this set when the matching response is
    /// received.
    request_ids: HashSet<OwnedTransactionId>,
}
93
/// Helper type to handle key query response.
///
/// Bundles the minimal set of cross-signing keys that was extracted from a
/// `/keys/query` response for a single user.
struct KeySetInfo {
    /// The user under which the master key was listed in the response.
    user_id: OwnedUserId,
    /// The deserialized public master cross-signing key.
    master_key: MasterPubkey,
    /// The deserialized public self-signing key.
    self_signing: SelfSigningPubkey,
}
100
101impl IdentityManager {
    // NOTE(review): presumably the maximum number of users that go into a
    // single `/keys/query` request; the code using this constant is not
    // visible from this part of the file — confirm.
    const MAX_KEY_QUERY_USERS: usize = 250;
103
104    pub fn new(store: Store) -> Self {
105        let keys_query_request_details = Mutex::new(None);
106
107        IdentityManager {
108            store,
109            key_query_manager: Default::default(),
110            failures: Default::default(),
111            keys_query_request_details: keys_query_request_details.into(),
112        }
113    }
114
115    fn user_id(&self) -> &UserId {
116        &self.store.static_account().user_id
117    }
118
    /// Receive a successful `/keys/query` response.
    ///
    /// The devices and cross-signing identities found in the response are
    /// processed and the resulting changes are saved to the store. After
    /// that, the sender data of existing inbound group sessions is updated
    /// based on the device changes, and, if the request was one of the
    /// expected in-flight requests, the users in the response are marked as
    /// up to date.
    ///
    /// Returns the device changes (new, changed and deleted devices) together
    /// with the identity changes (new, changed and unchanged identities) that
    /// were found in the response.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The request_id returned by `users_for_key_query` or
    ///   `build_key_query_for_users`
    /// * `response` - The response of the `/keys/query` request that the client
    ///   performed.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the steps above fails, e.g. because reading
    /// from or saving to the store failed.
    pub async fn receive_keys_query_response(
        &self,
        request_id: &TransactionId,
        response: &KeysQueryResponse,
    ) -> OlmResult<(DeviceChanges, IdentityChanges)> {
        debug!(
            ?request_id,
            users = ?response.device_keys.keys().collect::<BTreeSet<_>>(),
            failures = ?response.failures,
            "Handling a `/keys/query` response"
        );

        // Parse the strings into server names and filter out our own server. We should
        // never get failures from our own server but let's remove it as a
        // precaution anyways.
        let failed_servers = response
            .failures
            .keys()
            .filter_map(|k| ServerName::parse(k).ok())
            .filter(|s| s != self.user_id().server_name());
        let successful_servers = response.device_keys.keys().map(|u| u.server_name());

        // Append the new failed servers and remove any successful servers. We
        // need to explicitly remove the successful servers because the cache
        // doesn't automatically remove entries that elapse. Instead, the effect
        // is that elapsed servers will be retried and their delays incremented.
        self.failures.extend(failed_servers);
        self.failures.remove(successful_servers);

        let devices = self.handle_devices_from_key_query(response.device_keys.clone()).await?;
        let (identities, cross_signing_identity) = self.handle_cross_signing_keys(response).await?;

        // The changes are cloned here because the originals are still needed
        // below and are returned to the caller at the end.
        let changes = Changes {
            identities: identities.clone(),
            devices: devices.clone(),
            private_identity: cross_signing_identity,
            ..Default::default()
        };

        self.store.save_changes(changes).await?;

        // Update the sender data on any existing inbound group sessions based on the
        // changes in this response.
        //
        // `update_sender_data_from_device_changes` relies on being able to look up the
        // user identities from the store, so this has to happen *after* the
        // changes from `handle_cross_signing_keys` are saved.
        //
        // Note: it might be possible for this to race against session creation. If a
        // new session is received at the same time as a `/keys/query` response is being
        // processed, it could be saved without up-to-date sender data, but it might be
        // saved too late for it to be picked up by
        // `update_sender_data_from_device_changes`. However, this should be rare,
        // since, in general, /sync responses which might create a new session
        // are not processed at the same time as /keys/query responses (assuming
        // that the application does not call `OlmMachine::receive_sync_changes`
        // at the same time as `OlmMachine::mark_request_as_sent`).
        self.update_sender_data_from_device_changes(&devices).await?;

        // if this request is one of those we expected to be in flight, pass the
        // sequence number back to the store so that it can mark devices up to
        // date
        let sequence_number = {
            let mut request_details = self.keys_query_request_details.lock().await;

            // Removing the request id doubles as the check for whether the
            // request was an expected one: `remove` only returns `true` if the
            // id was in the set of outstanding requests.
            request_details.as_mut().and_then(|details| {
                if details.request_ids.remove(request_id) {
                    Some(details.sequence_number)
                } else {
                    None
                }
            })
        };

        if let Some(sequence_number) = sequence_number {
            let cache = self.store.cache().await?;
            self.key_query_manager
                .synced(&cache)
                .await?
                .mark_tracked_users_as_up_to_date(
                    response.device_keys.keys().map(Deref::deref),
                    sequence_number,
                )
                .await?;
        }

        // Only call the logging helper if its output would actually be
        // emitted.
        if enabled!(Level::DEBUG) {
            debug_log_keys_query_response(&devices, &identities, request_id);
        }

        Ok((devices, identities))
    }
222
223    async fn update_or_create_device(
224        store: Store,
225        device_keys: DeviceKeys,
226    ) -> StoreResult<DeviceChange> {
227        let old_device =
228            store.get_device_data(&device_keys.user_id, &device_keys.device_id).await?;
229
230        if let Some(mut device) = old_device {
231            match device.update_device(&device_keys) {
232                Err(e) => {
233                    warn!(
234                        user_id = ?device.user_id(),
235                        device_id = ?device.device_id(),
236                        error = ?e,
237                        "Rejecting device update",
238                    );
239                    Ok(DeviceChange::None)
240                }
241                Ok(true) => Ok(DeviceChange::Updated(device)),
242                Ok(false) => Ok(DeviceChange::None),
243            }
244        } else {
245            match DeviceData::try_from(&device_keys) {
246                Ok(d) => {
247                    // If this is our own device, check that the server isn't
248                    // lying about our keys, also mark the device as locally
249                    // trusted.
250                    if d.user_id() == store.user_id() && d.device_id() == store.device_id() {
251                        let local_device_keys = store.static_account().unsigned_device_keys();
252
253                        if d.keys() == &local_device_keys.keys {
254                            d.set_trust_state(LocalTrust::Verified);
255
256                            trace!(
257                                user_id = ?d.user_id(),
258                                device_id = ?d.device_id(),
259                                keys = ?d.keys(),
260                                "Adding our own device to the device store, \
261                                marking it as locally verified",
262                            );
263
264                            Ok(DeviceChange::New(d))
265                        } else {
266                            Ok(DeviceChange::None)
267                        }
268                    } else {
269                        trace!(
270                            user_id = ?d.user_id(),
271                            device_id = ?d.device_id(),
272                            keys = ?d.keys(),
273                            "Adding a new device to the device store",
274                        );
275
276                        Ok(DeviceChange::New(d))
277                    }
278                }
279                Err(e) => {
280                    warn!(
281                        user_id = ?device_keys.user_id,
282                        device_id = ?device_keys.device_id,
283                        error = ?e,
284                        "Rejecting a previously unseen device",
285                    );
286
287                    Ok(DeviceChange::None)
288                }
289            }
290        }
291    }
292
293    async fn update_user_devices(
294        store: Store,
295        user_id: OwnedUserId,
296        device_map: BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
297    ) -> StoreResult<DeviceChanges> {
298        let own_device_id = store.static_account().device_id().to_owned();
299
300        let mut changes = DeviceChanges::default();
301
302        let current_devices: HashSet<OwnedDeviceId> = device_map.keys().cloned().collect();
303
304        let tasks = device_map.into_iter().filter_map(|(device_id, device_keys)| match device_keys
305            .deserialize_as::<DeviceKeys>(
306        ) {
307            Ok(device_keys) => {
308                if user_id != device_keys.user_id || device_id != device_keys.device_id {
309                    warn!(
310                        ?user_id,
311                        ?device_id,
312                        device_key_user = ?device_keys.user_id,
313                        device_key_device_id = ?device_keys.device_id,
314                        "Mismatch in the device keys payload",
315                    );
316                    None
317                } else {
318                    Some(spawn(Self::update_or_create_device(store.clone(), device_keys)))
319                }
320            }
321            Err(e) => {
322                warn!(
323                    ?user_id, ?device_id, error = ?e,
324                    "Device keys failed to deserialize",
325                );
326                None
327            }
328        });
329
330        let results = join_all(tasks).await;
331
332        for device in results {
333            let device = device.expect("Creating or updating a device panicked")?;
334
335            match device {
336                DeviceChange::New(d) => changes.new.push(d),
337                DeviceChange::Updated(d) => changes.changed.push(d),
338                DeviceChange::None => (),
339            }
340        }
341
342        let current_devices: HashSet<&OwnedDeviceId> = current_devices.iter().collect();
343        let stored_devices = store.get_device_data_for_user(&user_id).await?;
344        let stored_devices_set: HashSet<&OwnedDeviceId> = stored_devices.keys().collect();
345        let deleted_devices_set = stored_devices_set.difference(&current_devices);
346
347        let own_user_id = store.static_account().user_id();
348        for device_id in deleted_devices_set {
349            if user_id == *own_user_id && *device_id == &own_device_id {
350                let identity_keys = store.static_account().identity_keys();
351
352                warn!(
353                    user_id = ?own_user_id,
354                    device_id = ?own_device_id,
355                    curve25519_key = ?identity_keys.curve25519,
356                    ed25519_key = ?identity_keys.ed25519,
357                    "Our own device might have been deleted"
358                );
359            } else if let Some(device) = stored_devices.get(*device_id) {
360                device.mark_as_deleted();
361                changes.deleted.push(device.clone());
362            }
363        }
364
365        Ok(changes)
366    }
367
368    /// Handle the device keys part of a key query response.
369    ///
370    /// # Arguments
371    ///
372    /// * `device_keys_map` - A map holding the device keys of the users for
373    ///   which the key query was done.
374    ///
375    /// Returns a list of devices that changed. Changed here means either
376    /// they are new, one of their properties has changed or they got deleted.
377    async fn handle_devices_from_key_query(
378        &self,
379        device_keys_map: BTreeMap<
380            OwnedUserId,
381            BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
382        >,
383    ) -> StoreResult<DeviceChanges> {
384        let mut changes = DeviceChanges::default();
385
386        let tasks = device_keys_map.into_iter().map(|(user_id, device_keys_map)| {
387            spawn(Self::update_user_devices(self.store.clone(), user_id, device_keys_map))
388        });
389
390        let results = join_all(tasks).await;
391
392        for result in results {
393            let change_fragment = result.expect("Panic while updating user devices")?;
394
395            changes.extend(change_fragment);
396        }
397
398        Ok(changes)
399    }
400
401    /// Check if the given public identity matches our stored private one.
402    ///
403    /// If they don't match, this is an indication that our identity has been
404    /// rotated. In this case we return `Some(cleared_private_identity)`,
405    /// where `cleared_private_identity` is our currently-stored
406    /// private identity with the conflicting keys removed.
407    ///
408    /// Otherwise, assuming we do have a private master cross-signing key, we
409    /// mark the public identity as verified.
410    ///
411    /// # Returns
412    ///
413    /// If the private identity needs updating (because it does not match the
414    /// public keys), the updated private identity (which will need to be
415    /// persisted).
416    ///
417    /// Otherwise, `None`.
418    async fn check_private_identity(
419        &self,
420        identity: &OwnUserIdentityData,
421    ) -> Option<PrivateCrossSigningIdentity> {
422        let private_identity = self.store.private_identity();
423        let private_identity = private_identity.lock().await;
424        let result = private_identity.clear_if_differs(identity).await;
425
426        if result.any_differ() {
427            info!(cleared = ?result, "Removed some or all of our private cross signing keys");
428            Some((*private_identity).clone())
429        } else {
430            // If the master key didn't rotate above (`clear_if_differs`),
431            // then this means that the public part and the private parts of
432            // the master key match. We previously did a signature check, so
433            // this means that the private part of the master key has signed
434            // the identity. We can safely mark the public part of the
435            // identity as verified.
436            if private_identity.has_master_key().await && !identity.is_verified() {
437                trace!("Marked our own identity as verified");
438                identity.mark_as_verified()
439            }
440
441            None
442        }
443    }
444
445    /// Process an identity received in a `/keys/query` response that we
446    /// previously knew about.
447    ///
448    /// If the identity is our own, we will look for a user-signing key; if one
449    /// is not found, an error is returned. Otherwise, we then compare the
450    /// received public identity against our stored private identity;
451    /// if they match, the returned public identity is marked as verified and
452    /// `*changed_private_identity` is set to `None`. If they do *not* match,
453    /// it is an indication that our identity has been rotated, and
454    /// `*changed_private_identity` is set to our currently-stored private
455    /// identity with the conflicting keys removed (which will need to be
456    /// persisted).
457    ///
458    /// Whether the identity is our own or that of another, we check whether
459    /// there has been any change to the cross-signing keys, and classify
460    /// the result into [`IdentityUpdateResult::Updated`] or
461    /// [`IdentityUpdateResult::Unchanged`].
462    ///
463    /// # Arguments
464    ///
465    /// * `response` - The entire `/keys/query` response.
466    /// * `master_key` - The public master cross-signing key from the
467    ///   `/keys/query` response.
468    /// * `self_signing` - The public self-signing key from the `/keys/query`
469    ///   response.
470    /// * `i` - The existing identity for this user.
471    /// * `changed_private_identity` - Output parameter. Unchanged if the
472    ///   identity is that of another user. If it is our own, set to `None` or
473    ///   `Some` depending on whether our stored private identity needs
474    ///   updating. See above for more detail.
475    async fn handle_changed_identity(
476        &self,
477        response: &KeysQueryResponse,
478        maybe_verified_own_identity: Option<&OwnUserIdentity>,
479        master_key: MasterPubkey,
480        self_signing: SelfSigningPubkey,
481        i: UserIdentityData,
482        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
483    ) -> Result<IdentityUpdateResult, SignatureError> {
484        match i {
485            UserIdentityData::Own(mut identity) => {
486                let user_signing = self.get_user_signing_key_from_response(response)?;
487                let has_changed = identity.update(master_key, self_signing, user_signing)?;
488                *changed_private_identity = self.check_private_identity(&identity).await;
489                if has_changed {
490                    Ok(IdentityUpdateResult::Updated(identity.into()))
491                } else {
492                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
493                }
494            }
495            UserIdentityData::Other(mut identity) => {
496                let has_changed = identity.update(
497                    master_key,
498                    self_signing,
499                    maybe_verified_own_identity.map(|o| o.user_signing_key()),
500                )?;
501
502                if has_changed {
503                    Ok(IdentityUpdateResult::Updated(identity.into()))
504                } else {
505                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
506                }
507            }
508        }
509    }
510
511    /// Process an identity received in a `/keys/query` response that we didn't
512    /// previously know about.
513    ///
514    /// If the identity is our own, we will look for a user-signing key, and if
515    /// it is present and correct, all three keys will be returned in the
516    /// `IdentityChange` result; otherwise, an error is returned. We will also
517    /// compare the received public identity against our stored private
518    /// identity; if they match, the returned public identity is marked as
519    /// verified and `*changed_private_identity` is set to `None`. If they do
520    /// *not* match, it is an indication that our identity has been rotated,
521    /// and `*changed_private_identity` is set to our currently-stored
522    /// private identity with the conflicting keys removed (which will need
523    /// to be persisted).
524    ///
525    /// If the identity is that of another user, we just parse the keys into the
526    /// `IdentityChange` result, since all other checks have already been done.
527    ///
528    /// # Arguments
529    ///
530    /// * `response` - The entire `/keys/query` response.
531    /// * `master_key` - The public master cross-signing key from the
532    ///   `/keys/query` response.
533    /// * `self_signing` - The public self-signing key from the `/keys/query`
534    ///   response.
535    /// * `changed_private_identity` - Output parameter. Unchanged if the
536    ///   identity is that of another user. If it is our own, set to `None` or
537    ///   `Some` depending on whether our stored private identity needs
538    ///   updating. See above for more detail.
539    async fn handle_new_identity(
540        &self,
541        response: &KeysQueryResponse,
542        maybe_verified_own_identity: Option<&OwnUserIdentity>,
543        master_key: MasterPubkey,
544        self_signing: SelfSigningPubkey,
545        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
546    ) -> Result<UserIdentityData, SignatureError> {
547        if master_key.user_id() == self.user_id() {
548            // Own identity
549            let user_signing = self.get_user_signing_key_from_response(response)?;
550            let identity = OwnUserIdentityData::new(master_key, self_signing, user_signing)?;
551            *changed_private_identity = self.check_private_identity(&identity).await;
552            Ok(identity.into())
553        } else {
554            // First time seen, create the identity. The current MSK will be pinned.
555            let identity = OtherUserIdentityData::new(master_key, self_signing)?;
556            let is_verified = maybe_verified_own_identity
557                .is_some_and(|own_user_identity| own_user_identity.is_identity_signed(&identity));
558            if is_verified {
559                identity.mark_as_previously_verified();
560            }
561
562            Ok(identity.into())
563        }
564    }
565
566    /// Try to deserialize the master key and self-signing key of an
567    /// identity from a `/keys/query` response.
568    ///
569    /// Each user identity *must* at least contain a master and self-signing
570    /// key, and this function deserializes them. (Our own identity, in addition
571    /// to those two, also contains a user-signing key, but that is not
572    /// extracted here; see
573    /// [`IdentityManager::get_user_signing_key_from_response`])
574    ///
575    /// # Arguments
576    ///
577    ///  * `master_key` - The master key for a particular user from a
578    ///    `/keys/query` response.
579    ///  * `response` - The entire `/keys/query` response.
580    ///
581    /// # Returns
582    ///
583    /// `None` if the self-signing key couldn't be found in the response, or the
584    /// one of the keys couldn't be deserialized. Else, the deserialized
585    /// public keys.
586    fn get_minimal_set_of_keys(
587        master_key: &Raw<CrossSigningKey>,
588        response: &KeysQueryResponse,
589    ) -> Option<(MasterPubkey, SelfSigningPubkey)> {
590        match master_key.deserialize_as_unchecked::<MasterPubkey>() {
591            Ok(master_key) => {
592                if let Some(self_signing) = response
593                    .self_signing_keys
594                    .get(master_key.user_id())
595                    .and_then(|k| k.deserialize_as_unchecked::<SelfSigningPubkey>().ok())
596                {
597                    Some((master_key, self_signing))
598                } else {
599                    warn!(
600                        "A user identity didn't contain a self signing pubkey or the key was invalid"
601                    );
602                    None
603                }
604            }
605            Err(e) => {
606                warn!(
607                    error = ?e,
608                    "Couldn't update or create new user identity"
609                );
610                None
611            }
612        }
613    }
614
615    /// Try to deserialize the our user-signing key from a `/keys/query`
616    /// response.
617    ///
618    /// If a `/keys/query` response includes our own cross-signing keys, then it
619    /// should include our user-signing key. This method attempts to
620    /// extract, deserialize, and check the key from the response.
621    ///
622    /// # Arguments
623    ///
624    /// * `response` - the entire `/keys/query` response.
625    fn get_user_signing_key_from_response(
626        &self,
627        response: &KeysQueryResponse,
628    ) -> Result<UserSigningPubkey, SignatureError> {
629        let Some(user_signing) = response
630            .user_signing_keys
631            .get(self.user_id())
632            .and_then(|k| k.deserialize_as_unchecked::<UserSigningPubkey>().ok())
633        else {
634            warn!(
635                "User identity for our own user didn't contain a user signing pubkey or the key \
636                    isn't valid",
637            );
638            return Err(SignatureError::MissingSigningKey);
639        };
640
641        if user_signing.user_id() != self.user_id() {
642            warn!(
643                expected = ?self.user_id(),
644                got = ?user_signing.user_id(),
645                "User ID mismatch in our user-signing key",
646            );
647            return Err(SignatureError::UserIdMismatch);
648        }
649
650        Ok(user_signing)
651    }
652
653    /// Process the cross-signing keys for a particular identity from a
654    /// `/keys/query` response.
655    ///
656    /// Checks that the keys are consistent, verifies the updates, and produces
657    /// a list of changes to be stored.
658    ///
659    /// # Arguments
660    ///
661    /// * `response` - The entire `/keys/query` response.
662    /// * `changes` - The identity results so far, which we will add to.
663    /// * `changed_identity` - Output parameter: Unchanged if the identity is
664    ///   that of another user. If it is our own, set to `None` or `Some`
665    ///   depending on whether our stored private identity needs updating.
666    /// * `maybe_verified_own_identity` - Own verified identity if any to check
667    ///   verification status of updated identity.
668    /// * `key_set_info` - The identity info as returned by the `/keys/query`
669    ///   response.
670    #[instrument(skip_all, fields(user_id))]
671    async fn update_or_create_identity(
672        &self,
673        response: &KeysQueryResponse,
674        changes: &mut IdentityChanges,
675        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
676        maybe_verified_own_identity: Option<&OwnUserIdentity>,
677        key_set_info: KeySetInfo,
678    ) -> StoreResult<()> {
679        let KeySetInfo { user_id, master_key, self_signing } = key_set_info;
680        if master_key.user_id() != user_id || self_signing.user_id() != user_id {
681            warn!(?user_id, "User ID mismatch in one of the cross signing keys");
682        } else if let Some(i) = self.store.get_user_identity(&user_id).await? {
683            // an identity we knew about before, which is being updated
684            match self
685                .handle_changed_identity(
686                    response,
687                    maybe_verified_own_identity,
688                    master_key,
689                    self_signing,
690                    i,
691                    changed_private_identity,
692                )
693                .await
694            {
695                Ok(IdentityUpdateResult::Updated(identity)) => {
696                    trace!(?identity, "Updated a user identity");
697                    changes.changed.push(identity);
698                }
699                Ok(IdentityUpdateResult::Unchanged(identity)) => {
700                    trace!(?identity, "Received an unchanged user identity");
701                    changes.unchanged.push(identity);
702                }
703                Err(e) => {
704                    warn!(error = ?e, "Couldn't update an existing user identity");
705                }
706            }
707        } else {
708            // an identity we did not know about before
709            match self
710                .handle_new_identity(
711                    response,
712                    maybe_verified_own_identity,
713                    master_key,
714                    self_signing,
715                    changed_private_identity,
716                )
717                .await
718            {
719                Ok(identity) => {
720                    trace!(?identity, "Created new user identity");
721                    changes.new.push(identity);
722                }
723                Err(e) => {
724                    warn!(error = ?e, "Couldn't create new user identity");
725                }
726            }
727        }
728
729        Ok(())
730    }
731
732    /// Handle the cross signing keys part of a key query response.
733    ///
734    /// # Arguments
735    ///
736    /// * `response` - The `/keys/query` response.
737    ///
738    /// # Returns
739    ///
740    /// The processed results, to be saved to the datastore, comprising:
741    ///
742    ///  * A list of public identities that were received, categorised as "new",
743    ///    "changed" or "unchanged".
744    ///
745    ///  * If our own identity was updated and did not match our private
746    ///    identity, an update to that private identity. Otherwise, `None`.
747    async fn handle_cross_signing_keys(
748        &self,
749        response: &KeysQueryResponse,
750    ) -> StoreResult<(IdentityChanges, Option<PrivateCrossSigningIdentity>)> {
751        let mut changes = IdentityChanges::default();
752        let mut changed_identity = None;
753
754        // We want to check if the updated/new other identities are trusted by us or
755        // not. This is based on the current verified state of the own identity.
756        let maybe_own_verified_identity = self
757            .store
758            .get_identity(self.user_id())
759            .await?
760            .and_then(UserIdentity::own)
761            .filter(|own| own.is_verified());
762
763        for (user_id, master_key) in &response.master_keys {
764            // Get the master and self-signing key for each identity; those are required for
765            // every user identity type. If we don't have those we skip over.
766            let Some((master_key, self_signing)) =
767                Self::get_minimal_set_of_keys(master_key.cast_ref(), response)
768            else {
769                continue;
770            };
771
772            let key_set_info = KeySetInfo { user_id: user_id.clone(), master_key, self_signing };
773
774            self.update_or_create_identity(
775                response,
776                &mut changes,
777                &mut changed_identity,
778                maybe_own_verified_identity.as_ref(),
779                key_set_info,
780            )
781            .await?;
782        }
783
784        Ok((changes, changed_identity))
785    }
786
787    /// Generate an "out-of-band" key query request for the given set of users.
788    ///
789    /// Unlike the regular key query requests returned by `users_for_key_query`,
790    /// there can be several of these in flight at once. This can be useful
791    /// if we need results to be as up-to-date as possible.
792    ///
793    /// Once the request has been made, the response can be fed back into the
794    /// IdentityManager and store by calling `receive_keys_query_response`.
795    ///
796    /// # Arguments
797    ///
798    /// * `users` - list of users whose keys should be queried
799    ///
800    /// # Returns
801    ///
802    /// A tuple containing the request ID for the request, and the request
803    /// itself.
804    pub(crate) fn build_key_query_for_users<'a>(
805        &self,
806        users: impl IntoIterator<Item = &'a UserId>,
807    ) -> (OwnedTransactionId, KeysQueryRequest) {
808        // Since this is an "out-of-band" request, we just make up a transaction ID and
809        // do not store the details in `self.keys_query_request_details`.
810        //
811        // `receive_keys_query_response` will process the response as normal, except
812        // that it will not mark the users as "up-to-date".
813
814        // We assume that there aren't too many users here; if we find a usecase that
815        // requires lots of users to be up-to-date we may need to rethink this.
816        (TransactionId::new(), KeysQueryRequest::new(users.into_iter().map(|u| u.to_owned())))
817    }
818
819    /// Get a list of key query requests needed.
820    ///
821    /// # Returns
822    ///
823    /// A map of a request ID to the `/keys/query` request.
824    ///
825    /// The response of a successful key query requests needs to be passed to
826    /// the [`OlmMachine`] with the [`receive_keys_query_response`].
827    ///
828    /// [`receive_keys_query_response`]: Self::receive_keys_query_response
829    pub async fn users_for_key_query(
830        &self,
831    ) -> StoreResult<BTreeMap<OwnedTransactionId, KeysQueryRequest>> {
832        // Forget about any previous key queries in flight.
833        *self.keys_query_request_details.lock().await = None;
834
835        // We always want to track our own user, but in case we aren't in an encrypted
836        // room yet, we won't be tracking ourselves yet. This ensures we are always
837        // tracking ourselves.
838        //
839        // The check for emptiness is done first for performance.
840        let (users, sequence_number) = {
841            let cache = self.store.cache().await?;
842            let key_query_manager = self.key_query_manager.synced(&cache).await?;
843
844            let (users, sequence_number) = key_query_manager.users_for_key_query().await;
845
846            if users.is_empty() && !key_query_manager.tracked_users().contains(self.user_id()) {
847                key_query_manager.mark_user_as_changed(self.user_id()).await?;
848                key_query_manager.users_for_key_query().await
849            } else {
850                (users, sequence_number)
851            }
852        };
853
854        if users.is_empty() {
855            Ok(BTreeMap::new())
856        } else {
857            // Let's remove users that are part of the `FailuresCache`. The cache, which is
858            // a TTL cache, remembers users for which a previous `/key/query` request has
859            // failed. We don't retry a `/keys/query` for such users for a
860            // certain amount of time.
861            let users = users.into_iter().filter(|u| !self.failures.contains(u.server_name()));
862
863            // We don't want to create a single `/keys/query` request with an infinite
864            // amount of users. Some servers will likely bail out after a
865            // certain amount of users and the responses will be large. In the
866            // case of a transmission error, we'll have to retransmit the large
867            // response.
868            //
869            // Convert the set of users into multiple /keys/query requests.
870            let requests: BTreeMap<_, _> = users
871                .chunks(Self::MAX_KEY_QUERY_USERS)
872                .into_iter()
873                .map(|user_chunk| {
874                    let request_id = TransactionId::new();
875                    let request = KeysQueryRequest::new(user_chunk);
876
877                    debug!(?request_id, users = ?request.device_keys.keys(), "Created a /keys/query request");
878
879                    (request_id, request)
880                })
881                .collect();
882
883            // Collect the request IDs, these will be used later in the
884            // `receive_keys_query_response()` method to figure out if the user can be
885            // marked as up-to-date/non-dirty.
886            let request_ids = requests.keys().cloned().collect();
887            let request_details = KeysQueryRequestDetails { sequence_number, request_ids };
888
889            *self.keys_query_request_details.lock().await = Some(request_details);
890
891            Ok(requests)
892        }
893    }
894
895    /// Receive the list of users that contained changed devices from the
896    /// `/sync` response.
897    ///
898    /// This will queue up the given user for a key query.
899    ///
900    /// Note: The user already needs to be tracked for it to be queued up for a
901    /// key query.
902    pub async fn receive_device_changes(
903        &self,
904        cache: &StoreCache,
905        users: impl Iterator<Item = &UserId>,
906    ) -> StoreResult<()> {
907        self.key_query_manager.synced(cache).await?.mark_tracked_users_as_changed(users).await
908    }
909
910    /// See the docs for [`OlmMachine::update_tracked_users()`].
911    pub async fn update_tracked_users(
912        &self,
913        users: impl IntoIterator<Item = &UserId>,
914    ) -> StoreResult<()> {
915        let cache = self.store.cache().await?;
916        self.key_query_manager.synced(&cache).await?.update_tracked_users(users.into_iter()).await
917    }
918
919    /// Retrieve a list of a user's current devices, so we can encrypt a message
920    /// to them.
921    ///
922    /// If we have not yet seen any devices for the user, and their device list
923    /// has been marked as outdated, then we wait for the `/keys/query` request
924    /// to complete. This helps ensure that we attempt at least once to fetch a
925    /// user's devices before encrypting to them.
926    pub async fn get_user_devices_for_encryption(
927        &self,
928        users: impl Iterator<Item = &UserId>,
929    ) -> StoreResult<HashMap<OwnedUserId, HashMap<OwnedDeviceId, DeviceData>>> {
930        // How long we wait for /keys/query to complete.
931        const KEYS_QUERY_WAIT_TIME: Duration = Duration::from_secs(5);
932
933        let mut devices_by_user = HashMap::new();
934        let mut users_with_no_devices_on_failed_servers = Vec::new();
935        let mut users_with_no_devices_on_unfailed_servers = Vec::new();
936
937        for user_id in users {
938            // First of all, check the store for this user.
939            let devices = self.store.get_device_data_for_user_filtered(user_id).await?;
940
941            // Now, look for users who have no devices at all.
942            //
943            // If a user has no devices at all, that implies we have never (successfully)
944            // done a `/keys/query` for them; we wait for one to complete if it is
945            // in flight. (Of course, the user might genuinely have no devices, but
946            // that's fine, it just means we redundantly grab the cache guard and
947            // check the pending-query flag.)
948            if !devices.is_empty() {
949                // This user has at least one known device.
950                //
951                // The device list may also be outdated in this case; but in this
952                // situation, we are racing between sending a message and retrieving their
953                // device list. That's an inherently racy situation and there is no real
954                // benefit to waiting for the `/keys/query` request to complete. So we don't
955                // bother.
956                //
957                // We just add their devices to the result and carry on.
958                devices_by_user.insert(user_id.to_owned(), devices);
959                continue;
960            }
961
962            // *However*, if the user's server is currently subject to a backoff due to
963            // previous failures, then `users_for_key_query` won't attempt to query
964            // for the user's devices, so there's no point waiting.
965            //
966            // XXX: this is racy. It's possible that:
967            //  * `failures` included the user's server when `users_for_key_query` was
968            //    called, so the user was not returned in the `KeyQueryRequest`, and:
969            //  * The backoff has now expired.
970            //
971            // In that case, we'll end up waiting for the *next* `users_for_key_query` call,
972            // which might not be for 30 seconds or so. (And by then, it might be `failed`
973            // again.)
974            if self.failures.contains(user_id.server_name()) {
975                users_with_no_devices_on_failed_servers.push(user_id);
976                continue;
977            }
978
979            users_with_no_devices_on_unfailed_servers.push(user_id);
980        }
981
982        if !users_with_no_devices_on_failed_servers.is_empty() {
983            info!(
984                ?users_with_no_devices_on_failed_servers,
985                "Not waiting for `/keys/query` for users whose server has previously failed"
986            );
987        }
988
989        if !users_with_no_devices_on_unfailed_servers.is_empty() {
990            // For each user with no devices, fire off a task to wait for a `/keys/query`
991            // result if one is pending.
992            //
993            // We don't actually update the `devices_by_user` map here since that could
994            // require concurrent access to it. Instead each task returns a
995            // `(OwnedUserId, HashMap)` pair (or rather, an `Option` of one) so that we can
996            // add the results to the map.
997            let results = join_all(
998                users_with_no_devices_on_unfailed_servers
999                    .into_iter()
1000                    .map(|user_id| self.get_updated_keys_for_user(KEYS_QUERY_WAIT_TIME, user_id)),
1001            )
1002            .await;
1003
1004            // Once all the tasks have completed, process the results.
1005            let mut updated_users = Vec::new();
1006            for result in results {
1007                if let Some((user_id, updated_devices)) = result? {
1008                    devices_by_user.insert(user_id.to_owned(), updated_devices);
1009                    updated_users.push(user_id);
1010                }
1011            }
1012
1013            if !updated_users.is_empty() {
1014                info!(
1015                    ?updated_users,
1016                    "Waited for `/keys/query` to complete for users who have no devices"
1017                );
1018            }
1019        }
1020
1021        Ok(devices_by_user)
1022    }
1023
1024    /// Helper for get_user_devices_for_encryption.
1025    ///
1026    /// Waits for any pending `/keys/query` for the given user. If one was
1027    /// pending, reloads the device list and returns `Some(user_id,
1028    /// device_list)`. If no request was pending, returns `None`.
1029    #[allow(clippy::type_complexity)]
1030    #[instrument(skip(self))]
1031    async fn get_updated_keys_for_user<'a>(
1032        &self,
1033        timeout_duration: Duration,
1034        user_id: &'a UserId,
1035    ) -> Result<Option<(&'a UserId, HashMap<OwnedDeviceId, DeviceData>)>, CryptoStoreError> {
1036        let cache = self.store.cache().await?;
1037        match self
1038            .key_query_manager
1039            .wait_if_user_key_query_pending(cache, timeout_duration, user_id)
1040            .await?
1041        {
1042            UserKeyQueryResult::WasPending => {
1043                Ok(Some((user_id, self.store.get_device_data_for_user_filtered(user_id).await?)))
1044            }
1045            _ => Ok(None),
1046        }
1047    }
1048
1049    /// Given a list of changed devices, update any [`InboundGroupSession`]s
1050    /// which were sent from those devices and which do not have complete
1051    /// sender data.
1052    async fn update_sender_data_from_device_changes(
1053        &self,
1054        device_changes: &DeviceChanges,
1055    ) -> Result<(), CryptoStoreError> {
1056        for device in device_changes.new.iter().chain(device_changes.changed.iter()) {
1057            // 1. Look for InboundGroupSessions from the device whose sender_data is
1058            //    UnknownDevice. For such sessions, we now have the device, and can update
1059            //    the sender_data accordingly.
1060            //
1061            // In theory, we only need to do this for new devices. In practice, I'm a bit
1062            // worried about races leading us to getting stuck in the
1063            // UnknownDevice state, so we'll paper over that by doing this check
1064            // on device updates too.
1065            self.update_sender_data_for_sessions_for_device(device, SenderDataType::UnknownDevice)
1066                .await?;
1067
1068            // 2. If, and only if, the device is now correctly cross-signed (ie,
1069            //    device.is_cross_signed_by_owner() is true, and we have the master
1070            //    cross-signing key for the owner), look for InboundGroupSessions from the
1071            //    device whose sender_data is DeviceInfo. We can also update the sender_data
1072            //    for these sessions.
1073            //
1074            // In theory, we can skip a couple of steps of the SenderDataFinder algorithm,
1075            // because we're doing the cross-signing check here. In practice,
1076            // it's *way* easier just to use the same logic.
1077            let device_owner_identity = self.store.get_user_identity(device.user_id()).await?;
1078            if device_owner_identity.is_some_and(|id| device.is_cross_signed_by_owner(&id)) {
1079                self.update_sender_data_for_sessions_for_device(device, SenderDataType::DeviceInfo)
1080                    .await?;
1081            }
1082        }
1083
1084        Ok(())
1085    }
1086
1087    /// Given a device, look for [`InboundGroupSession`]s whose sender data is
1088    /// in the given state, and update it.
1089    #[instrument(skip(self))]
1090    async fn update_sender_data_for_sessions_for_device(
1091        &self,
1092        device: &DeviceData,
1093        sender_data_type: SenderDataType,
1094    ) -> Result<(), CryptoStoreError> {
1095        const IGS_BATCH_SIZE: usize = 50;
1096
1097        let Some(curve_key) = device.curve25519_key() else { return Ok(()) };
1098
1099        let mut last_session_id: Option<String> = None;
1100        loop {
1101            let mut sessions = self
1102                .store
1103                .get_inbound_group_sessions_for_device_batch(
1104                    curve_key,
1105                    sender_data_type,
1106                    last_session_id,
1107                    IGS_BATCH_SIZE,
1108                )
1109                .await?;
1110
1111            if sessions.is_empty() {
1112                // end of the session list
1113                return Ok(());
1114            }
1115
1116            last_session_id = None;
1117            for session in &mut sessions {
1118                last_session_id = Some(session.session_id().to_owned());
1119                self.update_sender_data_for_session(session, device).await?;
1120            }
1121            self.store.save_inbound_group_sessions(&sessions).await?;
1122        }
1123    }
1124
1125    /// Update the sender data on the given inbound group session, using the
1126    /// given device data.
1127    #[instrument(skip(self, device, session), fields(session_id = session.session_id()))]
1128    async fn update_sender_data_for_session(
1129        &self,
1130        session: &mut InboundGroupSession,
1131        device: &DeviceData,
1132    ) -> Result<(), CryptoStoreError> {
1133        match SenderDataFinder::find_using_device_data(&self.store, device.clone(), session).await {
1134            Ok(sender_data) => {
1135                debug!("Updating existing InboundGroupSession with new SenderData {sender_data:?}");
1136                session.sender_data = sender_data;
1137            }
1138            Err(SessionDeviceCheckError::CryptoStoreError(e)) => {
1139                return Err(e);
1140            }
1141            Err(SessionDeviceCheckError::MismatchedIdentityKeys(e)) => {
1142                warn!(
1143                    ?session,
1144                    ?device,
1145                    "cannot update existing InboundGroupSession due to ownership error: {e}",
1146                );
1147            }
1148        }
1149
1150        Ok(())
1151    }
1152
1153    /// Mark all tracked users as dirty.
1154    ///
1155    /// All users *whose device lists we are tracking* are flagged as needing a
1156    /// key query. Users whose devices we are not tracking are ignored.
1157    pub(crate) async fn mark_all_tracked_users_as_dirty(
1158        &self,
1159        store_cache: StoreCacheGuard,
1160    ) -> StoreResult<()> {
1161        let store_wrapper = store_cache.store_wrapper();
1162        let tracked_users = store_wrapper.load_tracked_users().await?;
1163
1164        self.key_query_manager
1165            .synced(&store_cache)
1166            .await?
1167            .mark_tracked_users_as_changed(
1168                tracked_users.iter().map(|tracked_user| tracked_user.user_id.as_ref()),
1169            )
1170            .await?;
1171
1172        Ok(())
1173    }
1174}
1175
1176/// Log information about what changed after processing a /keys/query response.
1177/// Only does anything if the DEBUG log level is enabled.
1178fn debug_log_keys_query_response(
1179    devices: &DeviceChanges,
1180    identities: &IdentityChanges,
1181    request_id: &TransactionId,
1182) {
1183    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1184    let changed_devices = devices.changed.iter().fold(BTreeMap::new(), |mut acc, d| {
1185        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1186        acc
1187    });
1188
1189    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1190    let new_devices = devices.new.iter().fold(BTreeMap::new(), |mut acc, d| {
1191        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1192        acc
1193    });
1194
1195    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1196    let deleted_devices = devices.deleted.iter().fold(BTreeMap::new(), |mut acc, d| {
1197        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1198        acc
1199    });
1200
1201    let new_identities = identities.new.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1202    let changed_identities =
1203        identities.changed.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1204
1205    debug!(
1206        ?request_id,
1207        ?new_devices,
1208        ?changed_devices,
1209        ?deleted_devices,
1210        ?new_identities,
1211        ?changed_identities,
1212        "Finished handling of the `/keys/query` response"
1213    );
1214}
1215
1216#[cfg(any(test, feature = "testing"))]
1217#[allow(dead_code)]
1218pub(crate) mod testing {
1219    use std::sync::Arc;
1220
1221    use matrix_sdk_test::ruma_response_from_json;
1222    use ruma::{
1223        api::client::keys::get_keys::v3::Response as KeyQueryResponse, device_id, user_id,
1224        DeviceId, UserId,
1225    };
1226    use serde_json::json;
1227    use tokio::sync::Mutex;
1228
1229    use crate::{
1230        identities::IdentityManager,
1231        olm::{Account, PrivateCrossSigningIdentity},
1232        store::{types::PendingChanges, CryptoStoreWrapper, MemoryStore, Store},
1233        types::{requests::UploadSigningKeysRequest, DeviceKeys},
1234        verification::VerificationMachine,
1235    };
1236
    /// The fixed user ID `@example:localhost`, as a static reference.
    pub fn user_id() -> &'static UserId {
        user_id!("@example:localhost")
    }
1240
    /// The fixed user ID `@example2:localhost`, as a static reference.
    ///
    /// This is the user that the `other_key_query` fixtures in this module
    /// carry keys for.
    pub fn other_user_id() -> &'static UserId {
        user_id!("@example2:localhost")
    }
1244
    /// The fixed device ID `WSKKLTJZCL`, as a static reference.
    pub fn device_id() -> &'static DeviceId {
        device_id!("WSKKLTJZCL")
    }
1248
1249    pub(crate) async fn manager_test_helper(
1250        user_id: &UserId,
1251        device_id: &DeviceId,
1252    ) -> IdentityManager {
1253        let identity = PrivateCrossSigningIdentity::new(user_id.into());
1254        let identity = Arc::new(Mutex::new(identity));
1255        let user_id = user_id.to_owned();
1256        let account = Account::with_device_id(&user_id, device_id);
1257        let static_account = account.static_data().clone();
1258        let store = Arc::new(CryptoStoreWrapper::new(&user_id, device_id, MemoryStore::new()));
1259        let verification =
1260            VerificationMachine::new(static_account.clone(), identity.clone(), store.clone());
1261        let store = Store::new(static_account, identity, store, verification);
1262        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1263        IdentityManager::new(store)
1264    }
1265
1266    pub fn other_key_query() -> KeyQueryResponse {
1267        let data = &json!({
1268            "device_keys": {
1269                "@example2:localhost": {
1270                    "SKISMLNIMH": {
1271                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
1272                        "device_id": "SKISMLNIMH",
1273                        "keys": {
1274                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
1275                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
1276                        },
1277                        "signatures": {
1278                            "@example2:localhost": {
1279                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
1280                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
1281                            }
1282                        },
1283                        "user_id": "@example2:localhost",
1284                        "unsigned": {
1285                            "device_display_name": "Riot Desktop (Linux)"
1286                        }
1287                    }
1288                }
1289            },
1290            "failures": {},
1291            "master_keys": {
1292                "@example2:localhost": {
1293                    "user_id": "@example2:localhost",
1294                    "usage": ["master"],
1295                    "keys": {
1296                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1297                    },
1298                    "signatures": {
1299                        "@example2:localhost": {
1300                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1301                        }
1302                    }
1303                }
1304            },
1305            "self_signing_keys": {
1306                "@example2:localhost": {
1307                    "user_id": "@example2:localhost",
1308                    "usage": ["self_signing"],
1309                    "keys": {
1310                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1311                    },
1312                    "signatures": {
1313                        "@example2:localhost": {
1314                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1315                        }
1316                    }
1317                }
1318            },
1319            "user_signing_keys": {}
1320        });
1321        ruma_response_from_json(data)
1322    }
1323
1324    // An updated version of `other_key_query` featuring an additional signature on
1325    // the master key *Note*: The added signature is actually not valid, but a
1326    // valid signature  is not required for our test.
1327    pub fn other_key_query_cross_signed() -> KeyQueryResponse {
1328        let data = json!({
1329            "device_keys": {
1330                "@example2:localhost": {
1331                    "SKISMLNIMH": {
1332                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
1333                        "device_id": "SKISMLNIMH",
1334                        "keys": {
1335                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
1336                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
1337                        },
1338                        "signatures": {
1339                            "@example2:localhost": {
1340                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
1341                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
1342                            }
1343                        },
1344                        "user_id": "@example2:localhost",
1345                        "unsigned": {
1346                            "device_display_name": "Riot Desktop (Linux)"
1347                        }
1348                    }
1349                }
1350            },
1351            "failures": {},
1352            "master_keys": {
1353                "@example2:localhost": {
1354                    "user_id": "@example2:localhost",
1355                    "usage": ["master"],
1356                    "keys": {
1357                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1358                    },
1359                    "signatures": {
1360                        "@example2:localhost": {
1361                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1362                        },
1363                        // This is the added signature from alice USK compared to `other_key_query`. Note that actual signature is not valid.
1364                        "@alice:localhost": {
1365                            "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "NotAValidSignature+GNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1366                        }
1367                    }
1368                }
1369            },
1370            "self_signing_keys": {
1371                "@example2:localhost": {
1372                    "user_id": "@example2:localhost",
1373                    "usage": ["self_signing"],
1374                    "keys": {
1375                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1376                    },
1377                    "signatures": {
1378                        "@example2:localhost": {
1379                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1380                        }
1381                    }
1382                }
1383            },
1384            "user_signing_keys": {}
1385        });
1386        ruma_response_from_json(&data)
1387    }
1388
1389    /// Mocked response to a /keys/query request.
1390    pub fn own_key_query_with_user_id(user_id: &UserId) -> KeyQueryResponse {
1391        let data = json!({
1392          "device_keys": {
1393            user_id: {
1394              "WSKKLTJZCL": {
1395                "algorithms": [
1396                  "m.olm.v1.curve25519-aes-sha2",
1397                  "m.megolm.v1.aes-sha2"
1398                ],
1399                "device_id": "WSKKLTJZCL",
1400                "keys": {
1401                  "curve25519:WSKKLTJZCL": "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1402                  "ed25519:WSKKLTJZCL": "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1403                },
1404                "signatures": {
1405                  user_id: {
1406                    "ed25519:WSKKLTJZCL": "SKpIUnq7QK0xleav0PrIQyKjVm+TgZr7Yi8cKjLeZDtkgyToE2d4/e3Aj79dqOlLB92jFVE4d1cM/Ry04wFwCA",
1407                    "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "9UGu1iC5YhFCdELGfB29YaV+QE0t/X5UDSsPf4QcdZyXIwyp9zBbHX2lh9vWudNQ+akZpaq7ZRaaM+4TCnw/Ag"
1408                  }
1409                },
1410                "user_id": user_id,
1411                "unsigned": {
1412                  "device_display_name": "Cross signing capable"
1413                }
1414              },
1415              "LVWOVGOXME": {
1416                "algorithms": [
1417                  "m.olm.v1.curve25519-aes-sha2",
1418                  "m.megolm.v1.aes-sha2"
1419                ],
1420                "device_id": "LVWOVGOXME",
1421                "keys": {
1422                  "curve25519:LVWOVGOXME": "KMfWKUhnDW1D11hNzATs/Ax1FQRsJxKCWzq0NyGtIiI",
1423                  "ed25519:LVWOVGOXME": "k+NC3L7CBD6fBClcHBrKLOkqCyGNSKhWXiH5Q2STRnA"
1424                },
1425                "signatures": {
1426                  user_id: {
1427                    "ed25519:LVWOVGOXME": "39Ir5Bttpc5+bQwzLj7rkjm5E5/cp/JTbMJ/t0enj6J5w9MXVBFOUqqM2hpaRaRwILMMpwYbJ8IOGjl0Y/MGAw"
1428                  }
1429                },
1430                "user_id": user_id,
1431                "unsigned": {
1432                  "device_display_name": "Non-cross signing"
1433                }
1434              }
1435            }
1436          },
1437          "failures": {},
1438          "master_keys": {
1439            user_id: {
1440              "user_id": user_id,
1441              "usage": [
1442                "master"
1443              ],
1444              "keys": {
1445                "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0"
1446              },
1447              "signatures": {
1448                user_id: {
1449                  "ed25519:WSKKLTJZCL": "ZzJp1wtmRdykXAUEItEjNiFlBrxx8L6/Vaen9am8AuGwlxxJtOkuY4m+4MPLvDPOgavKHLsrRuNLAfCeakMlCQ"
1450                }
1451              }
1452            }
1453          },
1454          "self_signing_keys": {
1455            user_id: {
1456              "user_id": user_id,
1457              "usage": [
1458                "self_signing"
1459              ],
1460              "keys": {
1461                "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210"
1462              },
1463              "signatures": {
1464                user_id: {
1465                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "AC7oDUW4rUhtInwb4lAoBJ0wAuu4a5k+8e34B5+NKsDB8HXRwgVwUWN/MRWc/sJgtSbVlhzqS9THEmQQ1C51Bw"
1466                }
1467              }
1468            }
1469          },
1470          "user_signing_keys": {
1471            user_id: {
1472              "user_id": user_id,
1473              "usage": [
1474                "user_signing"
1475              ],
1476              "keys": {
1477                "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo"
1478              },
1479              "signatures": {
1480                user_id: {
1481                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "C4L2sx9frGqj8w41KyynHGqwUbbwBYRZpYCB+6QWnvQFA5Oi/1PJj8w5anwzEsoO0TWmLYmf7FXuAGewanOWDg"
1482                }
1483              }
1484            }
1485          }
1486        });
1487        ruma_response_from_json(&data)
1488    }
1489
1490    pub fn own_key_query() -> KeyQueryResponse {
1491        own_key_query_with_user_id(user_id())
1492    }
1493
1494    pub fn key_query(
1495        identity: UploadSigningKeysRequest,
1496        device_keys: DeviceKeys,
1497    ) -> KeyQueryResponse {
1498        let json = json!({
1499            "device_keys": {
1500                "@example:localhost": {
1501                    device_keys.device_id.to_string(): device_keys
1502                }
1503            },
1504            "failures": {},
1505            "master_keys": {
1506                "@example:localhost": identity.master_key
1507            },
1508            "self_signing_keys": {
1509                "@example:localhost": identity.self_signing_key
1510            },
1511            "user_signing_keys": {
1512                "@example:localhost": identity.user_signing_key
1513            },
1514          }
1515        );
1516
1517        ruma_response_from_json(&json)
1518    }
1519}
1520
1521#[cfg(test)]
1522pub(crate) mod tests {
1523    use std::ops::Deref;
1524
1525    use futures_util::pin_mut;
1526    use matrix_sdk_test::{async_test, ruma_response_from_json, test_json};
1527    use ruma::{
1528        api::client::keys::get_keys::v3::Response as KeysQueryResponse, device_id, user_id,
1529        TransactionId,
1530    };
1531    use serde_json::json;
1532    use stream_assert::{assert_closed, assert_pending, assert_ready};
1533
1534    use super::testing::{
1535        device_id, key_query, manager_test_helper, other_key_query, other_user_id, user_id,
1536    };
1537    use crate::{
1538        identities::manager::testing::{other_key_query_cross_signed, own_key_query},
1539        olm::PrivateCrossSigningIdentity,
1540        store::types::Changes,
1541        CrossSigningKeyExport, OlmMachine,
1542    };
1543
1544    fn key_query_with_failures() -> KeysQueryResponse {
1545        let response = json!({
1546            "device_keys": {
1547            },
1548            "failures": {
1549                "example.org": {
1550                    "errcode": "M_RESOURCE_LIMIT_EXCEEDED",
1551                    "error": "Not yet ready to retry",
1552                }
1553            }
1554        });
1555
1556        ruma_response_from_json(&response)
1557    }
1558
1559    #[async_test]
1560    async fn test_tracked_users() {
1561        let manager = manager_test_helper(user_id(), device_id()).await;
1562        let alice = user_id!("@alice:example.org");
1563
1564        let cache = manager.store.cache().await.unwrap();
1565        let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1566
1567        assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1568
1569        manager.receive_device_changes(&cache, [alice].iter().map(Deref::deref)).await.unwrap();
1570
1571        assert!(
1572            !key_query_manager.tracked_users().contains(alice),
1573            "Receiving a device changes update for a user we don't track does nothing"
1574        );
1575
1576        assert!(
1577            !key_query_manager.users_for_key_query().await.0.contains(alice),
1578            "The user we don't track doesn't end up in the `/keys/query` request"
1579        );
1580    }
1581
1582    #[async_test]
1583    async fn test_manager_creation() {
1584        let manager = manager_test_helper(user_id(), device_id()).await;
1585        let cache = manager.store.cache().await.unwrap();
1586        assert!(manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty())
1587    }
1588
1589    #[async_test]
1590    async fn test_manager_key_query_response() {
1591        let manager = manager_test_helper(user_id(), device_id()).await;
1592        let other_user = other_user_id();
1593        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1594        assert_eq!(devices.devices().count(), 0);
1595
1596        manager
1597            .receive_keys_query_response(&TransactionId::new(), &other_key_query())
1598            .await
1599            .unwrap();
1600
1601        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1602        assert_eq!(devices.devices().count(), 1);
1603
1604        let device = manager
1605            .store
1606            .get_device_data(other_user, device_id!("SKISMLNIMH"))
1607            .await
1608            .unwrap()
1609            .unwrap();
1610        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
1611        let identity = identity.other().unwrap();
1612
1613        assert!(identity.is_device_signed(&device));
1614    }
1615
1616    #[async_test]
1617    async fn test_manager_own_key_query_response() {
1618        let manager = manager_test_helper(user_id(), device_id()).await;
1619        let our_user = user_id();
1620        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1621        assert_eq!(devices.devices().count(), 0);
1622
1623        let private_identity = manager.store.private_identity();
1624        let private_identity = private_identity.lock().await;
1625        let identity_request = private_identity.as_upload_request().await;
1626        drop(private_identity);
1627
1628        let device_keys =
1629            manager.store.cache().await.unwrap().account().await.unwrap().device_keys();
1630        manager
1631            .receive_keys_query_response(
1632                &TransactionId::new(),
1633                &key_query(identity_request, device_keys),
1634            )
1635            .await
1636            .unwrap();
1637
1638        let identity = manager
1639            .store
1640            .get_user_identity(our_user)
1641            .await
1642            .unwrap()
1643            .expect("missing user identity");
1644        let identity = identity.own().expect("missing own identity");
1645        assert!(identity.is_verified());
1646
1647        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1648        assert_eq!(devices.devices().count(), 1);
1649
1650        let device =
1651            manager.store.get_device_data(our_user, device_id!(device_id())).await.unwrap();
1652
1653        assert!(device.is_some());
1654    }
1655
1656    #[async_test]
1657    async fn test_private_identity_invalidation_after_public_keys_change() {
1658        let user_id = user_id!("@example1:localhost");
1659        let manager = manager_test_helper(user_id, "DEVICEID".into()).await;
1660
1661        let identity_request = {
1662            let private_identity = manager.store.private_identity();
1663            let private_identity = private_identity.lock().await;
1664            private_identity.as_upload_request().await
1665        };
1666        let device_keys = manager.store.static_account().unsigned_device_keys();
1667
1668        let response = json!({
1669            "device_keys": {
1670                user_id: {
1671                    device_keys.device_id.to_string(): device_keys
1672                }
1673            },
1674            "master_keys": {
1675                user_id: identity_request.master_key,
1676            },
1677            "self_signing_keys": {
1678                user_id: identity_request.self_signing_key,
1679            },
1680            "user_signing_keys": {
1681                user_id: identity_request.user_signing_key,
1682            }
1683        });
1684
1685        let response = ruma_response_from_json(&response);
1686        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
1687
1688        let identity = manager.store.get_user_identity(user_id).await.unwrap().unwrap();
1689        let identity = identity.own().unwrap();
1690        assert!(identity.is_verified());
1691
1692        let identity_request = {
1693            let private_identity = PrivateCrossSigningIdentity::new(user_id.into());
1694            private_identity.as_upload_request().await
1695        };
1696
1697        let response = json!({
1698            "master_keys": {
1699                user_id: identity_request.master_key,
1700                "@example2:localhost": {
1701                    "user_id": "@example2:localhost",
1702                    "usage": ["master"],
1703                    "keys": {
1704                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1705                    },
1706                    "signatures": {
1707                        "@example2:localhost": {
1708                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1709                        }
1710                    }
1711                },
1712            },
1713            "self_signing_keys": {
1714                user_id: identity_request.self_signing_key,
1715                "@example2:localhost": {
1716                    "user_id": "@example2:localhost",
1717                    "usage": ["self_signing"],
1718                    "keys": {
1719                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1720                    },
1721                    "signatures": {
1722                        "@example2:localhost": {
1723                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1724                        }
1725                    }
1726                }
1727            },
1728            "user_signing_keys": {
1729                user_id: identity_request.user_signing_key,
1730            }
1731        });
1732
1733        let response = ruma_response_from_json(&response);
1734        let (_, private_identity) = manager.handle_cross_signing_keys(&response).await.unwrap();
1735
1736        assert!(private_identity.is_some());
1737        let private_identity = manager.store.private_identity();
1738        assert!(private_identity.lock().await.is_empty().await);
1739    }
1740
1741    #[async_test]
1742    async fn test_no_tracked_users_key_query_request() {
1743        let manager = manager_test_helper(user_id(), device_id()).await;
1744
1745        let cache = manager.store.cache().await.unwrap();
1746        assert!(
1747            manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty(),
1748            "No users are initially tracked"
1749        );
1750
1751        let requests = manager.users_for_key_query().await.unwrap();
1752        assert!(!requests.is_empty(), "We query the keys for our own user");
1753
1754        assert!(
1755            manager
1756                .key_query_manager
1757                .synced(&cache)
1758                .await
1759                .unwrap()
1760                .tracked_users()
1761                .contains(manager.user_id()),
1762            "Our own user is now tracked"
1763        );
1764    }
1765
1766    /// If a user is invalidated while a /keys/query request is in flight, that
1767    /// user is not removed from the list of outdated users when the
1768    /// response is received
1769    #[async_test]
1770    async fn test_invalidation_race_handling() {
1771        let manager = manager_test_helper(user_id(), device_id()).await;
1772        let alice = other_user_id();
1773        manager.update_tracked_users([alice]).await.unwrap();
1774
1775        // alice should be in the list of key queries
1776        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1777        assert!(req.device_keys.contains_key(alice));
1778
1779        // another invalidation turns up
1780        {
1781            let cache = manager.store.cache().await.unwrap();
1782            manager.receive_device_changes(&cache, [alice].into_iter()).await.unwrap();
1783        }
1784
1785        // the response from the query arrives
1786        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1787
1788        // alice should *still* be in the list of key queries
1789        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1790        assert!(req.device_keys.contains_key(alice));
1791
1792        // another key query response
1793        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1794
1795        // finally alice should not be in the list
1796        let queries = manager.users_for_key_query().await.unwrap();
1797        assert!(!queries.iter().any(|(_, r)| r.device_keys.contains_key(alice)));
1798    }
1799
1800    #[async_test]
1801    async fn test_failure_handling() {
1802        let manager = manager_test_helper(user_id(), device_id()).await;
1803        let alice = user_id!("@alice:example.org");
1804
1805        {
1806            let cache = manager.store.cache().await.unwrap();
1807            let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1808            assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1809
1810            key_query_manager.mark_user_as_changed(alice).await.unwrap();
1811
1812            assert!(
1813                key_query_manager.tracked_users().contains(alice),
1814                "Alice is tracked after being marked as tracked"
1815            );
1816        }
1817
1818        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1819        assert!(req.device_keys.contains_key(alice));
1820
1821        // a failure should stop us querying for the user's keys.
1822        let response = key_query_with_failures();
1823        manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1824        assert!(manager.failures.contains(alice.server_name()));
1825        assert!(!manager
1826            .users_for_key_query()
1827            .await
1828            .unwrap()
1829            .iter()
1830            .any(|(_, r)| r.device_keys.contains_key(alice)));
1831
1832        // clearing the failure flag should make the user reappear in the query list.
1833        manager.failures.remove([alice.server_name().to_owned()].iter());
1834        assert!(manager
1835            .users_for_key_query()
1836            .await
1837            .unwrap()
1838            .iter()
1839            .any(|(_, r)| r.device_keys.contains_key(alice)));
1840    }
1841
1842    #[async_test]
1843    async fn test_out_of_band_key_query() {
1844        // build the request
1845        let manager = manager_test_helper(user_id(), device_id()).await;
1846        let (reqid, req) = manager.build_key_query_for_users(vec![user_id()]);
1847        assert!(req.device_keys.contains_key(user_id()));
1848
1849        // make up a response and check it is processed
1850        let (device_changes, identity_changes) =
1851            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1852        assert_eq!(device_changes.new.len(), 1);
1853        assert_eq!(device_changes.new[0].device_id(), "LVWOVGOXME");
1854        assert_eq!(identity_changes.new.len(), 1);
1855        assert_eq!(identity_changes.new[0].user_id(), user_id());
1856
1857        let devices = manager.store.get_user_devices(user_id()).await.unwrap();
1858        assert_eq!(devices.devices().count(), 1);
1859        assert_eq!(devices.devices().next().unwrap().device_id(), "LVWOVGOXME");
1860    }
1861
1862    #[async_test]
1863    async fn test_invalid_key_response() {
1864        let my_user_id = user_id();
1865        let my_device_id = device_id();
1866        let manager = manager_test_helper(my_user_id, my_device_id).await;
1867
1868        // First of all, populate the store with good data
1869        let (reqid, _) = manager.build_key_query_for_users(vec![user_id()]);
1870        let (device_changes, identity_changes) =
1871            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1872        assert_eq!(device_changes.new.len(), 1);
1873        let test_device_id = device_changes.new.first().unwrap().device_id().to_owned();
1874        let changes =
1875            Changes { devices: device_changes, identities: identity_changes, ..Changes::default() };
1876        manager.store.save_changes(changes).await.unwrap();
1877
1878        // Now provide an invalid update
1879        let (reqid, _) = manager.build_key_query_for_users(vec![my_user_id]);
1880        let response = ruma_response_from_json(&json!({
1881            "device_keys": {
1882                my_user_id: {
1883                    test_device_id.as_str(): {
1884                        "algorithms": [
1885                            "m.olm.v1.curve25519-aes-sha2",
1886                        ],
1887                        "device_id": test_device_id.as_str(),
1888                        "keys": {
1889                            format!("curve25519:{}", test_device_id): "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1890                            format!("ed25519:{}", test_device_id): "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1891                        },
1892                        "signatures": {
1893                            my_user_id: {
1894                                // Not a valid signature.
1895                                format!("ed25519:{}", test_device_id): "imadethisup",
1896                            }
1897                        },
1898                        "user_id": my_user_id,
1899                    }
1900                }
1901            }
1902        }));
1903
1904        let (device_changes, identity_changes) =
1905            manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1906
1907        // The result should be empty
1908        assert_eq!(device_changes.new.len(), 0);
1909        assert_eq!(device_changes.changed.len(), 0);
1910        assert_eq!(device_changes.deleted.len(), 0);
1911        assert_eq!(identity_changes.new.len(), 0);
1912
1913        // And the device should not have been updated.
1914        let device =
1915            manager.store.get_user_devices(my_user_id).await.unwrap().get(&test_device_id).unwrap();
1916        assert_eq!(device.algorithms().len(), 2);
1917    }
1918
1919    #[async_test]
1920    async fn test_devices_stream() {
1921        let manager = manager_test_helper(user_id(), device_id()).await;
1922        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1923
1924        let stream = manager.store.devices_stream();
1925        pin_mut!(stream);
1926
1927        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1928
1929        let update = assert_ready!(stream);
1930        assert!(!update.new.is_empty(), "The device update should contain some devices");
1931    }
1932
1933    #[async_test]
1934    async fn test_identities_stream() {
1935        let manager = manager_test_helper(user_id(), device_id()).await;
1936        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1937
1938        let stream = manager.store.user_identities_stream();
1939        pin_mut!(stream);
1940
1941        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1942
1943        let update = assert_ready!(stream);
1944        assert!(!update.new.is_empty(), "The identities update should contain some identities");
1945    }
1946
1947    #[async_test]
1948    async fn test_identities_stream_raw() {
1949        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1950        let (request_id, _) = manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1951
1952        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
1953        pin_mut!(stream);
1954
1955        manager
1956            .as_ref()
1957            .unwrap()
1958            .receive_keys_query_response(&request_id, &own_key_query())
1959            .await
1960            .unwrap();
1961
1962        let (identity_update, _) = assert_ready!(stream);
1963        assert_eq!(identity_update.new.len(), 1);
1964        assert_eq!(identity_update.changed.len(), 0);
1965        assert_eq!(identity_update.unchanged.len(), 0);
1966        assert_eq!(identity_update.new[0].user_id(), user_id());
1967
1968        assert_pending!(stream);
1969
1970        let (new_request_id, _) =
1971            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1972
1973        // A second `/keys/query` response with the same result shouldn't fire a change
1974        // notification: the identity and device should be unchanged.
1975        manager
1976            .as_ref()
1977            .unwrap()
1978            .receive_keys_query_response(&new_request_id, &own_key_query())
1979            .await
1980            .unwrap();
1981
1982        assert_pending!(stream);
1983
1984        // dropping the manager (and hence dropping the store) should close the stream
1985        manager.take();
1986        assert_closed!(stream);
1987    }
1988
1989    #[async_test]
1990    async fn test_identities_stream_raw_signature_update() {
1991        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1992        let (request_id, _) =
1993            manager.as_ref().unwrap().build_key_query_for_users(vec![other_user_id()]);
1994
1995        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
1996        pin_mut!(stream);
1997
1998        manager
1999            .as_ref()
2000            .unwrap()
2001            .receive_keys_query_response(&request_id, &other_key_query())
2002            .await
2003            .unwrap();
2004
2005        let (identity_update, _) = assert_ready!(stream);
2006        assert_eq!(identity_update.new.len(), 1);
2007        assert_eq!(identity_update.changed.len(), 0);
2008        assert_eq!(identity_update.unchanged.len(), 0);
2009        assert_eq!(identity_update.new[0].user_id(), other_user_id());
2010
2011        let initial_msk = identity_update.new[0].master_key().clone();
2012
2013        let (new_request_id, _) =
2014            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
2015        // There is a new signature on the msk, should trigger a change
2016        manager
2017            .as_ref()
2018            .unwrap()
2019            .receive_keys_query_response(&new_request_id, &other_key_query_cross_signed())
2020            .await
2021            .unwrap();
2022
2023        let (identity_update_2, _) = assert_ready!(stream);
2024        assert_eq!(identity_update_2.new.len(), 0);
2025        assert_eq!(identity_update_2.changed.len(), 1);
2026        assert_eq!(identity_update_2.unchanged.len(), 0);
2027
2028        let updated_msk = identity_update_2.changed[0].master_key().clone();
2029
2030        // Identity has a change (new signature) but it's the same msk
2031        assert_eq!(initial_msk, updated_msk);
2032
2033        assert_pending!(stream);
2034
2035        manager.take();
2036    }
2037
2038    #[async_test]
2039    async fn test_key_query_with_unknown_properties() {
2040        let manager = manager_test_helper(user_id(), device_id()).await;
2041        let other_user = user_id!("@example:localhost");
2042        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2043        assert_eq!(devices.devices().count(), 0);
2044
2045        let response = json!({
2046            "device_keys": {
2047                "@example:localhost": {
2048                    "OBEBOSKTBE": {
2049                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
2050                        "user_id": "@example:localhost",
2051                        "device_id": "OBEBOSKTBE",
2052                        "extra_property": "somevalue",
2053                        "keys": {
2054                            "curve25519:OBEBOSKTBE": "ECrdZebl0DskwbkxoztsiKPb6ivu7M2qQ70BFWwre3w",
2055                            "ed25519:OBEBOSKTBE": "hFWo+pG6TVWNzq/ZubUQVL5Ardu9rqHxpKkCbf1/KiA"
2056                        },
2057                        "signatures": {
2058                            "@example:localhost": {
2059                                "ed25519:OBEBOSKTBE": "6vyYUgX+IoT1x6Mvf0g/GEPVb2UI3brfL7WZ75WZ81sH4FBFgAzkkuGpw9suGLKXnlEdLH0suBzaT4esVhFDCw",
2060                            },
2061                        },
2062                    },
2063                },
2064            },
2065        });
2066
2067        let response = ruma_response_from_json(&response);
2068        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
2069
2070        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2071        assert_eq!(devices.devices().count(), 1);
2072
2073        manager.store.get_device_data(other_user, device_id!("OBEBOSKTBE")).await.unwrap().unwrap();
2074    }
2075
2076    #[async_test]
2077    async fn test_manager_identity_updates() {
2078        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2079
2080        let manager = manager_test_helper(user_id(), device_id()).await;
2081        let other_user = DataSet::user_id();
2082        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2083        assert_eq!(devices.devices().count(), 0);
2084
2085        let identity = manager.store.get_user_identity(other_user).await.unwrap();
2086        assert!(identity.is_none());
2087
2088        manager
2089            .receive_keys_query_response(
2090                &TransactionId::new(),
2091                &DataSet::key_query_with_identity_a(),
2092            )
2093            .await
2094            .unwrap();
2095
2096        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2097        let other_identity = identity.other().unwrap();
2098
2099        // We should now have an identity for the user but no pin violation
2100        // (pinned master key is the current one)
2101        assert!(!other_identity.has_pin_violation());
2102        let first_device =
2103            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2104        assert!(first_device.is_cross_signed_by_owner(&identity));
2105
2106        // We receive a new keys update for that user, with a new identity
2107        manager
2108            .receive_keys_query_response(
2109                &TransactionId::new(),
2110                &DataSet::key_query_with_identity_b(),
2111            )
2112            .await
2113            .unwrap();
2114
2115        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2116        let other_identity = identity.other().unwrap();
2117
2118        // The previous known identity has been replaced, there should be a pin
2119        // violation
2120        assert!(other_identity.has_pin_violation());
2121
2122        let second_device =
2123            manager.store.get_device_data(other_user, DataSet::device_b()).await.unwrap().unwrap();
2124
2125        // There is a new device signed by the new identity
2126        assert!(second_device.is_cross_signed_by_owner(&identity));
2127
2128        // The first device should not be signed by the new identity
2129        let first_device =
2130            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2131        assert!(!first_device.is_cross_signed_by_owner(&identity));
2132
2133        let remember_previous_identity = other_identity.clone();
2134        // We receive updated keys for that user, with no identity anymore.
2135        // Notice that there is no server API to delete identity, but we want to
2136        // test here that a home server cannot clear the identity and
2137        // subsequently serve a new one which would get automatically approved.
2138        manager
2139            .receive_keys_query_response(
2140                &TransactionId::new(),
2141                &DataSet::key_query_with_identity_no_identity(),
2142            )
2143            .await
2144            .unwrap();
2145
2146        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2147        let other_identity = identity.other().unwrap();
2148
2149        assert_eq!(other_identity, &remember_previous_identity);
2150        assert!(other_identity.has_pin_violation());
2151    }
2152
2153    #[async_test]
2154    async fn test_manager_resolve_identity_pin_violation() {
2155        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2156
2157        let manager = manager_test_helper(user_id(), device_id()).await;
2158        let other_user = DataSet::user_id();
2159
2160        manager
2161            .receive_keys_query_response(
2162                &TransactionId::new(),
2163                &DataSet::key_query_with_identity_a(),
2164            )
2165            .await
2166            .unwrap();
2167
2168        // We receive a new keys update for that user, with a new identity
2169        manager
2170            .receive_keys_query_response(
2171                &TransactionId::new(),
2172                &DataSet::key_query_with_identity_b(),
2173            )
2174            .await
2175            .unwrap();
2176
2177        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2178        let other_identity = identity.other().unwrap();
2179
2180        // We have a new identity now, so there should be a pin violation
2181        assert!(other_identity.has_pin_violation());
2182
2183        // Resolve the violation by pinning the new identity
2184        other_identity.pin();
2185
2186        assert!(!other_identity.has_pin_violation());
2187    }
2188
2189    // Set up a machine do initial own key query and import cross-signing secret to
2190    // make the current session verified.
2191    async fn common_verified_identity_changes_machine_setup() -> OlmMachine {
2192        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2193
2194        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2195
2196        let keys_query = DataSet::own_keys_query_response_1();
2197        let txn_id = TransactionId::new();
2198        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2199
2200        machine
2201            .import_cross_signing_keys(CrossSigningKeyExport {
2202                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2203                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2204                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2205            })
2206            .await
2207            .unwrap();
2208        machine
2209    }
2210    #[async_test]
2211    async fn test_manager_verified_latch_setup_on_new_identities() {
2212        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2213
2214        let machine = common_verified_identity_changes_machine_setup().await;
2215
2216        // ######
2217        // First test: Assert that the latch is properly set on new identities
2218        // ######
2219        let keys_query = DataSet::bob_keys_query_response_signed();
2220        let txn_id = TransactionId::new();
2221        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2222
2223        let own_identity =
2224            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2225        // For sanity check that own identity is trusted
2226        assert!(own_identity.is_verified());
2227
2228        let bob_identity =
2229            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2230        // The verified latch should be true
2231        assert!(bob_identity.was_previously_verified());
2232        // And bob is verified
2233        assert!(bob_identity.is_verified());
2234
2235        // ######
2236        // Second test: Assert that the local latch stays on if the identity is rotated
2237        // ######
2238        let keys_query = DataSet::bob_keys_query_response_rotated();
2239        let txn_id = TransactionId::new();
2240        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2241
2242        let bob_identity =
2243            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2244        // Bob is not verified anymore
2245        assert!(!bob_identity.is_verified());
2246        // The verified latch should still be true
2247        assert!(bob_identity.was_previously_verified());
2248        // Bob device_2 is self-signed even if there is this verification latch
2249        // violation
2250        let bob_device = machine
2251            .get_device(DataSet::bob_id(), DataSet::bob_device_2_id(), None)
2252            .await
2253            .unwrap()
2254            .unwrap();
2255        assert!(bob_identity.is_device_signed(&bob_device));
2256        // there is also a pin violation
2257        assert!(bob_identity.has_pin_violation());
2258        // Fixing the pin violation won't fix the verification latch violation
2259        bob_identity.pin_current_master_key().await.unwrap();
2260        assert!(!bob_identity.has_pin_violation());
2261        let has_latch_violation =
2262            bob_identity.was_previously_verified() && !bob_identity.is_verified();
2263        assert!(has_latch_violation);
2264    }
2265
2266    #[async_test]
2267    async fn test_manager_verified_identity_changes_setup_on_updated_identities() {
2268        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2269
2270        let machine = common_verified_identity_changes_machine_setup().await;
2271
2272        // ######
2273        // Get the Carol identity for the first time
2274        // ######
2275        let keys_query = DataSet::carol_keys_query_response_unsigned();
2276        let txn_id = TransactionId::new();
2277        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2278
2279        let carol_identity =
2280            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2281        // The identity is not verified
2282        assert!(!carol_identity.is_verified());
2283        // The verified latch is off
2284        assert!(!carol_identity.was_previously_verified());
2285
2286        // Carol is verified, likely from another session. Ensure the latch is updated
2287        // when the key query response is processed
2288        let keys_query = DataSet::carol_keys_query_response_signed();
2289        let txn_id = TransactionId::new();
2290        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2291
2292        let carol_identity = machine
2293            .get_identity(DataSet::carol_id(), None)
2294            .await
2295            .unwrap()
2296            .unwrap()
2297            .other()
2298            .unwrap();
2299        assert!(carol_identity.is_verified());
2300        // This should have updated the latch
2301        assert!(carol_identity.was_previously_verified());
2302        // It is the same identity, it's just signed now so no pin violation
2303        assert!(!carol_identity.has_pin_violation());
2304    }
2305
2306    // Set up a machine do initial own key query.
2307    // The cross signing secrets are not yet uploaded.
2308    // Then query keys for carol and bob (both signed by own identity)
2309    async fn common_verified_identity_changes_own_trust_change_machine_setup() -> OlmMachine {
2310        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2311
2312        // Start on a non-verified session
2313        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2314
2315        let keys_query = DataSet::own_keys_query_response_1();
2316        let txn_id = TransactionId::new();
2317        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2318
2319        // For sanity check that own identity is not trusted
2320        let own_identity =
2321            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2322        assert!(!own_identity.is_verified());
2323
2324        let keys_query = DataSet::own_keys_query_response_1();
2325        let txn_id = TransactionId::new();
2326        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2327
2328        // Get Bob and Carol already signed
2329        let keys_query = DataSet::bob_keys_query_response_signed();
2330        let txn_id = TransactionId::new();
2331        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2332
2333        let keys_query = DataSet::carol_keys_query_response_signed();
2334        let txn_id = TransactionId::new();
2335        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2336
2337        machine.update_tracked_users(vec![DataSet::bob_id(), DataSet::carol_id()]).await.unwrap();
2338
2339        machine
2340    }
2341
2342    #[async_test]
2343    async fn test_manager_verified_identity_changes_setup_on_own_identity_trust_change() {
2344        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2345        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2346
2347        let own_identity =
2348            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2349
2350        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2351        // Bob is verified by our identity but our own identity is not yet trusted
2352        assert!(!bob_identity.was_previously_verified());
2353        assert!(own_identity.is_identity_signed(&bob_identity.other().unwrap()));
2354
2355        let carol_identity =
2356            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2357        // Carol is verified by our identity but our own identity is not yet trusted
2358        assert!(!carol_identity.was_previously_verified());
2359        assert!(own_identity.is_identity_signed(&carol_identity.other().unwrap()));
2360
2361        // Marking our own identity as trusted should update the existing identities
2362        let _ = own_identity.verify().await;
2363
2364        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2365        assert!(own_identity.is_verified());
2366
2367        let carol_identity =
2368            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2369        assert!(carol_identity.is_verified());
2370        // The latch should be set now
2371        assert!(carol_identity.was_previously_verified());
2372
2373        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2374        assert!(bob_identity.is_verified());
2375        // The latch should be set now
2376        assert!(bob_identity.was_previously_verified());
2377    }
2378
2379    #[async_test]
2380    async fn test_manager_verified_identity_change_setup_on_import_secrets() {
2381        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2382        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2383
2384        let own_identity =
2385            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2386
2387        let bob_identity =
2388            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2389        // Carol is verified by our identity but our own identity is not yet trusted
2390        assert!(own_identity.is_identity_signed(&bob_identity));
2391        assert!(!bob_identity.was_previously_verified());
2392
2393        let carol_identity = machine
2394            .get_identity(DataSet::carol_id(), None)
2395            .await
2396            .unwrap()
2397            .unwrap()
2398            .other()
2399            .unwrap();
2400        // Carol is verified by our identity but our own identity is not yet trusted
2401        assert!(own_identity.is_identity_signed(&carol_identity));
2402        assert!(!carol_identity.was_previously_verified());
2403
2404        // Marking our own identity as trusted should update the existing identities
2405        machine
2406            .import_cross_signing_keys(CrossSigningKeyExport {
2407                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2408                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2409                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2410            })
2411            .await
2412            .unwrap();
2413
2414        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2415        assert!(own_identity.is_verified());
2416
2417        let carol_identity =
2418            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2419        assert!(carol_identity.is_verified());
2420        // The latch should be set now
2421        assert!(carol_identity.was_previously_verified());
2422
2423        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2424        assert!(bob_identity.is_verified());
2425        // The latch should be set now
2426        assert!(bob_identity.was_previously_verified());
2427    }
2428
2429    mod update_sender_data {
2430        use assert_matches::assert_matches;
2431        use matrix_sdk_test::async_test;
2432        use ruma::room_id;
2433
2434        use super::{device_id, manager_test_helper};
2435        use crate::{
2436            identities::manager::testing::{other_user_id, user_id},
2437            olm::{InboundGroupSession, SenderData},
2438            store::types::{Changes, DeviceChanges},
2439            Account, DeviceData, EncryptionSettings,
2440        };
2441
2442        #[async_test]
2443        async fn test_adds_device_info_to_existing_sessions() {
2444            let manager = manager_test_helper(user_id(), device_id()).await;
2445
2446            // Given that we have lots of sessions in the store, from each of two devices
2447            let account1 = Account::new(user_id());
2448            let account2 = Account::new(other_user_id());
2449
2450            let mut account1_sessions = Vec::new();
2451            for _ in 0..60 {
2452                account1_sessions.push(create_inbound_group_session(&account1).await);
2453            }
2454            let mut account2_sessions = Vec::new();
2455            for _ in 0..60 {
2456                account2_sessions.push(create_inbound_group_session(&account2).await);
2457            }
2458            manager
2459                .store
2460                .save_changes(Changes {
2461                    inbound_group_sessions: [account1_sessions.clone(), account2_sessions.clone()]
2462                        .concat(),
2463                    ..Default::default()
2464                })
2465                .await
2466                .unwrap();
2467
2468            // When we get an update for one device
2469            let device_data = DeviceData::from_account(&account1);
2470            manager
2471                .update_sender_data_from_device_changes(&DeviceChanges {
2472                    changed: vec![device_data],
2473                    ..Default::default()
2474                })
2475                .await
2476                .unwrap();
2477
2478            // Then those sessions should be updated
2479            for session in account1_sessions {
2480                let updated = manager
2481                    .store
2482                    .get_inbound_group_session(session.room_id(), session.session_id())
2483                    .await
2484                    .unwrap()
2485                    .expect("Could not find session after update");
2486                assert_matches!(
2487                    updated.sender_data,
2488                    SenderData::DeviceInfo { .. },
2489                    "incorrect sender data for session {}",
2490                    session.session_id()
2491                );
2492            }
2493
2494            // ... and those from the other account should not
2495            for session in account2_sessions {
2496                let updated = manager
2497                    .store
2498                    .get_inbound_group_session(session.room_id(), session.session_id())
2499                    .await
2500                    .unwrap()
2501                    .expect("Could not find session after update");
2502                assert_matches!(updated.sender_data, SenderData::UnknownDevice { .. });
2503            }
2504        }
2505
2506        /// Create an InboundGroupSession sent from the given account
2507        async fn create_inbound_group_session(account: &Account) -> InboundGroupSession {
2508            let (_, igs) = account
2509                .create_group_session_pair(
2510                    room_id!("!test:room"),
2511                    EncryptionSettings::default(),
2512                    SenderData::unknown(),
2513                )
2514                .await
2515                .unwrap();
2516            igs
2517        }
2518    }
2519}