matrix_sdk/room/
identity_status_changes.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Facility to track changes to the identity of members of rooms.
16#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{stream_select, StreamExt};
23use matrix_sdk_base::crypto::{
24    IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{events::room::member::SyncRoomMemberEvent, OwnedUserId, UserId};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32    encryption::identities::{IdentityUpdates, UserIdentity},
33    event_handler::EventHandlerDropGuard,
34    Client, Error, Result,
35};
36
/// Support for creating a stream of batches of [`IdentityStatusChange`].
///
/// Internally, this subscribes to all identity changes, and to the room
/// events that change the membership, and provides a stream of all changes to
/// the identity status of all room members.
///
/// This struct is not the stream itself; it is the state used to produce the
/// values of the stream.
///
/// The stream is created with [`IdentityStatusChanges::create_stream`].
#[derive(Debug)]
pub struct IdentityStatusChanges {
    /// Who is in the room, and who is in identity violation, at this moment.
    room_identity_state: RoomIdentityState<Room>,

    /// Dropped when we are dropped, which unregisters the event handler we
    /// registered to listen for room events.
    _drop_guard: EventHandlerDropGuard,
}
57
58impl IdentityStatusChanges {
59    /// Create a new stream of significant changes to the identity status of
60    /// members of a room.
61    ///
62    /// The "status" of an identity changes when our level of trust in it
63    /// changes.
64    ///
65    /// A "significant" change means a warning should either be added or removed
66    /// (e.g. the user changed from pinned to unpinned (show a warning) or
67    /// from verification violation to pinned (remove a warning). An
68    /// insignificant change would be from pinned to verified - no warning
69    /// is needed in this case.
70    ///
71    /// For example, if an identity is "pinned" i.e. not manually verified, but
72    /// known, and it becomes a "unpinned" i.e. unknown, because the
73    /// encryption keys are different and the user has not acknowledged
74    /// this, then this constitutes a status change. Also, if an identity is
75    /// "unpinned" and becomes "pinned", this is also a status change.
76    ///
77    /// The supplied stream is intended to provide enough information for a
78    /// client to display a list of room members whose identities have
79    /// changed, and allow the user to acknowledge this or act upon it.
80    ///
81    /// The first item in the stream provides the current state of the room:
82    /// each member of the room who is not in "pinned" or "verified" state will
83    /// be included (except the current user).
84    ///
85    /// Note: when an unpinned user leaves a room, an update is generated
86    /// stating that they have become pinned, even though they may not
87    /// necessarily have become pinned, but we don't care any more because they
88    /// left the room.
89    pub async fn create_stream(
90        room: Room,
91    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92        let identity_updates = wrap_identity_updates(&room.client).await?;
93        let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94        let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95        let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97        let mut state = IdentityStatusChanges {
98            room_identity_state: RoomIdentityState::new(room).await,
99            _drop_guard: drop_guard,
100        };
101
102        Ok(stream!({
103            let mut current_state =
104                filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106            if !current_state.is_empty() {
107                current_state.sort();
108                yield current_state;
109            }
110
111            while let Some(item) = unprocessed_stream.next().await {
112                let mut update = filter_non_self(
113                    state.room_identity_state.process_change(item).await,
114                    &own_user_id,
115                );
116                if !update.is_empty() {
117                    update.sort();
118                    yield update;
119                }
120            }
121        }))
122    }
123}
124
125fn filter_for_initial_update(
126    mut input: Vec<IdentityStatusChange>,
127    own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129    // We are never interested in changes to our own identity, and also for initial
130    // updates, we are only interested in "bad" states where we need to
131    // notify the user, so we can remove Verified states (Pinned states are
132    // already missing, because Pinned is considered the default).
133    input.retain(|change| {
134        change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135    });
136
137    input
138}
139
140fn filter_non_self(
141    mut input: Vec<IdentityStatusChange>,
142    own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144    // We are never interested in changes to our own identity
145    input.retain(|change| change.user_id != own_user_id);
146    input
147}
148
/// Merge the stream of identity updates and the stream of room member events
/// into a single stream of [`RoomIdentityChange`]s, using `stream_select!`.
fn combine_streams(
    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
) -> impl Stream<Item = RoomIdentityChange> {
    stream_select!(identity_updates, room_member_events)
}
155
156async fn wrap_identity_updates(client: &Client) -> Result<impl Stream<Item = RoomIdentityChange>> {
157    Ok(client
158        .encryption()
159        .user_identities_stream()
160        .await?
161        .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
162}
163
164fn to_base_updates(input: IdentityUpdates) -> matrix_sdk_base::crypto::store::IdentityUpdates {
165    matrix_sdk_base::crypto::store::IdentityUpdates {
166        new: to_base_identities(input.new),
167        changed: to_base_identities(input.changed),
168        unchanged: Default::default(),
169    }
170}
171
172fn to_base_identities(
173    input: BTreeMap<OwnedUserId, UserIdentity>,
174) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
175    input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
176}
177
178fn wrap_room_member_events(
179    room: &Room,
180) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
181    let own_user_id = room.own_user_id().to_owned();
182    let room_id = room.room_id();
183    let (sender, receiver) = mpsc::channel(16);
184    let handle =
185        room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
186            if *event.state_key() == own_user_id {
187                return;
188            }
189            let _: Result<_, _> =
190                sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
191        });
192    let drop_guard = room.client.event_handler_drop_guard(handle);
193    (drop_guard, ReceiverStream::new(receiver))
194}
195
196#[cfg(all(test, not(target_family = "wasm")))]
197mod tests {
198    use std::time::Duration;
199
200    use futures_util::{pin_mut, FutureExt as _, StreamExt as _};
201    use matrix_sdk_base::crypto::IdentityState;
202    use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
203    use test_setup::TestSetup;
204
205    use crate::assert_next_with_timeout;
206
207    #[async_test]
208    async fn test_when_user_becomes_unpinned_we_report_it() {
209        // Given a room containing us and Bob
210        let t = TestSetup::new_room_with_other_bob().await;
211
212        // And Bob's identity is pinned
213        t.pin_bob().await;
214
215        // And we are listening for identity changes
216        let stream = t.subscribe_to_identity_status_changes().await;
217        pin_mut!(stream);
218
219        // When Bob becomes unpinned
220        t.unpin_bob().await;
221
222        // Then we were notified about it
223        let change = assert_next_with_timeout!(stream);
224        assert_eq!(change[0].user_id, t.bob_user_id());
225        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
226        assert_eq!(change.len(), 1);
227    }
228
229    #[async_test]
230    async fn test_when_user_becomes_verification_violation_we_report_it() {
231        // Given a room containing us and Bob
232        let t = TestSetup::new_room_with_other_bob().await;
233
234        // And Bob's identity is verified
235        t.verify_bob().await;
236
237        // And we are listening for identity changes
238        let stream = t.subscribe_to_identity_status_changes().await;
239        pin_mut!(stream);
240
241        // When Bob's identity changes
242        t.unpin_bob().await;
243
244        // Then we were notified about a verification violation
245        let change = assert_next_with_timeout!(stream);
246        assert_eq!(change[0].user_id, t.bob_user_id());
247        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
248        assert_eq!(change.len(), 1);
249    }
250
251    #[async_test]
252    async fn test_when_user_becomes_pinned_we_report_it() {
253        // Given a room containing us and Bob
254        let t = TestSetup::new_room_with_other_bob().await;
255
256        // And Bob's identity is unpinned
257        t.unpin_bob().await;
258
259        // And we are listening for identity changes
260        let stream = t.subscribe_to_identity_status_changes().await;
261        pin_mut!(stream);
262
263        // When Bob becomes pinned
264        t.pin_bob().await;
265
266        // Then we were notified about the initial state of the room
267        let change1 = assert_next_with_timeout!(stream);
268        assert_eq!(change1[0].user_id, t.bob_user_id());
269        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
270        assert_eq!(change1.len(), 1);
271
272        // And the change when Bob became pinned
273        let change2 = assert_next_with_timeout!(stream);
274        assert_eq!(change2[0].user_id, t.bob_user_id());
275        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
276        assert_eq!(change2.len(), 1);
277    }
278
279    #[async_test]
280    async fn test_when_user_becomes_verified_we_report_it() {
281        // Given a room containing us and Bob
282        let t = TestSetup::new_room_with_other_bob().await;
283
284        // And we are listening for identity changes
285        let stream = t.subscribe_to_identity_status_changes().await;
286        pin_mut!(stream);
287
288        // When Bob becomes verified
289        t.verify_bob().await;
290
291        // Then we are notified about Bob being verified
292        let change = assert_next_with_timeout!(stream);
293        assert_eq!(change[0].user_id, t.bob_user_id());
294        assert_eq!(change[0].changed_to, IdentityState::Verified);
295        assert_eq!(change.len(), 1);
296
297        // (And then unpinned, so we have something to come through the stream)
298        t.unpin_bob().await;
299
300        // Then we are notified about the unpinning part
301        let change = assert_next_with_timeout!(stream);
302        assert_eq!(change[0].user_id, t.bob_user_id());
303        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
304        assert_eq!(change.len(), 1);
305    }
306
307    #[async_test]
308    async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
309        // Given a room containing us and Bob
310        let t = TestSetup::new_room_with_other_bob().await;
311
312        // And Bob's identity is unpinned
313        t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
314
315        // And we are listening for identity changes
316        let stream = t.subscribe_to_identity_status_changes().await;
317        pin_mut!(stream);
318
319        // When Bob becomes verified
320        t.verify_bob().await;
321
322        // Then we were notified about the initial state of the room
323        let change1 = assert_next_with_timeout!(stream);
324        assert_eq!(change1[0].user_id, t.bob_user_id());
325        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
326        assert_eq!(change1.len(), 1);
327
328        // And the change when Bob became verified
329        let change2 = assert_next_with_timeout!(stream);
330        assert_eq!(change2[0].user_id, t.bob_user_id());
331        assert_eq!(change2[0].changed_to, IdentityState::Verified);
332        assert_eq!(change2.len(), 1);
333    }
334
335    #[async_test]
336    async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
337        // Given a room containing us and Bob
338        let t = TestSetup::new_room_with_other_bob().await;
339
340        // And Bob is in verification violation
341        t.verify_bob_with(
342            IdentityChangeDataSet::key_query_with_identity_b(),
343            IdentityChangeDataSet::master_signing_keys_b(),
344            IdentityChangeDataSet::self_signing_keys_b(),
345        )
346        .await;
347        t.unpin_bob().await;
348
349        // And we are listening for identity changes
350        let stream = t.subscribe_to_identity_status_changes().await;
351        pin_mut!(stream);
352
353        // When Bob becomes verified
354        t.verify_bob().await;
355
356        // Then we were notified about the initial state of the room
357        let change1 = assert_next_with_timeout!(stream);
358        assert_eq!(change1[0].user_id, t.bob_user_id());
359        assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
360        assert_eq!(change1.len(), 1);
361
362        // And the change when Bob became verified
363        let change2 = assert_next_with_timeout!(stream);
364        assert_eq!(change2[0].user_id, t.bob_user_id());
365        assert_eq!(change2[0].changed_to, IdentityState::Verified);
366        assert_eq!(change2.len(), 1);
367    }
368
369    #[async_test]
370    async fn test_when_an_unpinned_user_joins_we_report_it() {
371        // Given a room containing just us
372        let mut t = TestSetup::new_just_me_room().await;
373
374        // And Bob's identity is unpinned
375        t.unpin_bob().await;
376
377        // And we are listening for identity changes
378        let stream = t.subscribe_to_identity_status_changes().await;
379        pin_mut!(stream);
380
381        // When Bob joins the room
382        t.bob_joins().await;
383
384        // Then we were notified about it
385        let change = assert_next_with_timeout!(stream);
386        assert_eq!(change[0].user_id, t.bob_user_id());
387        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
388        assert_eq!(change.len(), 1);
389    }
390
391    #[async_test]
392    async fn test_when_an_verification_violating_user_joins_we_report_it() {
393        // Given a room containing just us
394        let mut t = TestSetup::new_just_me_room().await;
395
396        // And Bob's identity is in verification violation
397        t.verify_bob().await;
398        t.unpin_bob().await;
399
400        // And we are listening for identity changes
401        let stream = t.subscribe_to_identity_status_changes().await;
402        pin_mut!(stream);
403
404        // When Bob joins the room
405        t.bob_joins().await;
406
407        // Then we were notified about it
408        let change = assert_next_with_timeout!(stream);
409        assert_eq!(change[0].user_id, t.bob_user_id());
410        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
411        assert_eq!(change.len(), 1);
412    }
413
414    #[async_test]
415    async fn test_when_a_verified_user_joins_we_dont_report_it() {
416        // Given a room containing just us
417        let mut t = TestSetup::new_just_me_room().await;
418
419        // And Bob's identity is verified
420        t.verify_bob().await;
421
422        // And we are listening for identity changes
423        let stream = t.subscribe_to_identity_status_changes().await;
424        pin_mut!(stream);
425
426        // When Bob joins the room
427        t.bob_joins().await;
428
429        // (Then becomes unpinned so we have something to report)
430        t.unpin_bob().await;
431
432        //// Then we were only notified about the unpin
433        let change = assert_next_with_timeout!(stream);
434        assert_eq!(change[0].user_id, t.bob_user_id());
435        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
436        assert_eq!(change.len(), 1);
437    }
438
439    #[async_test]
440    async fn test_when_a_pinned_user_joins_we_do_not_report() {
441        // Given a room containing just us
442        let mut t = TestSetup::new_just_me_room().await;
443
444        // And Bob's identity is unpinned
445        t.pin_bob().await;
446
447        // And we are listening for identity changes
448        let stream = t.subscribe_to_identity_status_changes().await;
449        pin_mut!(stream);
450
451        // When Bob joins the room
452        t.bob_joins().await;
453
454        // Then there is no notification
455        tokio::time::sleep(Duration::from_millis(200)).await;
456        let change = stream.next().now_or_never();
457        assert!(change.is_none());
458    }
459
460    #[async_test]
461    async fn test_when_an_unpinned_user_leaves_we_report_it() {
462        // Given a room containing us and Bob
463        let mut t = TestSetup::new_room_with_other_bob().await;
464
465        // And Bob's identity is unpinned
466        t.unpin_bob().await;
467
468        // And we are listening for identity changes
469        let stream = t.subscribe_to_identity_status_changes().await;
470        pin_mut!(stream);
471
472        // When Bob leaves the room
473        t.bob_leaves().await;
474
475        // Then we were notified about the initial state of the room
476        let change1 = assert_next_with_timeout!(stream);
477        assert_eq!(change1[0].user_id, t.bob_user_id());
478        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
479        assert_eq!(change1.len(), 1);
480
481        // And we were notified about the change when the user left
482        let change2 = assert_next_with_timeout!(stream);
483        // Note: the user left the room, but we see that as them "becoming pinned" i.e.
484        // "you no longer need to notify about this user".
485        assert_eq!(change2[0].user_id, t.bob_user_id());
486        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
487        assert_eq!(change2.len(), 1);
488    }
489
490    #[async_test]
491    async fn test_multiple_identity_changes_are_reported() {
492        // Given a room containing just us
493        let mut t = TestSetup::new_just_me_room().await;
494
495        // And Bob's identity is unpinned
496        t.unpin_bob().await;
497
498        // And we are listening for identity changes
499        let stream = t.subscribe_to_identity_status_changes().await;
500        pin_mut!(stream);
501
502        // NOTE: below we pull the changes out of the subscription after each action.
503        // This makes sure that the identity changes and membership changes are properly
504        // ordered. If we pull them out later, the identity changes get shifted forward
505        // because they rely on less-complex async stuff under the hood. Calling
506        // next_change ends up winding the async machinery sufficiently that the
507        // membership change and any subsequent events have fully completed.
508
509        // When Bob joins the room ...
510        t.bob_joins().await;
511        let change1 = assert_next_with_timeout!(stream);
512
513        // ... becomes pinned ...
514        t.pin_bob().await;
515        let change2 = assert_next_with_timeout!(stream);
516
517        // ... leaves and joins again (ignored since they stay pinned) ...
518        t.bob_leaves().await;
519        t.bob_joins().await;
520
521        // ... becomes unpinned ...
522        t.unpin_bob().await;
523        let change3 = assert_next_with_timeout!(stream);
524
525        // ... and leaves.
526        t.bob_leaves().await;
527        let change4 = assert_next_with_timeout!(stream);
528
529        assert_eq!(change1[0].user_id, t.bob_user_id());
530        assert_eq!(change2[0].user_id, t.bob_user_id());
531        assert_eq!(change3[0].user_id, t.bob_user_id());
532        assert_eq!(change4[0].user_id, t.bob_user_id());
533
534        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
535        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
536        assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
537        assert_eq!(change4[0].changed_to, IdentityState::Pinned);
538
539        assert_eq!(change1.len(), 1);
540        assert_eq!(change2.len(), 1);
541        assert_eq!(change3.len(), 1);
542        assert_eq!(change4.len(), 1);
543    }
544
545    #[async_test]
546    async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
547        // Given a room containing Bob, who is unpinned
548        let t = TestSetup::new_room_with_other_bob().await;
549        t.unpin_bob().await;
550
551        // When we start listening for identity changes
552        let stream = t.subscribe_to_identity_status_changes().await;
553        pin_mut!(stream);
554
555        // Then we were immediately notified about Bob being unpinned
556        let change = assert_next_with_timeout!(stream);
557        assert_eq!(change[0].user_id, t.bob_user_id());
558        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
559        assert_eq!(change.len(), 1);
560    }
561
562    #[async_test]
563    async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
564        // Given a room containing Bob, who is unpinned
565        let t = TestSetup::new_room_with_other_bob().await;
566        t.verify_bob().await;
567
568        // When we start listening for identity changes
569        let stream = t.subscribe_to_identity_status_changes().await;
570        pin_mut!(stream);
571
572        // (And we unpin so that something is available in the changes stream)
573        t.unpin_bob().await;
574
575        // Then we were only notified about the unpin, not being verified
576        let next_change = assert_next_with_timeout!(stream);
577
578        assert_eq!(next_change[0].user_id, t.bob_user_id());
579        assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
580        assert_eq!(next_change.len(), 1);
581    }
582
    // TODO: I (andyb) haven't figured out how to test room membership changes that
    // affect our own user (they should not be shown). Specifically, I haven't
    // figured out how to get our own user into a non-pinned state.
586
587    mod test_setup {
588        use std::time::{SystemTime, UNIX_EPOCH};
589
590        use futures_core::Stream;
591        use matrix_sdk_base::{
592            crypto::{
593                testing::simulate_key_query_response_for_verification, IdentityStatusChange,
594                OtherUserIdentity,
595            },
596            RoomState,
597        };
598        use matrix_sdk_test::{
599            test_json, test_json::keys_query_sets::IdentityChangeDataSet, JoinedRoomBuilder,
600            StateTestEvent, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
601        };
602        use ruma::{
603            api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
604            events::room::member::MembershipState,
605            owned_user_id, OwnedUserId, TransactionId, UserId,
606        };
607        use serde_json::json;
608        use wiremock::{
609            matchers::{header, method, path_regex},
610            Mock, MockServer, ResponseTemplate,
611        };
612
613        use crate::{
614            encryption::identities::UserIdentity, test_utils::logged_in_client, Client, Room,
615        };
616
        /// Sets up a client and a room and allows changing user identities and
        /// room memberships. Note: most methods e.g. [`TestSetup::bob_user_id`]
        /// are talking about the OTHER user, not our own user. Only
        /// methods starting with `self_` are talking about this user.
        ///
        /// This user is called `@example:localhost` but is rarely mentioned.
        ///
        /// The other user is called `@bob:localhost`.
        pub(super) struct TestSetup {
            /// The client used by the tests.
            client: Client,
            /// The user ID of the other user, Bob.
            bob_user_id: OwnedUserId,
            /// Builder for the sync responses used to set up and change the room.
            sync_response_builder: SyncResponseBuilder,
            /// The room whose identity status changes are under test.
            room: Room,
        }
632
633        impl TestSetup {
634            pub(super) async fn new_just_me_room() -> Self {
635                let (client, user_id, mut sync_response_builder) = Self::init().await;
636                let room = create_just_me_room(&client, &mut sync_response_builder).await;
637                Self { client, bob_user_id: user_id, sync_response_builder, room }
638            }
639
640            pub(super) async fn new_room_with_other_bob() -> Self {
641                let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
642                let room = create_room_with_other_member(
643                    &mut sync_response_builder,
644                    &client,
645                    &bob_user_id,
646                )
647                .await;
648                Self { client, bob_user_id, sync_response_builder, room }
649            }
650
            /// The user ID of the other user, Bob.
            pub(super) fn bob_user_id(&self) -> &UserId {
                &self.bob_user_id
            }
654
655            pub(super) async fn pin_bob(&self) {
656                if self.bob_user_identity().await.is_some() {
657                    assert!(
658                        !self.bob_is_pinned().await,
659                        "pin_bob() called when the identity is already pinned!"
660                    );
661
662                    // Pin it
663                    self.bob_user_identity()
664                        .await
665                        .expect("User should exist")
666                        .pin()
667                        .await
668                        .expect("Should not fail to pin");
669                } else {
670                    // There was no existing identity. Set one. It will be pinned by default.
671                    self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
672                        .await;
673                }
674
675                // Sanity check: they are pinned
676                assert!(self.bob_is_pinned().await);
677            }
678
            /// Unpin Bob by calling [`TestSetup::unpin_bob_with`] with the key query
            /// response for identity B.
            pub(super) async fn unpin_bob(&self) {
                self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
            }
682
683            pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
684                fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
685                    serde_json::to_string(
686                        key_query_response
687                            .master_keys
688                            .first_key_value()
689                            .expect("Master key should have a value")
690                            .1,
691                    )
692                    .expect("Should be able to serialise master key")
693                }
694
695                let a = IdentityChangeDataSet::key_query_with_identity_a();
696                let b = IdentityChangeDataSet::key_query_with_identity_b();
697                let requested_master_key = master_key_json(&requested);
698                let a_master_key = master_key_json(&a);
699
700                // Change/set their identity pin it, then change it again - this will definitely
701                // unpin, even if the first identity we supply is their very first, making them
702                // initially pinned.
703                if requested_master_key == a_master_key {
704                    self.change_bob_identity(b).await;
705                    if !self.bob_is_pinned().await {
706                        self.pin_bob().await;
707                    }
708                    self.change_bob_identity(a).await;
709                } else {
710                    self.change_bob_identity(a).await;
711                    if !self.bob_is_pinned().await {
712                        self.pin_bob().await;
713                    }
714                    self.change_bob_identity(b).await;
715                }
716
717                // Sanity: they are unpinned
718                assert!(!self.bob_is_pinned().await);
719            }
720
721            pub(super) async fn verify_bob(&self) {
722                self.verify_bob_with(
723                    IdentityChangeDataSet::key_query_with_identity_a(),
724                    IdentityChangeDataSet::master_signing_keys_a(),
725                    IdentityChangeDataSet::self_signing_keys_a(),
726                )
727                .await;
728            }
729
730            pub(super) async fn verify_bob_with(
731                &self,
732                key_query: KeyQueryResponse,
733                master_signing_key: serde_json::Value,
734                self_signing_key: serde_json::Value,
735            ) {
736                // Make sure the requested identity is set
737                self.change_bob_identity(key_query).await;
738
739                let my_user_id = self.client.user_id().expect("I should have a user id");
740                let my_identity = self
741                    .client
742                    .encryption()
743                    .get_user_identity(my_user_id)
744                    .await
745                    .expect("Should not fail to get own user identity")
746                    .expect("Should have an own user identity")
747                    .underlying_identity()
748                    .own()
749                    .expect("Our own identity should be of type Own");
750
751                // Get the request
752                let signature_upload_request = self
753                    .bob_crypto_other_identity()
754                    .await
755                    .verify()
756                    .await
757                    .expect("Should be able to verify other identity");
758
759                let verification_response = simulate_key_query_response_for_verification(
760                    signature_upload_request,
761                    my_identity,
762                    my_user_id,
763                    self.bob_user_id(),
764                    master_signing_key,
765                    self_signing_key,
766                );
767
768                // Receive the response into our client
769                self.client
770                    .mark_request_as_sent(&TransactionId::new(), &verification_response)
771                    .await
772                    .unwrap();
773
774                // Sanity: they are verified
775                assert!(self.bob_is_verified().await);
776            }
777
778            pub(super) async fn bob_joins(&mut self) {
779                self.bob_membership_change(MembershipState::Join).await;
780            }
781
782            pub(super) async fn bob_leaves(&mut self) {
783                self.bob_membership_change(MembershipState::Leave).await;
784            }
785
786            pub(super) async fn subscribe_to_identity_status_changes(
787                &self,
788            ) -> impl Stream<Item = Vec<IdentityStatusChange>> {
789                self.room
790                    .subscribe_to_identity_status_changes()
791                    .await
792                    .expect("Should be able to subscribe")
793            }
794
795            async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
796                let (client, _server) = create_client_and_server().await;
797
798                // Ensure our user has cross-signing keys etc.
799                client
800                    .olm_machine()
801                    .await
802                    .as_ref()
803                    .expect("We should have an Olm machine")
804                    .bootstrap_cross_signing(true)
805                    .await
806                    .expect("Should be able to bootstrap cross-signing");
807
808                // Note: if you change the user_id, you will need to change lots of hard-coded
809                // stuff inside IdentityChangeDataSet
810                let bob_user_id = owned_user_id!("@bob:localhost");
811
812                let sync_response_builder = SyncResponseBuilder::default();
813
814                (client, bob_user_id, sync_response_builder)
815            }
816
817            async fn change_bob_identity(
818                &self,
819                key_query_response: get_keys::v3::Response,
820            ) -> OtherUserIdentity {
821                self.client
822                    .mark_request_as_sent(&TransactionId::new(), &key_query_response)
823                    .await
824                    .expect("Should not fail to send identity changes");
825
826                self.bob_crypto_other_identity().await
827            }
828
829            async fn bob_membership_change(&mut self, new_state: MembershipState) {
830                let sync_response = self
831                    .sync_response_builder
832                    .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
833                        StateTestEvent::Custom(sync_response_member(
834                            &self.bob_user_id,
835                            new_state.clone(),
836                        )),
837                    ))
838                    .build_sync_response();
839                self.room.client.process_sync(sync_response).await.unwrap();
840
841                // Make sure the membership stuck as expected
842                let m = self
843                    .room
844                    .get_member_no_sync(&self.bob_user_id)
845                    .await
846                    .expect("Should not fail to get member");
847
848                match (&new_state, m) {
849                    (MembershipState::Leave, None) => {}
850                    (_, None) => {
851                        panic!("Member should exist")
852                    }
853                    (_, Some(m)) => {
854                        assert_eq!(*m.membership(), new_state);
855                    }
856                };
857            }
858
859            async fn bob_is_pinned(&self) -> bool {
860                !self.bob_crypto_other_identity().await.identity_needs_user_approval()
861            }
862
863            async fn bob_is_verified(&self) -> bool {
864                self.bob_crypto_other_identity().await.is_verified()
865            }
866
867            async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
868                self.bob_user_identity()
869                    .await
870                    .expect("User identity should exist")
871                    .underlying_identity()
872                    .other()
873                    .expect("Identity should be Other, not Own")
874            }
875
876            async fn bob_user_identity(&self) -> Option<UserIdentity> {
877                self.client
878                    .encryption()
879                    .get_user_identity(&self.bob_user_id)
880                    .await
881                    .expect("Should not fail to get user identity")
882            }
883        }
884
885        async fn create_just_me_room(
886            client: &Client,
887            sync_response_builder: &mut SyncResponseBuilder,
888        ) -> Room {
889            let create_room_sync_response = sync_response_builder
890                .add_joined_room(
891                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
892                        .add_state_event(StateTestEvent::Member),
893                )
894                .build_sync_response();
895            client.process_sync(create_room_sync_response).await.unwrap();
896            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
897            assert_eq!(room.state(), RoomState::Joined);
898            room
899        }
900
901        async fn create_room_with_other_member(
902            builder: &mut SyncResponseBuilder,
903            client: &Client,
904            other_user_id: &UserId,
905        ) -> Room {
906            let create_room_sync_response = builder
907                .add_joined_room(
908                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
909                        .add_state_event(StateTestEvent::Member)
910                        .add_state_event(StateTestEvent::Custom(sync_response_member(
911                            other_user_id,
912                            MembershipState::Join,
913                        ))),
914                )
915                .build_sync_response();
916            client.process_sync(create_room_sync_response).await.unwrap();
917            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
918            room.inner.mark_members_synced();
919
920            assert_eq!(room.state(), RoomState::Joined);
921            assert_eq!(
922                *room
923                    .get_member_no_sync(other_user_id)
924                    .await
925                    .expect("Should not fail to get member")
926                    .expect("Member should exist")
927                    .membership(),
928                MembershipState::Join
929            );
930            room
931        }
932
933        async fn create_client_and_server() -> (Client, MockServer) {
934            let server = MockServer::start().await;
935            mock_members_request(&server).await;
936            mock_secret_storage_default_key(&server).await;
937            let client = logged_in_client(Some(server.uri())).await;
938            (client, server)
939        }
940
941        async fn mock_members_request(server: &MockServer) {
942            Mock::given(method("GET"))
943                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
944                .and(header("authorization", "Bearer 1234"))
945                .respond_with(
946                    ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
947                )
948                .mount(server)
949                .await;
950        }
951
952        async fn mock_secret_storage_default_key(server: &MockServer) {
953            Mock::given(method("GET"))
954                .and(path_regex(
955                    r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
956                ))
957                .and(header("authorization", "Bearer 1234"))
958                .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
959                .mount(server)
960                .await;
961        }
962
963        fn sync_response_member(
964            user_id: &UserId,
965            membership: MembershipState,
966        ) -> serde_json::Value {
967            json!({
968                "content": {
969                    "membership": membership.to_string(),
970                },
971                "event_id": format!(
972                    "$aa{}bb:localhost",
973                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
974                ),
975                "origin_server_ts": 1472735824,
976                "sender": "@example:localhost",
977                "state_key": user_id,
978                "type": "m.room.member",
979                "unsigned": {
980                    "age": 1234
981                }
982            })
983        }
984    }
985}