1#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt,
21 ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29 store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, EncryptionSettings,
30 OlmError, OlmMachine, TrustRequirement,
31};
32#[cfg(feature = "e2e-encryption")]
33use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
34#[cfg(doc)]
35use ruma::DeviceId;
36use ruma::{
37 api::client::{self as api, sync::sync_events::v5},
38 events::{
39 ignored_user_list::IgnoredUserListEventContent,
40 push_rules::{PushRulesEvent, PushRulesEventContent},
41 room::member::SyncRoomMemberEvent,
42 StateEvent, StateEventType,
43 },
44 push::Ruleset,
45 time::Instant,
46 OwnedRoomId, OwnedUserId, RoomId, UserId,
47};
48use tokio::sync::{broadcast, Mutex};
49#[cfg(feature = "e2e-encryption")]
50use tokio::sync::{RwLock, RwLockReadGuard};
51use tracing::{debug, enabled, info, instrument, warn, Level};
52
53#[cfg(feature = "e2e-encryption")]
54use crate::RoomMemberships;
55use crate::{
56 deserialized_responses::DisplayName,
57 error::{Error, Result},
58 event_cache::store::EventCacheStoreLock,
59 response_processors::{self as processors, Context},
60 room::{
61 Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
62 },
63 store::{
64 ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
65 Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey,
66 StateStoreDataValue, StateStoreExt, StoreConfig,
67 },
68 sync::{RoomUpdates, SyncResponse},
69 RoomStateFilter, SessionMeta,
70};
71
/// Client state holder: bundles the stores, the optional Olm machine and the
/// channels used to publish updates.
#[derive(Clone)]
pub struct BaseClient {
    /// Wrapper around the state store; also exposes the rooms, the session
    /// meta and the sync token.
    pub(crate) state_store: BaseStateStore,

    /// Lockable handle to the event cache store.
    event_cache_store: EventCacheStoreLock,

    /// Crypto store handed to `OlmMachine::with_store` whenever the Olm
    /// machine is (re)built.
    #[cfg(feature = "e2e-encryption")]
    crypto_store: Arc<DynCryptoStore>,

    /// The Olm machine. Holds `None` after [`BaseClient::new`] and is filled
    /// in by [`BaseClient::regenerate_olm`].
    #[cfg(feature = "e2e-encryption")]
    olm_machine: Arc<RwLock<Option<OlmMachine>>>,

    /// Observable passed to `processors::changes::save_and_apply`; subscribe
    /// to it via [`BaseClient::subscribe_to_ignore_user_list_changes`].
    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

    /// Sender half of the broadcast channel carrying
    /// [`RoomInfoNotableUpdate`]s; a clone is handed to every room obtained
    /// through this client.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// Strategy given to `EncryptionSettings::new` when a room key is shared.
    #[cfg(feature = "e2e-encryption")]
    pub room_key_recipient_strategy: CollectStrategy,

    /// Trust requirement forwarded to the end-to-end encryption processors
    /// while handling a sync response.
    #[cfg(feature = "e2e-encryption")]
    pub decryption_trust_requirement: TrustRequirement,

    /// Flag forwarded to the end-to-end encryption processors alongside the
    /// trust requirement; `true` for clients built with [`BaseClient::new`].
    #[cfg(feature = "e2e-encryption")]
    pub handle_verification_events: bool,
}
126
127#[cfg(not(tarpaulin_include))]
128impl fmt::Debug for BaseClient {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("BaseClient")
131 .field("session_meta", &self.state_store.session_meta())
132 .field("sync_token", &self.state_store.sync_token)
133 .finish_non_exhaustive()
134 }
135}
136
137impl BaseClient {
138 pub fn new(config: StoreConfig) -> Self {
145 let store = BaseStateStore::new(config.state_store);
146
147 let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
157 broadcast::channel(500);
158
159 BaseClient {
160 state_store: store,
161 event_cache_store: config.event_cache_store,
162 #[cfg(feature = "e2e-encryption")]
163 crypto_store: config.crypto_store,
164 #[cfg(feature = "e2e-encryption")]
165 olm_machine: Default::default(),
166 ignore_user_list_changes: Default::default(),
167 room_info_notable_update_sender,
168 #[cfg(feature = "e2e-encryption")]
169 room_key_recipient_strategy: Default::default(),
170 #[cfg(feature = "e2e-encryption")]
171 decryption_trust_requirement: TrustRequirement::Untrusted,
172 #[cfg(feature = "e2e-encryption")]
173 handle_verification_events: true,
174 }
175 }
176
177 #[cfg(feature = "e2e-encryption")]
180 pub async fn clone_with_in_memory_state_store(
181 &self,
182 cross_process_store_locks_holder_name: &str,
183 handle_verification_events: bool,
184 ) -> Result<Self> {
185 let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
186 .state_store(MemoryStore::new());
187 let config = config.crypto_store(self.crypto_store.clone());
188
189 let copy = Self {
190 state_store: BaseStateStore::new(config.state_store),
191 event_cache_store: config.event_cache_store,
192 crypto_store: self.crypto_store.clone(),
199 olm_machine: self.olm_machine.clone(),
200 ignore_user_list_changes: Default::default(),
201 room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
202 room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
203 decryption_trust_requirement: self.decryption_trust_requirement,
204 handle_verification_events,
205 };
206
207 copy.state_store
208 .derive_from_other(&self.state_store, ©.room_info_notable_update_sender)
209 .await?;
210
211 Ok(copy)
212 }
213
214 #[cfg(not(feature = "e2e-encryption"))]
217 #[allow(clippy::unused_async)]
218 pub async fn clone_with_in_memory_state_store(
219 &self,
220 cross_process_store_locks_holder: &str,
221 _handle_verification_events: bool,
222 ) -> Result<Self> {
223 let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
224 .state_store(MemoryStore::new());
225 Ok(Self::new(config))
226 }
227
    /// Returns the session meta held by the state store, or `None` when none
    /// has been set.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.state_store.session_meta()
    }
236
    /// Returns the rooms reported by the state store.
    pub fn rooms(&self) -> Vec<Room> {
        self.state_store.rooms()
    }
241
    /// Returns the rooms the state store reports for the given `filter`.
    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
        self.state_store.rooms_filtered(filter)
    }
246
    /// Returns a vector of rooms together with a stream yielding batches of
    /// [`VectorDiff`]s, both obtained from the state store.
    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
        self.state_store.rooms_stream()
    }
252
    /// Asks the state store for the room `room_id`, creating it if needed.
    ///
    /// `room_state` and a clone of this client's room info update sender are
    /// forwarded to the store (presumably only used for a newly created room —
    /// confirm against `BaseStateStore::get_or_create_room`).
    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
        self.state_store.get_or_create_room(
            room_id,
            room_state,
            self.room_info_notable_update_sender.clone(),
        )
    }
262
    /// Borrows the underlying dynamic state store by dereferencing the
    /// `BaseStateStore` wrapper.
    pub fn state_store(&self) -> &DynStateStore {
        self.state_store.deref()
    }
267
    /// Borrows the lockable event cache store handle.
    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
        &self.event_cache_store
    }
272
    /// Whether a session meta has been set on the state store, which
    /// [`BaseClient::activate`] does.
    pub fn is_active(&self) -> bool {
        self.state_store.session_meta().is_some()
    }
279
280 pub async fn activate(
312 &self,
313 session_meta: SessionMeta,
314 room_load_settings: RoomLoadSettings,
315 #[cfg(feature = "e2e-encryption")] custom_account: Option<
316 crate::crypto::vodozemac::olm::Account,
317 >,
318 ) -> Result<()> {
319 debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
320
321 self.state_store
322 .load_rooms(
323 &session_meta.user_id,
324 room_load_settings,
325 &self.room_info_notable_update_sender,
326 )
327 .await?;
328 self.state_store.load_sync_token().await?;
329 self.state_store.set_session_meta(session_meta);
330
331 #[cfg(feature = "e2e-encryption")]
332 self.regenerate_olm(custom_account).await?;
333
334 Ok(())
335 }
336
337 #[cfg(feature = "e2e-encryption")]
341 pub async fn regenerate_olm(
342 &self,
343 custom_account: Option<crate::crypto::vodozemac::olm::Account>,
344 ) -> Result<()> {
345 tracing::debug!("regenerating OlmMachine");
346 let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
347
348 let olm_machine = OlmMachine::with_store(
351 &session_meta.user_id,
352 &session_meta.device_id,
353 self.crypto_store.clone(),
354 custom_account,
355 )
356 .await
357 .map_err(OlmError::from)?;
358
359 *self.olm_machine.write().await = Some(olm_machine);
360 Ok(())
361 }
362
    /// Returns a clone of the sync token currently held by the state store,
    /// read under its read lock.
    pub async fn sync_token(&self) -> Option<String> {
        self.state_store.sync_token.read().await.clone()
    }
368
369 pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
373 let room = self.state_store.get_or_create_room(
374 room_id,
375 RoomState::Knocked,
376 self.room_info_notable_update_sender.clone(),
377 );
378
379 if room.state() != RoomState::Knocked {
380 let _sync_lock = self.sync_lock().lock().await;
381
382 let mut room_info = room.clone_info();
383 room_info.mark_as_knocked();
384 room_info.mark_state_partially_synced();
385 room_info.mark_members_missing(); let mut changes = StateChanges::default();
387 changes.add_room(room_info.clone());
388 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
390 }
391
392 Ok(room)
393 }
394
395 pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
399 let room = self.state_store.get_or_create_room(
400 room_id,
401 RoomState::Joined,
402 self.room_info_notable_update_sender.clone(),
403 );
404
405 if room.state() != RoomState::Joined {
406 let _sync_lock = self.sync_lock().lock().await;
407
408 let mut room_info = room.clone_info();
409 room_info.mark_as_joined();
410 room_info.mark_state_partially_synced();
411 room_info.mark_members_missing(); let mut changes = StateChanges::default();
413 changes.add_room(room_info.clone());
414 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
416 }
417
418 Ok(room)
419 }
420
421 pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
425 let room = self.state_store.get_or_create_room(
426 room_id,
427 RoomState::Left,
428 self.room_info_notable_update_sender.clone(),
429 );
430
431 if room.state() != RoomState::Left {
432 let _sync_lock = self.sync_lock().lock().await;
433
434 let mut room_info = room.clone_info();
435 room_info.mark_as_left();
436 room_info.mark_state_partially_synced();
437 room_info.mark_members_missing(); let mut changes = StateChanges::default();
439 changes.add_room(room_info.clone());
440 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
442 }
443
444 Ok(())
445 }
446
    /// Borrows the state store's sync mutex, which this client holds while
    /// saving changes.
    pub fn sync_lock(&self) -> &Mutex<()> {
        self.state_store.sync_lock()
    }
451
452 #[instrument(skip_all)]
458 pub async fn receive_sync_response(
459 &self,
460 response: api::sync::sync_events::v3::Response,
461 ) -> Result<SyncResponse> {
462 self.receive_sync_response_with_requested_required_states(
463 response,
464 &RequestedRequiredStates::default(),
465 )
466 .await
467 }
468
    /// Process a sync v2 response, persist the resulting changes and return
    /// the digested [`SyncResponse`].
    ///
    /// # Arguments
    ///
    /// * `response` - The sync response to process.
    /// * `requested_required_states` - Forwarded to the room processors for
    ///   joined and left rooms.
    ///
    /// # Returns
    ///
    /// The per-room updates, presence, account data, to-device events and
    /// notifications. When `response.next_batch` equals the stored sync token
    /// a default `SyncResponse` is returned and nothing is processed.
    ///
    /// # Errors
    ///
    /// Propagates the first error raised by a processor or by the store.
    pub async fn receive_sync_response_with_requested_required_states(
        &self,
        response: api::sync::sync_events::v3::Response,
        requested_required_states: &RequestedRequiredStates,
    ) -> Result<SyncResponse> {
        // Same token as the one already stored: treat the response as a
        // duplicate and skip it.
        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
            info!("Got the same sync response twice");
            return Ok(SyncResponse::default());
        }

        // Only read the clock when the INFO log at the end can be emitted.
        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };

        // Read guard over the optional Olm machine, kept for the whole call.
        #[cfg(feature = "e2e-encryption")]
        let olm_machine = self.olm_machine().await;

        // Accumulates the state changes, tagged with the new sync token.
        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));

        #[cfg(feature = "e2e-encryption")]
        let to_device = {
            let processors::e2ee::to_device::Output {
                decrypted_to_device_events: to_device,
                room_key_updates,
            } = processors::e2ee::to_device::from_sync_v2(&response, olm_machine.as_ref()).await?;

            // Run the latest-event decryption for the known rooms named in the
            // room key updates; unknown rooms are dropped by `filter_map`.
            processors::latest_event::decrypt_from_rooms(
                &mut context,
                room_key_updates
                    .into_iter()
                    .flatten()
                    .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
                    .collect(),
                processors::e2ee::E2EE::new(
                    olm_machine.as_ref(),
                    self.decryption_trust_requirement,
                    self.handle_verification_events,
                ),
            )
            .await?;

            to_device
        };

        // Without encryption support the to-device events are passed through
        // untouched.
        #[cfg(not(feature = "e2e-encryption"))]
        let to_device = response.to_device.events;

        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());

        let global_account_data_processor =
            processors::account_data::global(&response.account_data.events);

        let push_rules = self.get_push_rules(&global_account_data_processor).await?;

        let mut room_updates = RoomUpdates::default();
        let mut notifications = Default::default();

        // Members touched per joined room; used to notify the rooms once the
        // changes are saved.
        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
            BTreeMap::new();

        // Joined rooms.
        for (room_id, joined_room) in response.rooms.join {
            let joined_room_update = processors::room::sync_v2::update_joined_room(
                &mut context,
                processors::room::RoomCreationData::new(
                    &room_id,
                    self.room_info_notable_update_sender.clone(),
                    requested_required_states,
                    &mut ambiguity_cache,
                ),
                joined_room,
                &mut updated_members_in_room,
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
                #[cfg(feature = "e2e-encryption")]
                processors::e2ee::E2EE::new(
                    olm_machine.as_ref(),
                    self.decryption_trust_requirement,
                    self.handle_verification_events,
                ),
            )
            .await?;

            room_updates.joined.insert(room_id, joined_room_update);
        }

        // Left rooms.
        for (room_id, left_room) in response.rooms.leave {
            let left_room_update = processors::room::sync_v2::update_left_room(
                &mut context,
                processors::room::RoomCreationData::new(
                    &room_id,
                    self.room_info_notable_update_sender.clone(),
                    requested_required_states,
                    &mut ambiguity_cache,
                ),
                left_room,
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
                #[cfg(feature = "e2e-encryption")]
                processors::e2ee::E2EE::new(
                    olm_machine.as_ref(),
                    self.decryption_trust_requirement,
                    self.handle_verification_events,
                ),
            )
            .await?;

            room_updates.left.insert(room_id, left_room_update);
        }

        // Invited rooms.
        for (room_id, invited_room) in response.rooms.invite {
            let invited_room_update = processors::room::sync_v2::update_invited_room(
                &mut context,
                &room_id,
                invited_room,
                self.room_info_notable_update_sender.clone(),
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
            )
            .await?;

            room_updates.invited.insert(room_id, invited_room_update);
        }

        // Knocked rooms.
        for (room_id, knocked_room) in response.rooms.knock {
            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
                &mut context,
                &room_id,
                knocked_room,
                self.room_info_notable_update_sender.clone(),
                processors::notification::Notification::new(
                    &push_rules,
                    &mut notifications,
                    &self.state_store,
                ),
            )
            .await?;

            room_updates.knocked.insert(room_id, knocked_room_update);
        }

        global_account_data_processor.apply(&mut context, &self.state_store).await;

        // Key each raw presence event by its sender; events that fail to
        // deserialize are silently skipped.
        context.state_changes.presence = response
            .presence
            .events
            .iter()
            .filter_map(|e| {
                let event = e.deserialize().ok()?;
                Some((event.sender, e.clone()))
            })
            .collect();

        context.state_changes.ambiguity_maps = ambiguity_cache.cache;

        // Save and apply everything gathered so far, together with the new
        // sync token, while holding the sync lock.
        {
            let _sync_lock = self.sync_lock().lock().await;

            processors::changes::save_and_apply(
                context,
                &self.state_store,
                &self.ignore_user_list_changes,
                Some(response.next_batch.clone()),
            )
            .await?;
        }

        // Second pass with a fresh context: update the display names for the
        // rooms in `room_updates`. This save runs after the sync lock above
        // has been released.
        let mut context = Context::default();

        processors::room::display_name::update_for_rooms(
            &mut context,
            &room_updates,
            &self.state_store,
        )
        .await;

        processors::changes::save_only(context, &self.state_store).await?;

        // Tell each known room which members were touched; the result of
        // `send` is deliberately ignored.
        for (room_id, member_ids) in updated_members_in_room {
            if let Some(room) = self.get_room(&room_id) {
                let _ =
                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
            }
        }

        if enabled!(Level::INFO) {
            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
        }

        let response = SyncResponse {
            rooms: room_updates,
            presence: response.presence.events,
            account_data: response.account_data.events,
            to_device,
            notifications,
        };

        Ok(response)
    }
687
    /// Process the response of a "get member events" request for `room_id`
    /// and mark the room's members as synced.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the members belong to.
    /// * `request` - The request that produced `response`; it must not carry a
    ///   `membership`, `not_membership` or `at` filter.
    /// * `response` - The member events to store.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidReceiveMembersParameters` when the request uses
    /// one of the filters above, and propagates store or processor failures.
    /// Returns `Ok(())` without doing anything when the room is unknown.
    #[instrument(skip_all, fields(?room_id))]
    pub async fn receive_all_members(
        &self,
        room_id: &RoomId,
        request: &api::membership::get_member_events::v3::Request,
        response: &api::membership::get_member_events::v3::Response,
    ) -> Result<()> {
        // Any filter on the request is rejected outright.
        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
        {
            return Err(Error::InvalidReceiveMembersParameters);
        }

        let Some(room) = self.state_store.room(room_id) else {
            return Ok(());
        };

        // NOTE(review): `chunk` is only ever pushed to in this function and is
        // never read back — confirm whether it is still needed.
        let mut chunk = Vec::with_capacity(response.chunk.len());
        let mut context = Context::default();

        // Joined and invited users, handed to the tracked-users processor
        // after the loop.
        #[cfg(feature = "e2e-encryption")]
        let mut user_ids = BTreeSet::new();

        // Display name -> users carrying that display name in this response.
        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();

        for raw_event in &response.chunk {
            let member = match raw_event.deserialize() {
                Ok(ev) => ev,
                Err(e) => {
                    // Log the failure and skip the event; the rest of the
                    // chunk is still processed.
                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
                    debug!(event_id, "Failed to deserialize member event: {e}");
                    continue;
                }
            };

            #[cfg(feature = "e2e-encryption")]
            match member.membership() {
                MembershipState::Join | MembershipState::Invite => {
                    user_ids.insert(member.state_key().to_owned());
                }
                _ => (),
            }

            // Only original events are inspected for a display name.
            if let StateEvent::Original(e) = &member {
                if let Some(d) = &e.content.displayname {
                    let display_name = DisplayName::new(d);
                    ambiguity_map
                        .entry(display_name)
                        .or_default()
                        .insert(member.state_key().clone());
                }
            }

            let sync_member: SyncRoomMemberEvent = member.clone().into();
            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);

            // Record the raw event under room -> event type -> state key.
            context
                .state_changes
                .state
                .entry(room_id.to_owned())
                .or_default()
                .entry(member.event_type())
                .or_default()
                .insert(member.state_key().to_string(), raw_event.clone().cast());
            chunk.push(member);
        }

        // Passes the collected users and the room's encryption state to the
        // tracked-users processor (presumably so the Olm machine tracks them —
        // confirm against `processors::e2ee::tracked_users::update`).
        #[cfg(feature = "e2e-encryption")]
        processors::e2ee::tracked_users::update(
            self.olm_machine().await.as_ref(),
            room.encryption_state(),
            &user_ids,
        )
        .await?;

        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);

        // The sync lock is held from here until the function returns.
        let _sync_lock = self.sync_lock().lock().await;
        let mut room_info = room.clone_info();
        room_info.mark_members_synced();
        context.state_changes.add_room(room_info);

        // `None`: no sync token is passed along with these changes.
        processors::changes::save_and_apply(
            context,
            &self.state_store,
            &self.ignore_user_list_changes,
            None,
        )
        .await?;

        // The result of `send` is deliberately ignored.
        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);

        Ok(())
    }
805
806 pub async fn receive_filter_upload(
822 &self,
823 filter_name: &str,
824 response: &api::filter::create_filter::v3::Response,
825 ) -> Result<()> {
826 Ok(self
827 .state_store
828 .set_kv_data(
829 StateStoreDataKey::Filter(filter_name),
830 StateStoreDataValue::Filter(response.filter_id.clone()),
831 )
832 .await?)
833 }
834
835 pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
847 let filter = self
848 .state_store
849 .get_kv_data(StateStoreDataKey::Filter(filter_name))
850 .await?
851 .map(|d| d.into_filter().expect("State store data not a filter"));
852
853 Ok(filter)
854 }
855
856 #[cfg(feature = "e2e-encryption")]
858 pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
859 match self.olm_machine().await.as_ref() {
860 Some(o) => {
861 let Some(room) = self.get_room(room_id) else {
862 return Err(Error::InsufficientData);
863 };
864
865 let history_visibility = room.history_visibility_or_default();
866 let Some(room_encryption_event) = room.encryption_settings() else {
867 return Err(Error::EncryptionNotEnabled);
868 };
869
870 let filter = if history_visibility == HistoryVisibility::Joined {
873 RoomMemberships::JOIN
874 } else {
875 RoomMemberships::ACTIVE
876 };
877
878 let members = self.state_store.get_user_ids(room_id, filter).await?;
879
880 let settings = EncryptionSettings::new(
881 room_encryption_event,
882 history_visibility,
883 self.room_key_recipient_strategy.clone(),
884 );
885
886 Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
887 }
888 None => panic!("Olm machine wasn't started"),
889 }
890 }
891
    /// Returns the room with the given id from the state store, or `None`
    /// when the store does not have it.
    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
        self.state_store.room(room_id)
    }
900
901 pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
909 self.state_store.forget_room(room_id).await?;
911
912 self.event_cache_store().lock().await?.remove_room(room_id).await?;
914
915 Ok(())
916 }
917
    /// Acquires a read guard over the optional Olm machine.
    ///
    /// The guarded value is `None` until [`BaseClient::regenerate_olm`] has
    /// installed a machine.
    #[cfg(feature = "e2e-encryption")]
    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine.read().await
    }
923
924 pub(crate) async fn get_push_rules(
930 &self,
931 global_account_data_processor: &processors::account_data::Global,
932 ) -> Result<Ruleset> {
933 if let Some(event) = global_account_data_processor
934 .push_rules()
935 .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
936 {
937 Ok(event.content.global)
938 } else if let Some(event) = self
939 .state_store
940 .get_account_data_event_static::<PushRulesEventContent>()
941 .await?
942 .and_then(|ev| ev.deserialize().ok())
943 {
944 Ok(event.content.global)
945 } else if let Some(session_meta) = self.state_store.session_meta() {
946 Ok(Ruleset::server_default(&session_meta.user_id))
947 } else {
948 Ok(Ruleset::new())
949 }
950 }
951
    /// Returns a new subscriber to the ignore-user-list observable.
    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
        self.ignore_user_list_changes.subscribe()
    }
957
    /// Returns a new receiver subscribed to the room info notable update
    /// broadcast channel.
    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
        self.room_info_notable_update_sender.subscribe()
    }
964
965 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
967 match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
968 {
969 Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
970 Ok(current_ignored_user_list) => {
971 current_ignored_user_list.content.ignored_users.contains_key(user_id)
972 }
973 Err(error) => {
974 warn!(?error, "Failed to deserialize the ignored user list event");
975 false
976 }
977 },
978 Ok(None) => false,
979 Err(error) => {
980 warn!(?error, "Could not get the ignored user list from the state store");
981 false
982 }
983 }
984 }
985}
986
/// The `required_state` entries of a sync request: a default list plus
/// optional per-room lists, each entry being an (event type, state key) pair.
#[derive(Debug, Default)]
pub struct RequestedRequiredStates {
    /// Entries returned for any room without its own entry in `for_rooms`.
    default: Vec<(StateEventType, String)>,
    /// Entries for specific rooms; these take precedence over `default`.
    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
}
1003
1004impl RequestedRequiredStates {
1005 pub fn new(
1010 default: Vec<(StateEventType, String)>,
1011 for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1012 ) -> Self {
1013 Self { default, for_rooms }
1014 }
1015
1016 pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1018 self.for_rooms.get(room_id).unwrap_or(&self.default)
1019 }
1020}
1021
1022impl From<&v5::Request> for RequestedRequiredStates {
1023 fn from(request: &v5::Request) -> Self {
1024 let mut default = BTreeSet::new();
1031
1032 for list in request.lists.values() {
1033 default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1034 }
1035
1036 for room_subscription in request.room_subscriptions.values() {
1037 default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1038 }
1039
1040 Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1041 }
1042}
1043
1044#[cfg(test)]
1045mod tests {
1046 use std::collections::HashMap;
1047
1048 use assert_matches2::assert_let;
1049 use futures_util::FutureExt as _;
1050 use matrix_sdk_test::{
1051 async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1052 LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1053 };
1054 use ruma::{
1055 api::client::{self as api, sync::sync_events::v5},
1056 event_id,
1057 events::{room::member::MembershipState, StateEventType},
1058 room_id,
1059 serde::Raw,
1060 user_id,
1061 };
1062 use serde_json::{json, value::to_raw_value};
1063
1064 use super::{BaseClient, RequestedRequiredStates};
1065 use crate::{
1066 store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1067 test_utils::logged_in_base_client,
1068 RoomDisplayName, RoomState, SessionMeta,
1069 };
1070
1071 #[test]
1072 fn test_requested_required_states() {
1073 let room_id_0 = room_id!("!r0");
1074 let room_id_1 = room_id!("!r1");
1075
1076 let requested_required_states = RequestedRequiredStates::new(
1077 vec![(StateEventType::RoomAvatar, "".to_owned())],
1078 HashMap::from([(
1079 room_id_0.to_owned(),
1080 vec![
1081 (StateEventType::RoomMember, "foo".to_owned()),
1082 (StateEventType::RoomEncryption, "".to_owned()),
1083 ],
1084 )]),
1085 );
1086
1087 assert_eq!(
1089 requested_required_states.for_room(room_id_0),
1090 &[
1091 (StateEventType::RoomMember, "foo".to_owned()),
1092 (StateEventType::RoomEncryption, "".to_owned()),
1093 ]
1094 );
1095
1096 assert_eq!(
1098 requested_required_states.for_room(room_id_1),
1099 &[(StateEventType::RoomAvatar, "".to_owned()),]
1100 );
1101 }
1102
    /// Converting a sync v5 request merges the `required_state` of all lists
    /// and room subscriptions into the sorted, de-duplicated default entries,
    /// and never fills `for_rooms`.
    #[test]
    fn test_requested_required_states_from_sync_v5_request() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");

        // Start from an empty request.
        let mut request = v5::Request::new();

        {
            let requested_required_states = RequestedRequiredStates::from(&request);

            assert!(requested_required_states.default.is_empty());
            assert!(requested_required_states.for_rooms.is_empty());
        }

        // Add a first list.
        request.lists.insert("foo".to_owned(), {
            let mut list = v5::request::List::default();
            list.room_details.required_state = vec![
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
            ];

            list
        });

        {
            let requested_required_states = RequestedRequiredStates::from(&request);

            assert_eq!(
                requested_required_states.default,
                &[
                    (StateEventType::RoomAvatar, "".to_owned()),
                    (StateEventType::RoomEncryption, "".to_owned())
                ]
            );
            assert!(requested_required_states.for_rooms.is_empty());
        }

        // Add a second list that overlaps with the first one.
        request.lists.insert("bar".to_owned(), {
            let mut list = v5::request::List::default();
            list.room_details.required_state = vec![
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
            ];

            list
        });

        {
            let requested_required_states = RequestedRequiredStates::from(&request);

            // The shared `RoomEncryption` entry appears only once.
            assert_eq!(
                requested_required_states.default,
                &[
                    (StateEventType::RoomAvatar, "".to_owned()),
                    (StateEventType::RoomEncryption, "".to_owned()),
                    (StateEventType::RoomName, "".to_owned()),
                ]
            );
            assert!(requested_required_states.for_rooms.is_empty());
        }

        // Add a room subscription.
        request.room_subscriptions.insert(room_id_0.to_owned(), {
            let mut room_subscription = v5::request::RoomSubscription::default();

            room_subscription.required_state = vec![
                (StateEventType::RoomJoinRules, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
            ];

            room_subscription
        });

        {
            let requested_required_states = RequestedRequiredStates::from(&request);

            // The subscription's entries land in the default entries too.
            assert_eq!(
                requested_required_states.default,
                &[
                    (StateEventType::RoomAvatar, "".to_owned()),
                    (StateEventType::RoomEncryption, "".to_owned()),
                    (StateEventType::RoomJoinRules, "".to_owned()),
                    (StateEventType::RoomName, "".to_owned()),
                ]
            );
            assert!(requested_required_states.for_rooms.is_empty());
        }

        // Add a second room subscription.
        request.room_subscriptions.insert(room_id_1.to_owned(), {
            let mut room_subscription = v5::request::RoomSubscription::default();

            room_subscription.required_state = vec![
                (StateEventType::RoomName, "".to_owned()),
                (StateEventType::RoomTopic, "".to_owned()),
            ];

            room_subscription
        });

        {
            let requested_required_states = RequestedRequiredStates::from(&request);

            assert_eq!(
                requested_required_states.default,
                &[
                    (StateEventType::RoomAvatar, "".to_owned()),
                    (StateEventType::RoomEncryption, "".to_owned()),
                    (StateEventType::RoomJoinRules, "".to_owned()),
                    (StateEventType::RoomName, "".to_owned()),
                    (StateEventType::RoomTopic, "".to_owned()),
                ]
            );
        }
    }
1224
    /// A room that was synced as left and is then synced as invited ends up in
    /// the `Invited` state.
    #[async_test]
    async fn test_invite_after_leaving() {
        let user_id = user_id!("@alice:example.org");
        let room_id = room_id!("!test:example.org");

        let client = logged_in_base_client(Some(user_id)).await;

        let mut sync_builder = SyncResponseBuilder::new();

        // First sync: the room shows up as left.
        let response = sync_builder
            .add_left_room(
                LeftRoomBuilder::new(room_id).add_timeline_event(
                    EventFactory::new()
                        .member(user_id)
                        .membership(MembershipState::Leave)
                        .display_name("Alice")
                        .event_id(event_id!("$994173582443PhrSn:example.org")),
                ),
            )
            .build_sync_response();
        client.receive_sync_response(response).await.unwrap();
        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);

        // Second sync: the same room shows up as invited.
        let response = sync_builder
            .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
                StrippedStateTestEvent::Custom(json!({
                    "content": {
                        "displayname": "Alice",
                        "membership": "invite",
                    },
                    "event_id": "$143273582443PhrSn:example.org",
                    "origin_server_ts": 1432735824653u64,
                    "sender": "@example:example.org",
                    "state_key": user_id,
                    "type": "m.room.member",
                })),
            ))
            .build_sync_response();
        client.receive_sync_response(response).await.unwrap();
        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
    }
1266
    /// An invited room gets the calculated display name "Kyra", the display
    /// name of the other member found in the invite state.
    #[async_test]
    async fn test_invite_displayname() {
        let user_id = user_id!("@alice:example.org");
        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");

        let client = logged_in_base_client(Some(user_id)).await;

        // Invite state: create, join rules, encryption, the inviter's own
        // member event and the invite for alice.
        let response = ruma_response_from_json(&json!({
            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
            "device_one_time_keys_count": {
                "signed_curve25519": 50u64
            },
            "device_unused_fallback_key_types": [
                "signed_curve25519"
            ],
            "rooms": {
                "invite": {
                    "!ithpyNKDtmhneaTQja:example.org": {
                        "invite_state": {
                            "events": [
                                {
                                    "content": {
                                        "creator": "@test:example.org",
                                        "room_version": "9"
                                    },
                                    "sender": "@test:example.org",
                                    "state_key": "",
                                    "type": "m.room.create"
                                },
                                {
                                    "content": {
                                        "join_rule": "invite"
                                    },
                                    "sender": "@test:example.org",
                                    "state_key": "",
                                    "type": "m.room.join_rules"
                                },
                                {
                                    "content": {
                                        "algorithm": "m.megolm.v1.aes-sha2"
                                    },
                                    "sender": "@test:example.org",
                                    "state_key": "",
                                    "type": "m.room.encryption"
                                },
                                {
                                    "content": {
                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
                                        "displayname": "Kyra",
                                        "membership": "join"
                                    },
                                    "sender": "@test:example.org",
                                    "state_key": "@test:example.org",
                                    "type": "m.room.member"
                                },
                                {
                                    "content": {
                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
                                        "displayname": "alice",
                                        "is_direct": true,
                                        "membership": "invite"
                                    },
                                    "origin_server_ts": 1650878657984u64,
                                    "sender": "@test:example.org",
                                    "state_key": "@alice:example.org",
                                    "type": "m.room.member",
                                    "unsigned": {
                                        "age": 14u64
                                    },
                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
                                }
                            ]
                        }
                    }
                }
            }
        }));

        client.receive_sync_response(response).await.unwrap();

        let room = client.get_room(room_id).expect("Room not found");
        assert_eq!(room.state(), RoomState::Invited);
        assert_eq!(
            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
            RoomDisplayName::Calculated("Kyra".to_owned())
        );
    }
1354
1355 #[async_test]
1356 async fn test_deserialization_failure() {
1357 let user_id = user_id!("@alice:example.org");
1358 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1359
1360 let client =
1361 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1362 client
1363 .activate(
1364 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1365 RoomLoadSettings::default(),
1366 #[cfg(feature = "e2e-encryption")]
1367 None,
1368 )
1369 .await
1370 .unwrap();
1371
1372 let response = ruma_response_from_json(&json!({
1373 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1374 "rooms": {
1375 "join": {
1376 "!ithpyNKDtmhneaTQja:example.org": {
1377 "state": {
1378 "events": [
1379 {
1380 "invalid": "invalid",
1381 },
1382 {
1383 "content": {
1384 "name": "The room name"
1385 },
1386 "event_id": "$143273582443PhrSn:example.org",
1387 "origin_server_ts": 1432735824653u64,
1388 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1389 "sender": "@example:example.org",
1390 "state_key": "",
1391 "type": "m.room.name",
1392 "unsigned": {
1393 "age": 1234
1394 }
1395 },
1396 ]
1397 }
1398 }
1399 }
1400 }
1401 }));
1402
1403 client.receive_sync_response(response).await.unwrap();
1404 client
1405 .state_store()
1406 .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1407 .await
1408 .expect("Failed to fetch state event")
1409 .expect("State event not found")
1410 .deserialize()
1411 .expect("Failed to deserialize state event");
1412 }
1413
1414 #[async_test]
1415 async fn test_invited_members_arent_ignored() {
1416 let user_id = user_id!("@alice:example.org");
1417 let inviter_user_id = user_id!("@bob:example.org");
1418 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1419
1420 let client =
1421 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1422 client
1423 .activate(
1424 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1425 RoomLoadSettings::default(),
1426 #[cfg(feature = "e2e-encryption")]
1427 None,
1428 )
1429 .await
1430 .unwrap();
1431
1432 let mut sync_builder = SyncResponseBuilder::new();
1434 let response = sync_builder
1435 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1436 .build_sync_response();
1437 client.receive_sync_response(response).await.unwrap();
1438
1439 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1442
1443 let raw_member_event = json!({
1444 "content": {
1445 "avatar_url": "mxc://localhost/fewjilfewjil42",
1446 "displayname": "Invited Alice",
1447 "membership": "invite"
1448 },
1449 "event_id": "$151800140517rfvjc:localhost",
1450 "origin_server_ts": 151800140,
1451 "room_id": room_id,
1452 "sender": inviter_user_id,
1453 "state_key": user_id,
1454 "type": "m.room.member",
1455 "unsigned": {
1456 "age": 13374242,
1457 }
1458 });
1459 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1460 to_raw_value(&raw_member_event).unwrap(),
1461 )]);
1462
1463 client.receive_all_members(room_id, &request, &response).await.unwrap();
1465
1466 let room = client.get_room(room_id).unwrap();
1467
1468 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1470
1471 assert_eq!(member.user_id(), user_id);
1472 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1473 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1474 }
1475
1476 #[async_test]
1477 async fn test_reinvited_members_get_a_display_name() {
1478 let user_id = user_id!("@alice:example.org");
1479 let inviter_user_id = user_id!("@bob:example.org");
1480 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1481
1482 let client =
1483 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1484 client
1485 .activate(
1486 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1487 RoomLoadSettings::default(),
1488 #[cfg(feature = "e2e-encryption")]
1489 None,
1490 )
1491 .await
1492 .unwrap();
1493
1494 let mut sync_builder = SyncResponseBuilder::new();
1496 let response = sync_builder
1497 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1498 StateTestEvent::Custom(json!({
1499 "content": {
1500 "avatar_url": null,
1501 "displayname": null,
1502 "membership": "leave"
1503 },
1504 "event_id": "$151803140217rkvjc:localhost",
1505 "origin_server_ts": 151800139,
1506 "room_id": room_id,
1507 "sender": user_id,
1508 "state_key": user_id,
1509 "type": "m.room.member",
1510 })),
1511 ))
1512 .build_sync_response();
1513 client.receive_sync_response(response).await.unwrap();
1514
1515 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1517
1518 let raw_member_event = json!({
1519 "content": {
1520 "avatar_url": "mxc://localhost/fewjilfewjil42",
1521 "displayname": "Invited Alice",
1522 "membership": "invite"
1523 },
1524 "event_id": "$151800140517rfvjc:localhost",
1525 "origin_server_ts": 151800140,
1526 "room_id": room_id,
1527 "sender": inviter_user_id,
1528 "state_key": user_id,
1529 "type": "m.room.member",
1530 "unsigned": {
1531 "age": 13374242,
1532 }
1533 });
1534 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1535 to_raw_value(&raw_member_event).unwrap(),
1536 )]);
1537
1538 client.receive_all_members(room_id, &request, &response).await.unwrap();
1540
1541 let room = client.get_room(room_id).unwrap();
1542
1543 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1545
1546 assert_eq!(member.user_id(), user_id);
1547 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1548 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1549 }
1550
1551 #[async_test]
1552 async fn test_ignored_user_list_changes() {
1553 let user_id = user_id!("@alice:example.org");
1554 let client =
1555 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1556
1557 client
1558 .activate(
1559 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1560 RoomLoadSettings::default(),
1561 #[cfg(feature = "e2e-encryption")]
1562 None,
1563 )
1564 .await
1565 .unwrap();
1566
1567 let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1568 assert!(subscriber.next().now_or_never().is_none());
1569
1570 let mut sync_builder = SyncResponseBuilder::new();
1571 let response = sync_builder
1572 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1573 json!({
1574 "content": {
1575 "ignored_users": {
1576 *BOB: {}
1577 }
1578 },
1579 "type": "m.ignored_user_list",
1580 }),
1581 ))
1582 .build_sync_response();
1583 client.receive_sync_response(response).await.unwrap();
1584
1585 assert_let!(Some(ignored) = subscriber.next().await);
1586 assert_eq!(ignored, [BOB.to_string()]);
1587
1588 let response = sync_builder
1590 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1591 json!({
1592 "content": {
1593 "ignored_users": {
1594 *BOB: {}
1595 }
1596 },
1597 "type": "m.ignored_user_list",
1598 }),
1599 ))
1600 .build_sync_response();
1601 client.receive_sync_response(response).await.unwrap();
1602
1603 assert!(subscriber.next().now_or_never().is_none());
1605
1606 let response = sync_builder
1608 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1609 json!({
1610 "content": {
1611 "ignored_users": {}
1612 },
1613 "type": "m.ignored_user_list",
1614 }),
1615 ))
1616 .build_sync_response();
1617 client.receive_sync_response(response).await.unwrap();
1618
1619 assert_let!(Some(ignored) = subscriber.next().await);
1620 assert!(ignored.is_empty());
1621 }
1622
1623 #[async_test]
1624 async fn test_is_user_ignored() {
1625 let ignored_user_id = user_id!("@alice:example.org");
1626 let client = logged_in_base_client(None).await;
1627
1628 let mut sync_builder = SyncResponseBuilder::new();
1629 let response = sync_builder
1630 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1631 json!({
1632 "content": {
1633 "ignored_users": {
1634 ignored_user_id: {}
1635 }
1636 },
1637 "type": "m.ignored_user_list",
1638 }),
1639 ))
1640 .build_sync_response();
1641 client.receive_sync_response(response).await.unwrap();
1642
1643 assert!(client.is_user_ignored(ignored_user_id).await);
1644 }
1645}