1#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt,
21 ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29 store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, DecryptionSettings,
30 EncryptionSettings, OlmError, OlmMachine, TrustRequirement,
31};
32#[cfg(feature = "e2e-encryption")]
33use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
34#[cfg(doc)]
35use ruma::DeviceId;
36use ruma::{
37 api::client::{self as api, sync::sync_events::v5},
38 events::{
39 ignored_user_list::IgnoredUserListEventContent,
40 push_rules::{PushRulesEvent, PushRulesEventContent},
41 room::member::SyncRoomMemberEvent,
42 StateEvent, StateEventType,
43 },
44 push::Ruleset,
45 time::Instant,
46 OwnedRoomId, OwnedUserId, RoomId, UserId,
47};
48use tokio::sync::{broadcast, Mutex};
49#[cfg(feature = "e2e-encryption")]
50use tokio::sync::{RwLock, RwLockReadGuard};
51use tracing::{debug, enabled, info, instrument, warn, Level};
52
53#[cfg(feature = "e2e-encryption")]
54use crate::RoomMemberships;
55use crate::{
56 deserialized_responses::DisplayName,
57 error::{Error, Result},
58 event_cache::store::EventCacheStoreLock,
59 response_processors::{self as processors, Context},
60 room::{
61 Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
62 },
63 store::{
64 ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
65 Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey,
66 StateStoreDataValue, StateStoreExt, StoreConfig,
67 },
68 sync::{RoomUpdates, SyncResponse},
69 RoomStateFilter, SessionMeta,
70};
71
/// A client that bundles the state store, the event cache store and (when the
/// `e2e-encryption` feature is enabled) the crypto machinery used by the
/// methods of its `impl` block.
///
/// The type is `Clone`; what a clone shares with the original depends on the
/// individual field types.
#[derive(Clone)]
pub struct BaseClient {
    /// The state store wrapper. Rooms, the session meta and the sync token are
    /// all read through it.
    pub(crate) state_store: BaseStateStore,

    /// The lock-protected event cache store, exposed through
    /// `BaseClient::event_cache_store`.
    event_cache_store: EventCacheStoreLock,

    /// The crypto store handed to `OlmMachine::with_store` when the Olm
    /// machine is (re)generated.
    #[cfg(feature = "e2e-encryption")]
    crypto_store: Arc<DynCryptoStore>,

    /// The Olm machine, shared behind an `RwLock`. It holds `None` until
    /// `BaseClient::regenerate_olm` has stored a machine.
    #[cfg(feature = "e2e-encryption")]
    olm_machine: Arc<RwLock<Option<OlmMachine>>>,

    /// Observable handed out by
    /// `BaseClient::subscribe_to_ignore_user_list_changes`.
    // NOTE(review): presumably carries the ignored user IDs as strings — confirm.
    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

    /// Broadcast sender for `RoomInfoNotableUpdate`s; it is passed to every
    /// room created or loaded by this client.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// The recipient collection strategy passed to `EncryptionSettings::new`
    /// when sharing a room key.
    #[cfg(feature = "e2e-encryption")]
    pub room_key_recipient_strategy: CollectStrategy,

    /// The decryption settings handed to the E2EE processors during sync.
    #[cfg(feature = "e2e-encryption")]
    pub decryption_settings: DecryptionSettings,

    /// Flag handed to the E2EE processors during sync.
    // NOTE(review): presumably toggles handling of verification events
    // received while syncing — confirm against `processors::e2ee::E2EE`.
    #[cfg(feature = "e2e-encryption")]
    pub handle_verification_events: bool,

    /// The threading support this client was constructed with.
    pub threading_support: ThreadingSupport,
}
130
131#[cfg(not(tarpaulin_include))]
132impl fmt::Debug for BaseClient {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("BaseClient")
135 .field("session_meta", &self.state_store.session_meta())
136 .field("sync_token", &self.state_store.sync_token)
137 .finish_non_exhaustive()
138 }
139}
140
/// Whether a [`BaseClient`] is configured with threading support.
///
/// The value is supplied to `BaseClient::new` and stored in
/// `BaseClient::threading_support`.
// NOTE(review): the behaviour gated by this flag is not visible in this file —
// confirm against the code reading `threading_support`.
#[derive(Clone, Copy, Debug)]
pub enum ThreadingSupport {
    /// Threading support is enabled.
    Enabled,
    /// Threading support is disabled.
    Disabled,
}
159
160impl BaseClient {
161 pub fn new(config: StoreConfig, threading_support: ThreadingSupport) -> Self {
168 let store = BaseStateStore::new(config.state_store);
169
170 let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
180 broadcast::channel(500);
181
182 BaseClient {
183 state_store: store,
184 event_cache_store: config.event_cache_store,
185 #[cfg(feature = "e2e-encryption")]
186 crypto_store: config.crypto_store,
187 #[cfg(feature = "e2e-encryption")]
188 olm_machine: Default::default(),
189 ignore_user_list_changes: Default::default(),
190 room_info_notable_update_sender,
191 #[cfg(feature = "e2e-encryption")]
192 room_key_recipient_strategy: Default::default(),
193 #[cfg(feature = "e2e-encryption")]
194 decryption_settings: DecryptionSettings {
195 sender_device_trust_requirement: TrustRequirement::Untrusted,
196 },
197 #[cfg(feature = "e2e-encryption")]
198 handle_verification_events: true,
199 threading_support,
200 }
201 }
202
/// Clone the current client, replacing its state store with a fresh
/// in-memory store.
///
/// The crypto store and the `OlmMachine` handle are shared with `self` (both
/// are `Arc` clones), as is the `RoomInfoNotableUpdate` sender. The
/// ignore-user-list observable starts from its default value.
///
/// # Arguments
///
/// * `cross_process_store_locks_holder_name` - The holder name given to the
///   new `StoreConfig`.
/// * `handle_verification_events` - Value of the same-named field on the
///   returned client.
///
/// # Errors
///
/// Returns an error if deriving the new state store from the current one
/// fails.
#[cfg(feature = "e2e-encryption")]
pub async fn clone_with_in_memory_state_store(
    &self,
    cross_process_store_locks_holder_name: &str,
    handle_verification_events: bool,
) -> Result<Self> {
    let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
        .state_store(MemoryStore::new())
        .crypto_store(self.crypto_store.clone());

    let copy = Self {
        state_store: BaseStateStore::new(config.state_store),
        event_cache_store: config.event_cache_store,
        // Parent and copy deliberately point at the same crypto store and
        // the same `OlmMachine` slot.
        crypto_store: self.crypto_store.clone(),
        olm_machine: self.olm_machine.clone(),
        ignore_user_list_changes: Default::default(),
        room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
        room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
        decryption_settings: self.decryption_settings.clone(),
        handle_verification_events,
        threading_support: self.threading_support,
    };

    // Seed the fresh in-memory store from the current one.
    copy.state_store
        .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
        .await?;

    Ok(copy)
}
240
241 #[cfg(not(feature = "e2e-encryption"))]
244 #[allow(clippy::unused_async)]
245 pub async fn clone_with_in_memory_state_store(
246 &self,
247 cross_process_store_locks_holder: &str,
248 _handle_verification_events: bool,
249 ) -> Result<Self> {
250 let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
251 .state_store(MemoryStore::new());
252 Ok(Self::new(config, ThreadingSupport::Disabled))
253 }
254
/// Get the session meta held by the state store, if any.
///
/// Returns `None` as long as no session meta has been set on the state store.
pub fn session_meta(&self) -> Option<&SessionMeta> {
    self.state_store.session_meta()
}
263
/// Get all the rooms the state store knows about.
pub fn rooms(&self) -> Vec<Room> {
    self.state_store.rooms()
}
268
/// Get the rooms of the state store that match the given `filter`.
pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
    self.state_store.rooms_filtered(filter)
}
273
/// Get a vector of rooms together with a stream of batched `VectorDiff`s.
// NOTE(review): presumably the vector is the current snapshot and the stream
// yields subsequent updates — confirm against `BaseStateStore::rooms_stream`.
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
    self.state_store.rooms_stream()
}
279
280 pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
283 self.state_store.get_or_create_room(
284 room_id,
285 room_state,
286 self.room_info_notable_update_sender.clone(),
287 )
288 }
289
/// Get a reference to the underlying dynamic state store, obtained by
/// dereferencing the `BaseStateStore` wrapper.
pub fn state_store(&self) -> &DynStateStore {
    self.state_store.deref()
}
294
/// Get a reference to the event cache store lock.
pub fn event_cache_store(&self) -> &EventCacheStoreLock {
    &self.event_cache_store
}
299
/// Check whether the client is active, i.e. whether the state store has
/// session meta set (`BaseClient::activate` sets it).
pub fn is_active(&self) -> bool {
    self.state_store.session_meta().is_some()
}
306
307 pub async fn activate(
339 &self,
340 session_meta: SessionMeta,
341 room_load_settings: RoomLoadSettings,
342 #[cfg(feature = "e2e-encryption")] custom_account: Option<
343 crate::crypto::vodozemac::olm::Account,
344 >,
345 ) -> Result<()> {
346 debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
347
348 self.state_store
349 .load_rooms(
350 &session_meta.user_id,
351 room_load_settings,
352 &self.room_info_notable_update_sender,
353 )
354 .await?;
355 self.state_store.load_sync_token().await?;
356 self.state_store.set_session_meta(session_meta);
357
358 #[cfg(feature = "e2e-encryption")]
359 self.regenerate_olm(custom_account).await?;
360
361 Ok(())
362 }
363
/// Build a new `OlmMachine` from the crypto store and replace the one
/// currently held by the client.
///
/// # Arguments
///
/// * `custom_account` - Forwarded as-is to `OlmMachine::with_store`.
///
/// # Errors
///
/// Returns `OlmError::MissingSession` (wrapped in `Error::OlmError`) when no
/// session meta is set, or the error raised while building the machine.
#[cfg(feature = "e2e-encryption")]
pub async fn regenerate_olm(
    &self,
    custom_account: Option<crate::crypto::vodozemac::olm::Account>,
) -> Result<()> {
    tracing::debug!("regenerating OlmMachine");

    let Some(session_meta) = self.session_meta() else {
        return Err(Error::OlmError(OlmError::MissingSession));
    };

    let crypto_store = self.crypto_store.clone();
    let machine = OlmMachine::with_store(
        &session_meta.user_id,
        &session_meta.device_id,
        crypto_store,
        custom_account,
    )
    .await
    .map_err(OlmError::from)?;

    // The write lock is only taken once the new machine is fully built.
    let mut slot = self.olm_machine.write().await;
    *slot = Some(machine);

    Ok(())
}
389
390 pub async fn sync_token(&self) -> Option<String> {
393 self.state_store.sync_token.read().await.clone()
394 }
395
396 pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
400 let room = self.state_store.get_or_create_room(
401 room_id,
402 RoomState::Knocked,
403 self.room_info_notable_update_sender.clone(),
404 );
405
406 if room.state() != RoomState::Knocked {
407 let _sync_lock = self.sync_lock().lock().await;
408
409 let mut room_info = room.clone_info();
410 room_info.mark_as_knocked();
411 room_info.mark_state_partially_synced();
412 room_info.mark_members_missing(); let mut changes = StateChanges::default();
414 changes.add_room(room_info.clone());
415 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
417 }
418
419 Ok(room)
420 }
421
422 pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
450 let room = self.state_store.get_or_create_room(
451 room_id,
452 RoomState::Joined,
453 self.room_info_notable_update_sender.clone(),
454 );
455
456 if room.state() != RoomState::Joined {
459 let _sync_lock = self.sync_lock().lock().await;
460
461 let mut room_info = room.clone_info();
462
463 if room.state() == RoomState::Invited {
474 room_info.set_invite_accepted_now();
475 }
476
477 room_info.mark_as_joined();
478 room_info.mark_state_partially_synced();
479 room_info.mark_members_missing(); let mut changes = StateChanges::default();
482 changes.add_room(room_info.clone());
483
484 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
487 }
488
489 Ok(room)
490 }
491
492 pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
496 let room = self.state_store.get_or_create_room(
497 room_id,
498 RoomState::Left,
499 self.room_info_notable_update_sender.clone(),
500 );
501
502 if room.state() != RoomState::Left {
503 let _sync_lock = self.sync_lock().lock().await;
504
505 let mut room_info = room.clone_info();
506 room_info.mark_as_left();
507 room_info.mark_state_partially_synced();
508 room_info.mark_members_missing(); let mut changes = StateChanges::default();
510 changes.add_room(room_info.clone());
511 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
513 }
514
515 Ok(())
516 }
517
/// Get the sync lock of the state store.
///
/// Within this file the lock is taken while room membership changes and sync
/// results are saved to the store.
pub fn sync_lock(&self) -> &Mutex<()> {
    self.state_store.sync_lock()
}
522
523 #[instrument(skip_all)]
529 pub async fn receive_sync_response(
530 &self,
531 response: api::sync::sync_events::v3::Response,
532 ) -> Result<SyncResponse> {
533 self.receive_sync_response_with_requested_required_states(
534 response,
535 &RequestedRequiredStates::default(),
536 )
537 .await
538 }
539
/// Process a sync (v3) response, taking into account the `required_state`
/// values that were requested.
///
/// The response is run through the response processors: to-device events,
/// joined / left / invited / knocked rooms, global account data and presence.
/// The collected changes are saved and applied under the sync lock, the room
/// display names are then updated, and member updates are broadcast.
///
/// # Arguments
///
/// * `response` - The response received from the sync request.
/// * `requested_required_states` - The `required_state` values used when
///   creating the joined and left rooms.
///
/// # Errors
///
/// Returns the first error raised by a processor or by the state store.
pub async fn receive_sync_response_with_requested_required_states(
    &self,
    response: api::sync::sync_events::v3::Response,
    requested_required_states: &RequestedRequiredStates,
) -> Result<SyncResponse> {
    // A response carrying the sync token we already hold has been processed
    // before: return an empty result instead of processing it again.
    if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
        info!("Got the same sync response twice");
        return Ok(SyncResponse::default());
    }

    // Only read the clock when the INFO log at the end will be emitted.
    let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };

    // The read guard on the Olm machine is held for the rest of the function.
    #[cfg(feature = "e2e-encryption")]
    let olm_machine = self.olm_machine().await;

    let mut context = Context::new(StateChanges::new(response.next_batch.clone()));

    #[cfg(feature = "e2e-encryption")]
    let to_device = {
        let processors::e2ee::to_device::Output {
            processed_to_device_events: to_device,
            room_key_updates,
        } = processors::e2ee::to_device::from_sync_v2(&response, olm_machine.as_ref()).await?;

        // Room key updates only matter for the rooms we know about.
        processors::latest_event::decrypt_from_rooms(
            &mut context,
            room_key_updates
                .into_iter()
                .flatten()
                .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
                .collect(),
            processors::e2ee::E2EE::new(
                olm_machine.as_ref(),
                &self.decryption_settings,
                self.handle_verification_events,
            ),
        )
        .await?;

        to_device
    };

    // Without encryption support the raw to-device events are only classified
    // by their `type` field: `m.room.encrypted` events cannot be decrypted,
    // other typed events are plain text, and events whose type cannot be read
    // are invalid.
    #[cfg(not(feature = "e2e-encryption"))]
    let to_device = response
        .to_device
        .events
        .into_iter()
        .map(|raw| {
            if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
                if event_type == "m.room.encrypted" {
                    matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent::UnableToDecrypt(raw)
                } else {
                    matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent::PlainText(raw)
                }
            } else {
                matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent::Invalid(raw)
            }
        })
        .collect();

    let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());

    let global_account_data_processor =
        processors::account_data::global(&response.account_data.events);

    // The push rules are resolved before the rooms are processed; they feed
    // the notification processor of every room below.
    let push_rules = self.get_push_rules(&global_account_data_processor).await?;

    let mut room_updates = RoomUpdates::default();
    let mut notifications = Default::default();

    // Members updated per joined room; broadcast once everything is saved.
    let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
        BTreeMap::new();

    for (room_id, joined_room) in response.rooms.join {
        let joined_room_update = processors::room::sync_v2::update_joined_room(
            &mut context,
            processors::room::RoomCreationData::new(
                &room_id,
                self.room_info_notable_update_sender.clone(),
                requested_required_states,
                &mut ambiguity_cache,
            ),
            joined_room,
            &mut updated_members_in_room,
            processors::notification::Notification::new(
                &push_rules,
                &mut notifications,
                &self.state_store,
            ),
            #[cfg(feature = "e2e-encryption")]
            processors::e2ee::E2EE::new(
                olm_machine.as_ref(),
                &self.decryption_settings,
                self.handle_verification_events,
            ),
        )
        .await?;

        room_updates.joined.insert(room_id, joined_room_update);
    }

    for (room_id, left_room) in response.rooms.leave {
        let left_room_update = processors::room::sync_v2::update_left_room(
            &mut context,
            processors::room::RoomCreationData::new(
                &room_id,
                self.room_info_notable_update_sender.clone(),
                requested_required_states,
                &mut ambiguity_cache,
            ),
            left_room,
            processors::notification::Notification::new(
                &push_rules,
                &mut notifications,
                &self.state_store,
            ),
            #[cfg(feature = "e2e-encryption")]
            processors::e2ee::E2EE::new(
                olm_machine.as_ref(),
                &self.decryption_settings,
                self.handle_verification_events,
            ),
        )
        .await?;

        room_updates.left.insert(room_id, left_room_update);
    }

    for (room_id, invited_room) in response.rooms.invite {
        let invited_room_update = processors::room::sync_v2::update_invited_room(
            &mut context,
            &room_id,
            invited_room,
            self.room_info_notable_update_sender.clone(),
            processors::notification::Notification::new(
                &push_rules,
                &mut notifications,
                &self.state_store,
            ),
        )
        .await?;

        room_updates.invited.insert(room_id, invited_room_update);
    }

    for (room_id, knocked_room) in response.rooms.knock {
        let knocked_room_update = processors::room::sync_v2::update_knocked_room(
            &mut context,
            &room_id,
            knocked_room,
            self.room_info_notable_update_sender.clone(),
            processors::notification::Notification::new(
                &push_rules,
                &mut notifications,
                &self.state_store,
            ),
        )
        .await?;

        room_updates.knocked.insert(room_id, knocked_room_update);
    }

    global_account_data_processor.apply(&mut context, &self.state_store).await;

    // Presence events that fail to deserialize are skipped; the others are
    // keyed by their sender.
    context.state_changes.presence = response
        .presence
        .events
        .iter()
        .filter_map(|e| {
            let event = e.deserialize().ok()?;
            Some((event.sender, e.clone()))
        })
        .collect();

    context.state_changes.ambiguity_maps = ambiguity_cache.cache;

    {
        // The sync lock is only held while the changes are saved and applied.
        let _sync_lock = self.sync_lock().lock().await;

        processors::changes::save_and_apply(
            context,
            &self.state_store,
            &self.ignore_user_list_changes,
            Some(response.next_batch.clone()),
        )
        .await?;
    }

    // The first context was consumed above; display name updates go into a
    // fresh one and are saved separately.
    let mut context = Context::default();

    processors::room::display_name::update_for_rooms(
        &mut context,
        &room_updates,
        &self.state_store,
    )
    .await;

    processors::changes::save_only(context, &self.state_store).await?;

    for (room_id, member_ids) in updated_members_in_room {
        if let Some(room) = self.get_room(&room_id) {
            // The result of the send is deliberately ignored.
            let _ =
                room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
        }
    }

    if enabled!(Level::INFO) {
        info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
    }

    let response = SyncResponse {
        rooms: room_updates,
        presence: response.presence.events,
        account_data: response.account_data.events,
        to_device,
        notifications,
    };

    Ok(response)
}
773
774 #[instrument(skip_all, fields(?room_id))]
786 pub async fn receive_all_members(
787 &self,
788 room_id: &RoomId,
789 request: &api::membership::get_member_events::v3::Request,
790 response: &api::membership::get_member_events::v3::Response,
791 ) -> Result<()> {
792 if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
793 {
794 return Err(Error::InvalidReceiveMembersParameters);
798 }
799
800 let Some(room) = self.state_store.room(room_id) else {
801 return Ok(());
803 };
804
805 let mut chunk = Vec::with_capacity(response.chunk.len());
806 let mut context = Context::default();
807
808 #[cfg(feature = "e2e-encryption")]
809 let mut user_ids = BTreeSet::new();
810
811 let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
812
813 for raw_event in &response.chunk {
814 let member = match raw_event.deserialize() {
815 Ok(ev) => ev,
816 Err(e) => {
817 let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
818 debug!(event_id, "Failed to deserialize member event: {e}");
819 continue;
820 }
821 };
822
823 #[cfg(feature = "e2e-encryption")]
833 match member.membership() {
834 MembershipState::Join | MembershipState::Invite => {
835 user_ids.insert(member.state_key().to_owned());
836 }
837 _ => (),
838 }
839
840 if let StateEvent::Original(e) = &member {
841 if let Some(d) = &e.content.displayname {
842 let display_name = DisplayName::new(d);
843 ambiguity_map
844 .entry(display_name)
845 .or_default()
846 .insert(member.state_key().clone());
847 }
848 }
849
850 let sync_member: SyncRoomMemberEvent = member.clone().into();
851 processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
852
853 context
854 .state_changes
855 .state
856 .entry(room_id.to_owned())
857 .or_default()
858 .entry(member.event_type())
859 .or_default()
860 .insert(member.state_key().to_string(), raw_event.clone().cast());
861 chunk.push(member);
862 }
863
864 #[cfg(feature = "e2e-encryption")]
865 processors::e2ee::tracked_users::update(
866 self.olm_machine().await.as_ref(),
867 room.encryption_state(),
868 &user_ids,
869 )
870 .await?;
871
872 context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
873
874 let _sync_lock = self.sync_lock().lock().await;
875 let mut room_info = room.clone_info();
876 room_info.mark_members_synced();
877 context.state_changes.add_room(room_info);
878
879 processors::changes::save_and_apply(
880 context,
881 &self.state_store,
882 &self.ignore_user_list_changes,
883 None,
884 )
885 .await?;
886
887 let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
888
889 Ok(())
890 }
891
892 pub async fn receive_filter_upload(
908 &self,
909 filter_name: &str,
910 response: &api::filter::create_filter::v3::Response,
911 ) -> Result<()> {
912 Ok(self
913 .state_store
914 .set_kv_data(
915 StateStoreDataKey::Filter(filter_name),
916 StateStoreDataValue::Filter(response.filter_id.clone()),
917 )
918 .await?)
919 }
920
921 pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
933 let filter = self
934 .state_store
935 .get_kv_data(StateStoreDataKey::Filter(filter_name))
936 .await?
937 .map(|d| d.into_filter().expect("State store data not a filter"));
938
939 Ok(filter)
940 }
941
/// Get the to-device requests that share a room key with the members of the
/// given room.
///
/// The recipients depend on the room's history visibility: with
/// `HistoryVisibility::Joined` only members matching `RoomMemberships::JOIN`
/// are used, otherwise those matching `RoomMemberships::ACTIVE`.
///
/// # Arguments
///
/// * `room_id` - The room the key is shared for.
///
/// # Errors
///
/// * `Error::OlmError(OlmError::MissingSession)` if the Olm machine has not
///   been initialised yet (previously this case panicked).
/// * `Error::InsufficientData` if the room is unknown.
/// * `Error::EncryptionNotEnabled` if the room has no encryption settings.
/// * Any error raised while loading the members or sharing the key.
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
    // The read guard is held until the key has been shared.
    let olm_guard = self.olm_machine().await;
    let Some(olm_machine) = olm_guard.as_ref() else {
        return Err(Error::OlmError(OlmError::MissingSession));
    };

    let Some(room) = self.get_room(room_id) else {
        return Err(Error::InsufficientData);
    };

    let history_visibility = room.history_visibility_or_default();
    let Some(room_encryption_event) = room.encryption_settings() else {
        return Err(Error::EncryptionNotEnabled);
    };

    let filter = if history_visibility == HistoryVisibility::Joined {
        RoomMemberships::JOIN
    } else {
        RoomMemberships::ACTIVE
    };

    let members = self.state_store.get_user_ids(room_id, filter).await?;

    let settings = EncryptionSettings::new(
        room_encryption_event,
        history_visibility,
        self.room_key_recipient_strategy.clone(),
    );

    Ok(olm_machine.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
}
977
/// Get the room with the given ID from the state store.
///
/// Returns `None` if the state store does not know the room.
pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
    self.state_store.room(room_id)
}
986
/// Forget the room with the given ID.
///
/// The room is first forgotten in the state store, then removed from the
/// event cache store.
///
/// # Errors
///
/// Returns the first error raised by either store. If the state store step
/// fails, the event cache store is not touched.
pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
    // Forget the room in the state store.
    self.state_store.forget_room(room_id).await?;

    // Remove the room from the event cache store too; its lock is only held
    // for this statement.
    self.event_cache_store().lock().await?.remove_room(room_id).await?;

    Ok(())
}
1003
/// Acquire a read guard on the (optional) `OlmMachine`.
///
/// The guarded value is `None` until `BaseClient::regenerate_olm` has stored
/// a machine.
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
    self.olm_machine.read().await
}
1009
1010 pub(crate) async fn get_push_rules(
1016 &self,
1017 global_account_data_processor: &processors::account_data::Global,
1018 ) -> Result<Ruleset> {
1019 if let Some(event) = global_account_data_processor
1020 .push_rules()
1021 .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1022 {
1023 Ok(event.content.global)
1024 } else if let Some(event) = self
1025 .state_store
1026 .get_account_data_event_static::<PushRulesEventContent>()
1027 .await?
1028 .and_then(|ev| ev.deserialize().ok())
1029 {
1030 Ok(event.content.global)
1031 } else if let Some(session_meta) = self.state_store.session_meta() {
1032 Ok(Ruleset::server_default(&session_meta.user_id))
1033 } else {
1034 Ok(Ruleset::new())
1035 }
1036 }
1037
/// Get a subscriber on the ignore-user-list observable of this client.
// NOTE(review): presumably each value is the full list of ignored user IDs as
// strings — confirm against the code updating `ignore_user_list_changes`.
pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
    self.ignore_user_list_changes.subscribe()
}
1043
/// Create a new receiver of `RoomInfoNotableUpdate`s by subscribing to the
/// broadcast sender of this client.
pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
    self.room_info_notable_update_sender.subscribe()
}
1050
1051 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1053 match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1054 {
1055 Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1056 Ok(current_ignored_user_list) => {
1057 current_ignored_user_list.content.ignored_users.contains_key(user_id)
1058 }
1059 Err(error) => {
1060 warn!(?error, "Failed to deserialize the ignored user list event");
1061 false
1062 }
1063 },
1064 Ok(None) => false,
1065 Err(error) => {
1066 warn!(?error, "Could not get the ignored user list from the state store");
1067 false
1068 }
1069 }
1070 }
1071}
1072
/// The `required_state` values requested from the server, as
/// `(event type, state key)` pairs.
///
/// A room with its own entry in `for_rooms` uses that entry; every other room
/// uses `default`.
#[derive(Debug, Default)]
pub struct RequestedRequiredStates {
    /// The pairs used for rooms without an entry in `for_rooms`.
    default: Vec<(StateEventType, String)>,
    /// Room-specific pairs; when present they replace `default` for that room
    /// rather than extending it.
    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
}
1089
1090impl RequestedRequiredStates {
1091 pub fn new(
1096 default: Vec<(StateEventType, String)>,
1097 for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1098 ) -> Self {
1099 Self { default, for_rooms }
1100 }
1101
1102 pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1104 self.for_rooms.get(room_id).unwrap_or(&self.default)
1105 }
1106}
1107
1108impl From<&v5::Request> for RequestedRequiredStates {
1109 fn from(request: &v5::Request) -> Self {
1110 let mut default = BTreeSet::new();
1117
1118 for list in request.lists.values() {
1119 default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1120 }
1121
1122 for room_subscription in request.room_subscriptions.values() {
1123 default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1124 }
1125
1126 Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1127 }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132 use std::collections::HashMap;
1133
1134 use assert_matches2::assert_let;
1135 use futures_util::FutureExt as _;
1136 use matrix_sdk_test::{
1137 async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1138 LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1139 };
1140 use ruma::{
1141 api::client::{self as api, sync::sync_events::v5},
1142 event_id,
1143 events::{room::member::MembershipState, StateEventType},
1144 room_id,
1145 serde::Raw,
1146 user_id,
1147 };
1148 use serde_json::{json, value::to_raw_value};
1149
1150 use super::{BaseClient, RequestedRequiredStates};
1151 use crate::{
1152 client::ThreadingSupport,
1153 store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1154 test_utils::logged_in_base_client,
1155 RoomDisplayName, RoomState, SessionMeta,
1156 };
1157
/// `for_room` returns the room-specific states when a room has some, and the
/// default ones otherwise.
#[test]
fn test_requested_required_states() {
    let room_with_own_states = room_id!("!r0");
    let room_without_own_states = room_id!("!r1");

    let default_states = vec![(StateEventType::RoomAvatar, "".to_owned())];
    let own_states = vec![
        (StateEventType::RoomMember, "foo".to_owned()),
        (StateEventType::RoomEncryption, "".to_owned()),
    ];

    let requested_required_states = RequestedRequiredStates::new(
        default_states.clone(),
        HashMap::from([(room_with_own_states.to_owned(), own_states.clone())]),
    );

    // A room with its own entry gets exactly that entry.
    assert_eq!(
        requested_required_states.for_room(room_with_own_states),
        own_states.as_slice()
    );

    // Any other room falls back to the default states.
    assert_eq!(
        requested_required_states.for_room(room_without_own_states),
        default_states.as_slice()
    );
}
1189
/// The `required_state`s of all lists and room subscriptions of a sync v5
/// request end up merged, deduplicated and sorted in `default`.
#[test]
fn test_requested_required_states_from_sync_v5_request() {
    /// Build an `(event type, state key)` pair with an empty state key.
    fn pair(event_type: StateEventType) -> (StateEventType, String) {
        (event_type, "".to_owned())
    }

    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");

    let mut request = v5::Request::new();

    // No list and no room subscription: nothing is required.
    {
        let states = RequestedRequiredStates::from(&request);

        assert!(states.default.is_empty());
        assert!(states.for_rooms.is_empty());
    }

    // A first list.
    let mut list_foo = v5::request::List::default();
    list_foo.room_details.required_state =
        vec![pair(StateEventType::RoomAvatar), pair(StateEventType::RoomEncryption)];
    request.lists.insert("foo".to_owned(), list_foo);

    {
        let states = RequestedRequiredStates::from(&request);

        assert_eq!(
            states.default,
            &[pair(StateEventType::RoomAvatar), pair(StateEventType::RoomEncryption)]
        );
        assert!(states.for_rooms.is_empty());
    }

    // A second list, overlapping with the first one.
    let mut list_bar = v5::request::List::default();
    list_bar.room_details.required_state =
        vec![pair(StateEventType::RoomEncryption), pair(StateEventType::RoomName)];
    request.lists.insert("bar".to_owned(), list_bar);

    {
        let states = RequestedRequiredStates::from(&request);

        assert_eq!(
            states.default,
            &[
                pair(StateEventType::RoomAvatar),
                pair(StateEventType::RoomEncryption),
                pair(StateEventType::RoomName),
            ]
        );
        assert!(states.for_rooms.is_empty());
    }

    // A first room subscription.
    let mut subscription_0 = v5::request::RoomSubscription::default();
    subscription_0.required_state =
        vec![pair(StateEventType::RoomJoinRules), pair(StateEventType::RoomEncryption)];
    request.room_subscriptions.insert(room_id_0.to_owned(), subscription_0);

    {
        let states = RequestedRequiredStates::from(&request);

        assert_eq!(
            states.default,
            &[
                pair(StateEventType::RoomAvatar),
                pair(StateEventType::RoomEncryption),
                pair(StateEventType::RoomJoinRules),
                pair(StateEventType::RoomName),
            ]
        );
        assert!(states.for_rooms.is_empty());
    }

    // A second room subscription.
    let mut subscription_1 = v5::request::RoomSubscription::default();
    subscription_1.required_state =
        vec![pair(StateEventType::RoomName), pair(StateEventType::RoomTopic)];
    request.room_subscriptions.insert(room_id_1.to_owned(), subscription_1);

    {
        let states = RequestedRequiredStates::from(&request);

        assert_eq!(
            states.default,
            &[
                pair(StateEventType::RoomAvatar),
                pair(StateEventType::RoomEncryption),
                pair(StateEventType::RoomJoinRules),
                pair(StateEventType::RoomName),
                pair(StateEventType::RoomTopic),
            ]
        );
    }
}
1311
/// A room that is first synced as left and then as invited ends up in the
/// `Invited` state.
#[async_test]
async fn test_invite_after_leaving() {
    let user_id = user_id!("@alice:example.org");
    let room_id = room_id!("!test:example.org");

    let client = logged_in_base_client(Some(user_id)).await;

    let mut sync_builder = SyncResponseBuilder::new();

    // First sync: the room is in the `leave` section, with our own leave
    // event in its timeline.
    let response = sync_builder
        .add_left_room(
            LeftRoomBuilder::new(room_id).add_timeline_event(
                EventFactory::new()
                    .member(user_id)
                    .membership(MembershipState::Leave)
                    .display_name("Alice")
                    .event_id(event_id!("$994173582443PhrSn:example.org")),
            ),
        )
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();
    assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);

    // Second sync: the same room is now in the `invite` section, with a
    // stripped invite member event for our user.
    let response = sync_builder
        .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
            StrippedStateTestEvent::Custom(json!({
                "content": {
                    "displayname": "Alice",
                    "membership": "invite",
                },
                "event_id": "$143273582443PhrSn:example.org",
                "origin_server_ts": 1432735824653u64,
                "sender": "@example:example.org",
                "state_key": user_id,
                "type": "m.room.member",
            })),
        ))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();
    assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
}
1353
/// The display name of an invited room is computed from the stripped state of
/// the invite: here it is the display name of the only other member.
#[async_test]
async fn test_invite_displayname() {
    let user_id = user_id!("@alice:example.org");
    let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");

    let client = logged_in_base_client(Some(user_id)).await;

    // The invite state holds the create, join rules and encryption events,
    // the member event of the inviter ("Kyra") and our own invite.
    let response = ruma_response_from_json(&json!({
        "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
        "device_one_time_keys_count": {
            "signed_curve25519": 50u64
        },
        "device_unused_fallback_key_types": [
            "signed_curve25519"
        ],
        "rooms": {
            "invite": {
                "!ithpyNKDtmhneaTQja:example.org": {
                    "invite_state": {
                        "events": [
                            {
                                "content": {
                                    "creator": "@test:example.org",
                                    "room_version": "9"
                                },
                                "sender": "@test:example.org",
                                "state_key": "",
                                "type": "m.room.create"
                            },
                            {
                                "content": {
                                    "join_rule": "invite"
                                },
                                "sender": "@test:example.org",
                                "state_key": "",
                                "type": "m.room.join_rules"
                            },
                            {
                                "content": {
                                    "algorithm": "m.megolm.v1.aes-sha2"
                                },
                                "sender": "@test:example.org",
                                "state_key": "",
                                "type": "m.room.encryption"
                            },
                            {
                                "content": {
                                    "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
                                    "displayname": "Kyra",
                                    "membership": "join"
                                },
                                "sender": "@test:example.org",
                                "state_key": "@test:example.org",
                                "type": "m.room.member"
                            },
                            {
                                "content": {
                                    "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
                                    "displayname": "alice",
                                    "is_direct": true,
                                    "membership": "invite"
                                },
                                "origin_server_ts": 1650878657984u64,
                                "sender": "@test:example.org",
                                "state_key": "@alice:example.org",
                                "type": "m.room.member",
                                "unsigned": {
                                    "age": 14u64
                                },
                                "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
                            }
                        ]
                    }
                }
            }
        }
    }));

    client.receive_sync_response(response).await.unwrap();

    let room = client.get_room(room_id).expect("Room not found");
    assert_eq!(room.state(), RoomState::Invited);
    assert_eq!(
        room.compute_display_name().await.expect("fetching display name failed").into_inner(),
        RoomDisplayName::Calculated("Kyra".to_owned())
    );
}
1441
1442 #[async_test]
1443 async fn test_deserialization_failure() {
1444 let user_id = user_id!("@alice:example.org");
1445 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1446
1447 let client = BaseClient::new(
1448 StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1449 ThreadingSupport::Disabled,
1450 );
1451 client
1452 .activate(
1453 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1454 RoomLoadSettings::default(),
1455 #[cfg(feature = "e2e-encryption")]
1456 None,
1457 )
1458 .await
1459 .unwrap();
1460
1461 let response = ruma_response_from_json(&json!({
1462 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1463 "rooms": {
1464 "join": {
1465 "!ithpyNKDtmhneaTQja:example.org": {
1466 "state": {
1467 "events": [
1468 {
1469 "invalid": "invalid",
1470 },
1471 {
1472 "content": {
1473 "name": "The room name"
1474 },
1475 "event_id": "$143273582443PhrSn:example.org",
1476 "origin_server_ts": 1432735824653u64,
1477 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1478 "sender": "@example:example.org",
1479 "state_key": "",
1480 "type": "m.room.name",
1481 "unsigned": {
1482 "age": 1234
1483 }
1484 },
1485 ]
1486 }
1487 }
1488 }
1489 }
1490 }));
1491
1492 client.receive_sync_response(response).await.unwrap();
1493 client
1494 .state_store()
1495 .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1496 .await
1497 .expect("Failed to fetch state event")
1498 .expect("State event not found")
1499 .deserialize()
1500 .expect("Failed to deserialize state event");
1501 }
1502
1503 #[async_test]
1504 async fn test_invited_members_arent_ignored() {
1505 let user_id = user_id!("@alice:example.org");
1506 let inviter_user_id = user_id!("@bob:example.org");
1507 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1508
1509 let client = BaseClient::new(
1510 StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1511 ThreadingSupport::Disabled,
1512 );
1513 client
1514 .activate(
1515 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1516 RoomLoadSettings::default(),
1517 #[cfg(feature = "e2e-encryption")]
1518 None,
1519 )
1520 .await
1521 .unwrap();
1522
1523 let mut sync_builder = SyncResponseBuilder::new();
1525 let response = sync_builder
1526 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1527 .build_sync_response();
1528 client.receive_sync_response(response).await.unwrap();
1529
1530 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1533
1534 let raw_member_event = json!({
1535 "content": {
1536 "avatar_url": "mxc://localhost/fewjilfewjil42",
1537 "displayname": "Invited Alice",
1538 "membership": "invite"
1539 },
1540 "event_id": "$151800140517rfvjc:localhost",
1541 "origin_server_ts": 151800140,
1542 "room_id": room_id,
1543 "sender": inviter_user_id,
1544 "state_key": user_id,
1545 "type": "m.room.member",
1546 "unsigned": {
1547 "age": 13374242,
1548 }
1549 });
1550 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1551 to_raw_value(&raw_member_event).unwrap(),
1552 )]);
1553
1554 client.receive_all_members(room_id, &request, &response).await.unwrap();
1556
1557 let room = client.get_room(room_id).unwrap();
1558
1559 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1561
1562 assert_eq!(member.user_id(), user_id);
1563 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1564 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1565 }
1566
1567 #[async_test]
1568 async fn test_reinvited_members_get_a_display_name() {
1569 let user_id = user_id!("@alice:example.org");
1570 let inviter_user_id = user_id!("@bob:example.org");
1571 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1572
1573 let client = BaseClient::new(
1574 StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1575 ThreadingSupport::Disabled,
1576 );
1577 client
1578 .activate(
1579 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1580 RoomLoadSettings::default(),
1581 #[cfg(feature = "e2e-encryption")]
1582 None,
1583 )
1584 .await
1585 .unwrap();
1586
1587 let mut sync_builder = SyncResponseBuilder::new();
1589 let response = sync_builder
1590 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1591 StateTestEvent::Custom(json!({
1592 "content": {
1593 "avatar_url": null,
1594 "displayname": null,
1595 "membership": "leave"
1596 },
1597 "event_id": "$151803140217rkvjc:localhost",
1598 "origin_server_ts": 151800139,
1599 "room_id": room_id,
1600 "sender": user_id,
1601 "state_key": user_id,
1602 "type": "m.room.member",
1603 })),
1604 ))
1605 .build_sync_response();
1606 client.receive_sync_response(response).await.unwrap();
1607
1608 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1610
1611 let raw_member_event = json!({
1612 "content": {
1613 "avatar_url": "mxc://localhost/fewjilfewjil42",
1614 "displayname": "Invited Alice",
1615 "membership": "invite"
1616 },
1617 "event_id": "$151800140517rfvjc:localhost",
1618 "origin_server_ts": 151800140,
1619 "room_id": room_id,
1620 "sender": inviter_user_id,
1621 "state_key": user_id,
1622 "type": "m.room.member",
1623 "unsigned": {
1624 "age": 13374242,
1625 }
1626 });
1627 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1628 to_raw_value(&raw_member_event).unwrap(),
1629 )]);
1630
1631 client.receive_all_members(room_id, &request, &response).await.unwrap();
1633
1634 let room = client.get_room(room_id).unwrap();
1635
1636 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1638
1639 assert_eq!(member.user_id(), user_id);
1640 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1641 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1642 }
1643
1644 #[async_test]
1645 async fn test_ignored_user_list_changes() {
1646 let user_id = user_id!("@alice:example.org");
1647 let client = BaseClient::new(
1648 StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1649 ThreadingSupport::Disabled,
1650 );
1651
1652 client
1653 .activate(
1654 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1655 RoomLoadSettings::default(),
1656 #[cfg(feature = "e2e-encryption")]
1657 None,
1658 )
1659 .await
1660 .unwrap();
1661
1662 let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1663 assert!(subscriber.next().now_or_never().is_none());
1664
1665 let mut sync_builder = SyncResponseBuilder::new();
1666 let response = sync_builder
1667 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1668 json!({
1669 "content": {
1670 "ignored_users": {
1671 *BOB: {}
1672 }
1673 },
1674 "type": "m.ignored_user_list",
1675 }),
1676 ))
1677 .build_sync_response();
1678 client.receive_sync_response(response).await.unwrap();
1679
1680 assert_let!(Some(ignored) = subscriber.next().await);
1681 assert_eq!(ignored, [BOB.to_string()]);
1682
1683 let response = sync_builder
1685 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1686 json!({
1687 "content": {
1688 "ignored_users": {
1689 *BOB: {}
1690 }
1691 },
1692 "type": "m.ignored_user_list",
1693 }),
1694 ))
1695 .build_sync_response();
1696 client.receive_sync_response(response).await.unwrap();
1697
1698 assert!(subscriber.next().now_or_never().is_none());
1700
1701 let response = sync_builder
1703 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1704 json!({
1705 "content": {
1706 "ignored_users": {}
1707 },
1708 "type": "m.ignored_user_list",
1709 }),
1710 ))
1711 .build_sync_response();
1712 client.receive_sync_response(response).await.unwrap();
1713
1714 assert_let!(Some(ignored) = subscriber.next().await);
1715 assert!(ignored.is_empty());
1716 }
1717
1718 #[async_test]
1719 async fn test_is_user_ignored() {
1720 let ignored_user_id = user_id!("@alice:example.org");
1721 let client = logged_in_base_client(None).await;
1722
1723 let mut sync_builder = SyncResponseBuilder::new();
1724 let response = sync_builder
1725 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1726 json!({
1727 "content": {
1728 "ignored_users": {
1729 ignored_user_id: {}
1730 }
1731 },
1732 "type": "m.ignored_user_list",
1733 }),
1734 ))
1735 .build_sync_response();
1736 client.receive_sync_response(response).await.unwrap();
1737
1738 assert!(client.is_user_ignored(ignored_user_id).await);
1739 }
1740
1741 #[async_test]
1742 async fn test_joined_at_timestamp_is_set() {
1743 let client = logged_in_base_client(None).await;
1744 let invited_room_id = room_id!("!invited:localhost");
1745 let unknown_room_id = room_id!("!unknown:localhost");
1746
1747 let mut sync_builder = SyncResponseBuilder::new();
1748 let response = sync_builder
1749 .add_invited_room(InvitedRoomBuilder::new(invited_room_id))
1750 .build_sync_response();
1751 client.receive_sync_response(response).await.unwrap();
1752
1753 let invited_room = client
1756 .get_room(invited_room_id)
1757 .expect("The sync should have created a room in the invited state");
1758
1759 assert_eq!(invited_room.state(), RoomState::Invited);
1760 assert!(invited_room.inner.get().invite_accepted_at().is_none());
1761
1762 let joined_room = client
1764 .room_joined(invited_room_id)
1765 .await
1766 .expect("We should be able to mark a room as joined");
1767
1768 assert_eq!(joined_room.state(), RoomState::Joined);
1770 assert!(joined_room.inner.get().invite_accepted_at().is_some());
1771
1772 assert!(client.get_room(unknown_room_id).is_none());
1775 let unknown_room = client
1776 .room_joined(unknown_room_id)
1777 .await
1778 .expect("We should be able to mark a room as joined");
1779
1780 assert_eq!(unknown_room.state(), RoomState::Joined);
1781 assert!(unknown_room.inner.get().invite_accepted_at().is_none());
1782 }
1783}