1#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{stream_select, StreamExt};
23use matrix_sdk_base::crypto::{
24 IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{events::room::member::SyncRoomMemberEvent, OwnedUserId, UserId};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32 encryption::identities::{IdentityUpdates, UserIdentity},
33 event_handler::EventHandlerDropGuard,
34 Client, Error, Result,
35};
36
/// State backing a stream of identity status changes for a single room.
///
/// A value of this type is built by
/// [`IdentityStatusChanges::create_stream`] and moved into the stream that
/// function returns.
#[derive(Debug)]
pub struct IdentityStatusChanges {
    /// Tracks the identity state of the room and converts incoming
    /// [`RoomIdentityChange`]s into lists of [`IdentityStatusChange`]s.
    room_identity_state: RoomIdentityState<Room>,

    /// Guard for the room member event handler registered when the stream is
    /// created. It is never read; it is stored so that it is dropped together
    /// with this struct (presumably unregistering the handler at that point —
    /// confirm against `EventHandlerDropGuard`).
    _drop_guard: EventHandlerDropGuard,
}
57
58impl IdentityStatusChanges {
59 pub async fn create_stream(
90 room: Room,
91 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92 let identity_updates = wrap_identity_updates(&room.client).await?;
93 let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94 let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95 let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97 let mut state = IdentityStatusChanges {
98 room_identity_state: RoomIdentityState::new(room).await,
99 _drop_guard: drop_guard,
100 };
101
102 Ok(stream!({
103 let mut current_state =
104 filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106 if !current_state.is_empty() {
107 current_state.sort();
108 yield current_state;
109 }
110
111 while let Some(item) = unprocessed_stream.next().await {
112 let mut update = filter_non_self(
113 state.room_identity_state.process_change(item).await,
114 &own_user_id,
115 );
116 if !update.is_empty() {
117 update.sort();
118 yield update;
119 }
120 }
121 }))
122 }
123}
124
125fn filter_for_initial_update(
126 mut input: Vec<IdentityStatusChange>,
127 own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129 input.retain(|change| {
134 change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135 });
136
137 input
138}
139
140fn filter_non_self(
141 mut input: Vec<IdentityStatusChange>,
142 own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144 input.retain(|change| change.user_id != own_user_id);
146 input
147}
148
/// Merges the identity update stream and the room member event stream into
/// one stream of [`RoomIdentityChange`]s using `stream_select!`.
///
/// NOTE(review): the order in which items from the two inputs are interleaved
/// is whatever `stream_select!` provides — do not rely on a particular order
/// without checking its documentation.
fn combine_streams(
    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
) -> impl Stream<Item = RoomIdentityChange> {
    stream_select!(identity_updates, room_member_events)
}
155
156async fn wrap_identity_updates(client: &Client) -> Result<impl Stream<Item = RoomIdentityChange>> {
157 Ok(client
158 .encryption()
159 .user_identities_stream()
160 .await?
161 .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
162}
163
164fn to_base_updates(input: IdentityUpdates) -> matrix_sdk_base::crypto::store::IdentityUpdates {
165 matrix_sdk_base::crypto::store::IdentityUpdates {
166 new: to_base_identities(input.new),
167 changed: to_base_identities(input.changed),
168 unchanged: Default::default(),
169 }
170}
171
172fn to_base_identities(
173 input: BTreeMap<OwnedUserId, UserIdentity>,
174) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
175 input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
176}
177
178fn wrap_room_member_events(
179 room: &Room,
180) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
181 let own_user_id = room.own_user_id().to_owned();
182 let room_id = room.room_id();
183 let (sender, receiver) = mpsc::channel(16);
184 let handle =
185 room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
186 if *event.state_key() == own_user_id {
187 return;
188 }
189 let _: Result<_, _> =
190 sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
191 });
192 let drop_guard = room.client.event_handler_drop_guard(handle);
193 (drop_guard, ReceiverStream::new(receiver))
194}
195
196#[cfg(all(test, not(target_family = "wasm")))]
197mod tests {
198 use std::time::Duration;
199
200 use futures_util::{pin_mut, FutureExt as _, StreamExt as _};
201 use matrix_sdk_base::crypto::IdentityState;
202 use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
203 use test_setup::TestSetup;
204
205 use crate::assert_next_with_timeout;
206
207 #[async_test]
208 async fn test_when_user_becomes_unpinned_we_report_it() {
209 let t = TestSetup::new_room_with_other_bob().await;
211
212 t.pin_bob().await;
214
215 let stream = t.subscribe_to_identity_status_changes().await;
217 pin_mut!(stream);
218
219 t.unpin_bob().await;
221
222 let change = assert_next_with_timeout!(stream);
224 assert_eq!(change[0].user_id, t.bob_user_id());
225 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
226 assert_eq!(change.len(), 1);
227 }
228
229 #[async_test]
230 async fn test_when_user_becomes_verification_violation_we_report_it() {
231 let t = TestSetup::new_room_with_other_bob().await;
233
234 t.verify_bob().await;
236
237 let stream = t.subscribe_to_identity_status_changes().await;
239 pin_mut!(stream);
240
241 t.unpin_bob().await;
243
244 let change = assert_next_with_timeout!(stream);
246 assert_eq!(change[0].user_id, t.bob_user_id());
247 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
248 assert_eq!(change.len(), 1);
249 }
250
251 #[async_test]
252 async fn test_when_user_becomes_pinned_we_report_it() {
253 let t = TestSetup::new_room_with_other_bob().await;
255
256 t.unpin_bob().await;
258
259 let stream = t.subscribe_to_identity_status_changes().await;
261 pin_mut!(stream);
262
263 t.pin_bob().await;
265
266 let change1 = assert_next_with_timeout!(stream);
268 assert_eq!(change1[0].user_id, t.bob_user_id());
269 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
270 assert_eq!(change1.len(), 1);
271
272 let change2 = assert_next_with_timeout!(stream);
274 assert_eq!(change2[0].user_id, t.bob_user_id());
275 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
276 assert_eq!(change2.len(), 1);
277 }
278
279 #[async_test]
280 async fn test_when_user_becomes_verified_we_report_it() {
281 let t = TestSetup::new_room_with_other_bob().await;
283
284 let stream = t.subscribe_to_identity_status_changes().await;
286 pin_mut!(stream);
287
288 t.verify_bob().await;
290
291 let change = assert_next_with_timeout!(stream);
293 assert_eq!(change[0].user_id, t.bob_user_id());
294 assert_eq!(change[0].changed_to, IdentityState::Verified);
295 assert_eq!(change.len(), 1);
296
297 t.unpin_bob().await;
299
300 let change = assert_next_with_timeout!(stream);
302 assert_eq!(change[0].user_id, t.bob_user_id());
303 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
304 assert_eq!(change.len(), 1);
305 }
306
307 #[async_test]
308 async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
309 let t = TestSetup::new_room_with_other_bob().await;
311
312 t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
314
315 let stream = t.subscribe_to_identity_status_changes().await;
317 pin_mut!(stream);
318
319 t.verify_bob().await;
321
322 let change1 = assert_next_with_timeout!(stream);
324 assert_eq!(change1[0].user_id, t.bob_user_id());
325 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
326 assert_eq!(change1.len(), 1);
327
328 let change2 = assert_next_with_timeout!(stream);
330 assert_eq!(change2[0].user_id, t.bob_user_id());
331 assert_eq!(change2[0].changed_to, IdentityState::Verified);
332 assert_eq!(change2.len(), 1);
333 }
334
335 #[async_test]
336 async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
337 let t = TestSetup::new_room_with_other_bob().await;
339
340 t.verify_bob_with(
342 IdentityChangeDataSet::key_query_with_identity_b(),
343 IdentityChangeDataSet::master_signing_keys_b(),
344 IdentityChangeDataSet::self_signing_keys_b(),
345 )
346 .await;
347 t.unpin_bob().await;
348
349 let stream = t.subscribe_to_identity_status_changes().await;
351 pin_mut!(stream);
352
353 t.verify_bob().await;
355
356 let change1 = assert_next_with_timeout!(stream);
358 assert_eq!(change1[0].user_id, t.bob_user_id());
359 assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
360 assert_eq!(change1.len(), 1);
361
362 let change2 = assert_next_with_timeout!(stream);
364 assert_eq!(change2[0].user_id, t.bob_user_id());
365 assert_eq!(change2[0].changed_to, IdentityState::Verified);
366 assert_eq!(change2.len(), 1);
367 }
368
369 #[async_test]
370 async fn test_when_an_unpinned_user_joins_we_report_it() {
371 let mut t = TestSetup::new_just_me_room().await;
373
374 t.unpin_bob().await;
376
377 let stream = t.subscribe_to_identity_status_changes().await;
379 pin_mut!(stream);
380
381 t.bob_joins().await;
383
384 let change = assert_next_with_timeout!(stream);
386 assert_eq!(change[0].user_id, t.bob_user_id());
387 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
388 assert_eq!(change.len(), 1);
389 }
390
391 #[async_test]
392 async fn test_when_an_verification_violating_user_joins_we_report_it() {
393 let mut t = TestSetup::new_just_me_room().await;
395
396 t.verify_bob().await;
398 t.unpin_bob().await;
399
400 let stream = t.subscribe_to_identity_status_changes().await;
402 pin_mut!(stream);
403
404 t.bob_joins().await;
406
407 let change = assert_next_with_timeout!(stream);
409 assert_eq!(change[0].user_id, t.bob_user_id());
410 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
411 assert_eq!(change.len(), 1);
412 }
413
414 #[async_test]
415 async fn test_when_a_verified_user_joins_we_dont_report_it() {
416 let mut t = TestSetup::new_just_me_room().await;
418
419 t.verify_bob().await;
421
422 let stream = t.subscribe_to_identity_status_changes().await;
424 pin_mut!(stream);
425
426 t.bob_joins().await;
428
429 t.unpin_bob().await;
431
432 let change = assert_next_with_timeout!(stream);
434 assert_eq!(change[0].user_id, t.bob_user_id());
435 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
436 assert_eq!(change.len(), 1);
437 }
438
439 #[async_test]
440 async fn test_when_a_pinned_user_joins_we_do_not_report() {
441 let mut t = TestSetup::new_just_me_room().await;
443
444 t.pin_bob().await;
446
447 let stream = t.subscribe_to_identity_status_changes().await;
449 pin_mut!(stream);
450
451 t.bob_joins().await;
453
454 tokio::time::sleep(Duration::from_millis(200)).await;
456 let change = stream.next().now_or_never();
457 assert!(change.is_none());
458 }
459
460 #[async_test]
461 async fn test_when_an_unpinned_user_leaves_we_report_it() {
462 let mut t = TestSetup::new_room_with_other_bob().await;
464
465 t.unpin_bob().await;
467
468 let stream = t.subscribe_to_identity_status_changes().await;
470 pin_mut!(stream);
471
472 t.bob_leaves().await;
474
475 let change1 = assert_next_with_timeout!(stream);
477 assert_eq!(change1[0].user_id, t.bob_user_id());
478 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
479 assert_eq!(change1.len(), 1);
480
481 let change2 = assert_next_with_timeout!(stream);
483 assert_eq!(change2[0].user_id, t.bob_user_id());
486 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
487 assert_eq!(change2.len(), 1);
488 }
489
490 #[async_test]
491 async fn test_multiple_identity_changes_are_reported() {
492 let mut t = TestSetup::new_just_me_room().await;
494
495 t.unpin_bob().await;
497
498 let stream = t.subscribe_to_identity_status_changes().await;
500 pin_mut!(stream);
501
502 t.bob_joins().await;
511 let change1 = assert_next_with_timeout!(stream);
512
513 t.pin_bob().await;
515 let change2 = assert_next_with_timeout!(stream);
516
517 t.bob_leaves().await;
519 t.bob_joins().await;
520
521 t.unpin_bob().await;
523 let change3 = assert_next_with_timeout!(stream);
524
525 t.bob_leaves().await;
527 let change4 = assert_next_with_timeout!(stream);
528
529 assert_eq!(change1[0].user_id, t.bob_user_id());
530 assert_eq!(change2[0].user_id, t.bob_user_id());
531 assert_eq!(change3[0].user_id, t.bob_user_id());
532 assert_eq!(change4[0].user_id, t.bob_user_id());
533
534 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
535 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
536 assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
537 assert_eq!(change4[0].changed_to, IdentityState::Pinned);
538
539 assert_eq!(change1.len(), 1);
540 assert_eq!(change2.len(), 1);
541 assert_eq!(change3.len(), 1);
542 assert_eq!(change4.len(), 1);
543 }
544
545 #[async_test]
546 async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
547 let t = TestSetup::new_room_with_other_bob().await;
549 t.unpin_bob().await;
550
551 let stream = t.subscribe_to_identity_status_changes().await;
553 pin_mut!(stream);
554
555 let change = assert_next_with_timeout!(stream);
557 assert_eq!(change[0].user_id, t.bob_user_id());
558 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
559 assert_eq!(change.len(), 1);
560 }
561
562 #[async_test]
563 async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
564 let t = TestSetup::new_room_with_other_bob().await;
566 t.verify_bob().await;
567
568 let stream = t.subscribe_to_identity_status_changes().await;
570 pin_mut!(stream);
571
572 t.unpin_bob().await;
574
575 let next_change = assert_next_with_timeout!(stream);
577
578 assert_eq!(next_change[0].user_id, t.bob_user_id());
579 assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
580 assert_eq!(next_change.len(), 1);
581 }
582
583 mod test_setup {
588 use std::time::{SystemTime, UNIX_EPOCH};
589
590 use futures_core::Stream;
591 use matrix_sdk_base::{
592 crypto::{
593 testing::simulate_key_query_response_for_verification, IdentityStatusChange,
594 OtherUserIdentity,
595 },
596 RoomState,
597 };
598 use matrix_sdk_test::{
599 test_json, test_json::keys_query_sets::IdentityChangeDataSet, JoinedRoomBuilder,
600 StateTestEvent, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
601 };
602 use ruma::{
603 api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
604 events::room::member::MembershipState,
605 owned_user_id, OwnedUserId, TransactionId, UserId,
606 };
607 use serde_json::json;
608 use wiremock::{
609 matchers::{header, method, path_regex},
610 Mock, MockServer, ResponseTemplate,
611 };
612
613 use crate::{
614 encryption::identities::UserIdentity, test_utils::logged_in_client, Client, Room,
615 };
616
        /// Fixture shared by the tests in the parent module: a logged-in
        /// client, one room, and a second user ("Bob") whose identity and
        /// room membership the tests manipulate.
        pub(super) struct TestSetup {
            /// The logged-in client acting as "us".
            client: Client,
            /// User ID of the other user the tests operate on.
            bob_user_id: OwnedUserId,
            /// Builder used to produce the sync responses that create the
            /// room and change Bob's membership.
            sync_response_builder: SyncResponseBuilder,
            /// The room whose identity status changes are observed.
            room: Room,
        }
632
633 impl TestSetup {
634 pub(super) async fn new_just_me_room() -> Self {
635 let (client, user_id, mut sync_response_builder) = Self::init().await;
636 let room = create_just_me_room(&client, &mut sync_response_builder).await;
637 Self { client, bob_user_id: user_id, sync_response_builder, room }
638 }
639
640 pub(super) async fn new_room_with_other_bob() -> Self {
641 let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
642 let room = create_room_with_other_member(
643 &mut sync_response_builder,
644 &client,
645 &bob_user_id,
646 )
647 .await;
648 Self { client, bob_user_id, sync_response_builder, room }
649 }
650
651 pub(super) fn bob_user_id(&self) -> &UserId {
652 &self.bob_user_id
653 }
654
655 pub(super) async fn pin_bob(&self) {
656 if self.bob_user_identity().await.is_some() {
657 assert!(
658 !self.bob_is_pinned().await,
659 "pin_bob() called when the identity is already pinned!"
660 );
661
662 self.bob_user_identity()
664 .await
665 .expect("User should exist")
666 .pin()
667 .await
668 .expect("Should not fail to pin");
669 } else {
670 self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
672 .await;
673 }
674
675 assert!(self.bob_is_pinned().await);
677 }
678
679 pub(super) async fn unpin_bob(&self) {
680 self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
681 }
682
683 pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
684 fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
685 serde_json::to_string(
686 key_query_response
687 .master_keys
688 .first_key_value()
689 .expect("Master key should have a value")
690 .1,
691 )
692 .expect("Should be able to serialise master key")
693 }
694
695 let a = IdentityChangeDataSet::key_query_with_identity_a();
696 let b = IdentityChangeDataSet::key_query_with_identity_b();
697 let requested_master_key = master_key_json(&requested);
698 let a_master_key = master_key_json(&a);
699
700 if requested_master_key == a_master_key {
704 self.change_bob_identity(b).await;
705 if !self.bob_is_pinned().await {
706 self.pin_bob().await;
707 }
708 self.change_bob_identity(a).await;
709 } else {
710 self.change_bob_identity(a).await;
711 if !self.bob_is_pinned().await {
712 self.pin_bob().await;
713 }
714 self.change_bob_identity(b).await;
715 }
716
717 assert!(!self.bob_is_pinned().await);
719 }
720
721 pub(super) async fn verify_bob(&self) {
722 self.verify_bob_with(
723 IdentityChangeDataSet::key_query_with_identity_a(),
724 IdentityChangeDataSet::master_signing_keys_a(),
725 IdentityChangeDataSet::self_signing_keys_a(),
726 )
727 .await;
728 }
729
730 pub(super) async fn verify_bob_with(
731 &self,
732 key_query: KeyQueryResponse,
733 master_signing_key: serde_json::Value,
734 self_signing_key: serde_json::Value,
735 ) {
736 self.change_bob_identity(key_query).await;
738
739 let my_user_id = self.client.user_id().expect("I should have a user id");
740 let my_identity = self
741 .client
742 .encryption()
743 .get_user_identity(my_user_id)
744 .await
745 .expect("Should not fail to get own user identity")
746 .expect("Should have an own user identity")
747 .underlying_identity()
748 .own()
749 .expect("Our own identity should be of type Own");
750
751 let signature_upload_request = self
753 .bob_crypto_other_identity()
754 .await
755 .verify()
756 .await
757 .expect("Should be able to verify other identity");
758
759 let verification_response = simulate_key_query_response_for_verification(
760 signature_upload_request,
761 my_identity,
762 my_user_id,
763 self.bob_user_id(),
764 master_signing_key,
765 self_signing_key,
766 );
767
768 self.client
770 .mark_request_as_sent(&TransactionId::new(), &verification_response)
771 .await
772 .unwrap();
773
774 assert!(self.bob_is_verified().await);
776 }
777
778 pub(super) async fn bob_joins(&mut self) {
779 self.bob_membership_change(MembershipState::Join).await;
780 }
781
782 pub(super) async fn bob_leaves(&mut self) {
783 self.bob_membership_change(MembershipState::Leave).await;
784 }
785
786 pub(super) async fn subscribe_to_identity_status_changes(
787 &self,
788 ) -> impl Stream<Item = Vec<IdentityStatusChange>> {
789 self.room
790 .subscribe_to_identity_status_changes()
791 .await
792 .expect("Should be able to subscribe")
793 }
794
795 async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
796 let (client, _server) = create_client_and_server().await;
797
798 client
800 .olm_machine()
801 .await
802 .as_ref()
803 .expect("We should have an Olm machine")
804 .bootstrap_cross_signing(true)
805 .await
806 .expect("Should be able to bootstrap cross-signing");
807
808 let bob_user_id = owned_user_id!("@bob:localhost");
811
812 let sync_response_builder = SyncResponseBuilder::default();
813
814 (client, bob_user_id, sync_response_builder)
815 }
816
817 async fn change_bob_identity(
818 &self,
819 key_query_response: get_keys::v3::Response,
820 ) -> OtherUserIdentity {
821 self.client
822 .mark_request_as_sent(&TransactionId::new(), &key_query_response)
823 .await
824 .expect("Should not fail to send identity changes");
825
826 self.bob_crypto_other_identity().await
827 }
828
829 async fn bob_membership_change(&mut self, new_state: MembershipState) {
830 let sync_response = self
831 .sync_response_builder
832 .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
833 StateTestEvent::Custom(sync_response_member(
834 &self.bob_user_id,
835 new_state.clone(),
836 )),
837 ))
838 .build_sync_response();
839 self.room.client.process_sync(sync_response).await.unwrap();
840
841 let m = self
843 .room
844 .get_member_no_sync(&self.bob_user_id)
845 .await
846 .expect("Should not fail to get member");
847
848 match (&new_state, m) {
849 (MembershipState::Leave, None) => {}
850 (_, None) => {
851 panic!("Member should exist")
852 }
853 (_, Some(m)) => {
854 assert_eq!(*m.membership(), new_state);
855 }
856 };
857 }
858
859 async fn bob_is_pinned(&self) -> bool {
860 !self.bob_crypto_other_identity().await.identity_needs_user_approval()
861 }
862
863 async fn bob_is_verified(&self) -> bool {
864 self.bob_crypto_other_identity().await.is_verified()
865 }
866
867 async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
868 self.bob_user_identity()
869 .await
870 .expect("User identity should exist")
871 .underlying_identity()
872 .other()
873 .expect("Identity should be Other, not Own")
874 }
875
876 async fn bob_user_identity(&self) -> Option<UserIdentity> {
877 self.client
878 .encryption()
879 .get_user_identity(&self.bob_user_id)
880 .await
881 .expect("Should not fail to get user identity")
882 }
883 }
884
885 async fn create_just_me_room(
886 client: &Client,
887 sync_response_builder: &mut SyncResponseBuilder,
888 ) -> Room {
889 let create_room_sync_response = sync_response_builder
890 .add_joined_room(
891 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
892 .add_state_event(StateTestEvent::Member),
893 )
894 .build_sync_response();
895 client.process_sync(create_room_sync_response).await.unwrap();
896 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
897 assert_eq!(room.state(), RoomState::Joined);
898 room
899 }
900
901 async fn create_room_with_other_member(
902 builder: &mut SyncResponseBuilder,
903 client: &Client,
904 other_user_id: &UserId,
905 ) -> Room {
906 let create_room_sync_response = builder
907 .add_joined_room(
908 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
909 .add_state_event(StateTestEvent::Member)
910 .add_state_event(StateTestEvent::Custom(sync_response_member(
911 other_user_id,
912 MembershipState::Join,
913 ))),
914 )
915 .build_sync_response();
916 client.process_sync(create_room_sync_response).await.unwrap();
917 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
918 room.inner.mark_members_synced();
919
920 assert_eq!(room.state(), RoomState::Joined);
921 assert_eq!(
922 *room
923 .get_member_no_sync(other_user_id)
924 .await
925 .expect("Should not fail to get member")
926 .expect("Member should exist")
927 .membership(),
928 MembershipState::Join
929 );
930 room
931 }
932
933 async fn create_client_and_server() -> (Client, MockServer) {
934 let server = MockServer::start().await;
935 mock_members_request(&server).await;
936 mock_secret_storage_default_key(&server).await;
937 let client = logged_in_client(Some(server.uri())).await;
938 (client, server)
939 }
940
941 async fn mock_members_request(server: &MockServer) {
942 Mock::given(method("GET"))
943 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
944 .and(header("authorization", "Bearer 1234"))
945 .respond_with(
946 ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
947 )
948 .mount(server)
949 .await;
950 }
951
952 async fn mock_secret_storage_default_key(server: &MockServer) {
953 Mock::given(method("GET"))
954 .and(path_regex(
955 r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
956 ))
957 .and(header("authorization", "Bearer 1234"))
958 .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
959 .mount(server)
960 .await;
961 }
962
963 fn sync_response_member(
964 user_id: &UserId,
965 membership: MembershipState,
966 ) -> serde_json::Value {
967 json!({
968 "content": {
969 "membership": membership.to_string(),
970 },
971 "event_id": format!(
972 "$aa{}bb:localhost",
973 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
974 ),
975 "origin_server_ts": 1472735824,
976 "sender": "@example:localhost",
977 "state_key": user_id,
978 "type": "m.room.member",
979 "unsigned": {
980 "age": 1234
981 }
982 })
983 }
984 }
985}