1#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt, iter,
21 ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29 store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, DecryptionSettings,
30 EncryptionSettings, EncryptionSyncChanges, OlmError, OlmMachine, RoomEventDecryptionResult,
31 TrustRequirement,
32};
33#[cfg(feature = "e2e-encryption")]
34use ruma::events::{
35 room::{history_visibility::HistoryVisibility, message::MessageType},
36 SyncMessageLikeEvent,
37};
38#[cfg(doc)]
39use ruma::DeviceId;
40use ruma::{
41 api::client as api,
42 events::{
43 ignored_user_list::IgnoredUserListEvent,
44 marked_unread::MarkedUnreadEventContent,
45 push_rules::{PushRulesEvent, PushRulesEventContent},
46 room::{
47 member::{MembershipState, RoomMemberEventContent, SyncRoomMemberEvent},
48 power_levels::{
49 RoomPowerLevelsEvent, RoomPowerLevelsEventContent, StrippedRoomPowerLevelsEvent,
50 },
51 },
52 AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent,
53 AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent,
54 GlobalAccountDataEventType, StateEvent, StateEventType, SyncStateEvent,
55 },
56 push::{Action, PushConditionRoomCtx, Ruleset},
57 serde::Raw,
58 time::Instant,
59 OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
60};
61use tokio::sync::{broadcast, Mutex};
62#[cfg(feature = "e2e-encryption")]
63use tokio::sync::{RwLock, RwLockReadGuard};
64use tracing::{debug, error, info, instrument, trace, warn};
65
66#[cfg(feature = "e2e-encryption")]
67use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
68#[cfg(feature = "e2e-encryption")]
69use crate::RoomMemberships;
70use crate::{
71 deserialized_responses::{DisplayName, RawAnySyncOrStrippedTimelineEvent, TimelineEvent},
72 error::{Error, Result},
73 event_cache::store::EventCacheStoreLock,
74 response_processors::AccountDataProcessor,
75 rooms::{
76 normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate},
77 Room, RoomInfo, RoomState,
78 },
79 store::{
80 ambiguity_map::AmbiguityCache, DynStateStore, MemoryStore, Result as StoreResult,
81 StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, Store, StoreConfig,
82 },
83 sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse, Timeline},
84 RoomStateFilter, SessionMeta,
85};
86
/// Holds the stores, the optional end-to-end encryption machinery and the
/// update channels that the methods of this type operate on.
///
/// The type is `Clone`. The `Arc`-wrapped fields (`crypto_store`,
/// `olm_machine`) are shared between clones rather than duplicated.
#[derive(Clone)]
pub struct BaseClient {
    /// State store wrapper. It is also where the session metadata, the sync
    /// token, the sync lock and the rooms are read from throughout this type.
    pub(crate) store: Store,

    /// Handle to the event cache store, taken from the [`StoreConfig`] the
    /// client was built with.
    event_cache_store: EventCacheStoreLock,

    /// Crypto store handed to `OlmMachine::with_store` every time the Olm
    /// machine is (re)created.
    #[cfg(feature = "e2e-encryption")]
    crypto_store: Arc<DynCryptoStore>,

    /// The Olm machine. Starts out as `None` and is set by
    /// [`BaseClient::regenerate_olm`].
    #[cfg(feature = "e2e-encryption")]
    olm_machine: Arc<RwLock<Option<OlmMachine>>>,

    /// Observable list of ignored user IDs (as strings). It is updated by
    /// `apply_changes` when the changes carry an ignored-user-list event.
    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

    /// Broadcast sender for [`RoomInfoNotableUpdate`]s. A clone is handed to
    /// the store whenever a room is looked up or created.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// Strategy for collecting room key recipients.
    // NOTE(review): not read by the code visible here; presumably consumed
    // when room keys are shared — confirm against the callers.
    #[cfg(feature = "e2e-encryption")]
    pub room_key_recipient_strategy: CollectStrategy,

    /// Trust requirement placed on the sender's device when decrypting room
    /// events (copied into `DecryptionSettings`).
    #[cfg(feature = "e2e-encryption")]
    pub decryption_trust_requirement: TrustRequirement,
}
128
129#[cfg(not(tarpaulin_include))]
130impl fmt::Debug for BaseClient {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 f.debug_struct("BaseClient")
133 .field("session_meta", &self.store.session_meta())
134 .field("sync_token", &self.store.sync_token)
135 .finish_non_exhaustive()
136 }
137}
138
139impl BaseClient {
140 pub fn with_store_config(config: StoreConfig) -> Self {
147 let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
148 broadcast::channel(u16::MAX as usize);
149
150 BaseClient {
151 store: Store::new(config.state_store),
152 event_cache_store: config.event_cache_store,
153 #[cfg(feature = "e2e-encryption")]
154 crypto_store: config.crypto_store,
155 #[cfg(feature = "e2e-encryption")]
156 olm_machine: Default::default(),
157 ignore_user_list_changes: Default::default(),
158 room_info_notable_update_sender,
159 #[cfg(feature = "e2e-encryption")]
160 room_key_recipient_strategy: Default::default(),
161 #[cfg(feature = "e2e-encryption")]
162 decryption_trust_requirement: TrustRequirement::Untrusted,
163 }
164 }
165
166 #[cfg(feature = "e2e-encryption")]
169 pub async fn clone_with_in_memory_state_store(
170 &self,
171 cross_process_store_locks_holder_name: &str,
172 ) -> Result<Self> {
173 let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
174 .state_store(MemoryStore::new());
175 let config = config.crypto_store(self.crypto_store.clone());
176
177 let copy = Self {
178 store: Store::new(config.state_store),
179 event_cache_store: config.event_cache_store,
180 crypto_store: self.crypto_store.clone(),
187 olm_machine: self.olm_machine.clone(),
188 ignore_user_list_changes: Default::default(),
189 room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
190 room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
191 decryption_trust_requirement: self.decryption_trust_requirement,
192 };
193
194 if let Some(session_meta) = self.session_meta().cloned() {
195 copy.store
196 .set_session_meta(session_meta, ©.room_info_notable_update_sender)
197 .await?;
198 }
199
200 Ok(copy)
201 }
202
203 #[cfg(not(feature = "e2e-encryption"))]
206 #[allow(clippy::unused_async)]
207 pub async fn clone_with_in_memory_state_store(
208 &self,
209 cross_process_store_locks_holder: &str,
210 ) -> Result<Self> {
211 let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
212 .state_store(MemoryStore::new());
213 Ok(Self::with_store_config(config))
214 }
215
    /// Returns the session metadata held by the store, or `None` when the
    /// store has none.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.store.session_meta()
    }
224
    /// Returns the rooms reported by the store.
    pub fn rooms(&self) -> Vec<Room> {
        self.store.rooms()
    }
229
    /// Returns the rooms the store reports for the given `filter`.
    ///
    /// # Arguments
    ///
    /// * `filter` - Room state filter forwarded to the store.
    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
        self.store.rooms_filtered(filter)
    }
234
    /// Returns the pair produced by the store's `rooms_stream`: a vector of
    /// rooms and a stream yielding batches of `VectorDiff<Room>`.
    // NOTE(review): presumably the vector is the current snapshot and the
    // stream carries subsequent updates — confirm against `Store::rooms_stream`.
    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
        self.store.rooms_stream()
    }
240
241 pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
244 self.store.get_or_create_room(
245 room_id,
246 room_state,
247 self.room_info_notable_update_sender.clone(),
248 )
249 }
250
    /// Returns a reference to the state store, obtained by dereferencing the
    /// `Store` wrapper held by this client.
    pub fn store(&self) -> &DynStateStore {
        self.store.deref()
    }
255
    /// Returns a reference to the event cache store handle of this client.
    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
        &self.event_cache_store
    }
260
    /// Returns `true` when the store holds session metadata, `false`
    /// otherwise.
    pub fn logged_in(&self) -> bool {
        self.store.session_meta().is_some()
    }
265
266 pub async fn set_session_meta(
287 &self,
288 session_meta: SessionMeta,
289 #[cfg(feature = "e2e-encryption")] custom_account: Option<
290 crate::crypto::vodozemac::olm::Account,
291 >,
292 ) -> Result<()> {
293 debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login");
294 self.store
295 .set_session_meta(session_meta.clone(), &self.room_info_notable_update_sender)
296 .await?;
297
298 #[cfg(feature = "e2e-encryption")]
299 self.regenerate_olm(custom_account).await?;
300
301 Ok(())
302 }
303
304 #[cfg(feature = "e2e-encryption")]
308 pub async fn regenerate_olm(
309 &self,
310 custom_account: Option<crate::crypto::vodozemac::olm::Account>,
311 ) -> Result<()> {
312 tracing::debug!("regenerating OlmMachine");
313 let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
314
315 let olm_machine = OlmMachine::with_store(
318 &session_meta.user_id,
319 &session_meta.device_id,
320 self.crypto_store.clone(),
321 custom_account,
322 )
323 .await
324 .map_err(OlmError::from)?;
325
326 *self.olm_machine.write().await = Some(olm_machine);
327 Ok(())
328 }
329
    /// Returns a clone of the sync token currently held by the store, if any.
    ///
    /// Waits for read access to the token before cloning it.
    pub async fn sync_token(&self) -> Option<String> {
        self.store.sync_token.read().await.clone()
    }
335
336 #[cfg(feature = "e2e-encryption")]
337 async fn handle_verification_event(
338 &self,
339 event: &AnySyncMessageLikeEvent,
340 room_id: &RoomId,
341 ) -> Result<()> {
342 if let Some(olm) = self.olm_machine().await.as_ref() {
343 olm.receive_verification_event(&event.clone().into_full_event(room_id.to_owned()))
344 .await?;
345 }
346
347 Ok(())
348 }
349
350 #[cfg(feature = "e2e-encryption")]
358 async fn decrypt_sync_room_event(
359 &self,
360 event: &Raw<AnySyncTimelineEvent>,
361 room_id: &RoomId,
362 ) -> Result<Option<TimelineEvent>> {
363 let olm = self.olm_machine().await;
364 let Some(olm) = olm.as_ref() else { return Ok(None) };
365
366 let decryption_settings = DecryptionSettings {
367 sender_device_trust_requirement: self.decryption_trust_requirement,
368 };
369
370 let event = match olm
371 .try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings)
372 .await?
373 {
374 RoomEventDecryptionResult::Decrypted(decrypted) => {
375 let event: TimelineEvent = decrypted.into();
376
377 if let Ok(AnySyncTimelineEvent::MessageLike(e)) = event.raw().deserialize() {
378 match &e {
379 AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(
380 original_event,
381 )) => {
382 if let MessageType::VerificationRequest(_) =
383 &original_event.content.msgtype
384 {
385 self.handle_verification_event(&e, room_id).await?;
386 }
387 }
388 _ if e.event_type().to_string().starts_with("m.key.verification") => {
389 self.handle_verification_event(&e, room_id).await?;
390 }
391 _ => (),
392 }
393 }
394 event
395 }
396 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
397 TimelineEvent::new_utd_event(event.clone(), utd_info)
398 }
399 };
400
401 Ok(Some(event))
402 }
403
    /// Processes the timeline events of one room from a sync response.
    ///
    /// Every raw event is wrapped in a `TimelineEvent` and pushed to the
    /// returned `Timeline`, whether or not it could be deserialized. Events
    /// that deserialize are additionally processed:
    ///
    /// * state events (unless `ignore_state_events` is set) update the
    ///   ambiguity cache, `user_ids`, member profiles or `room_info`, and are
    ///   recorded in `changes`;
    /// * redactions are applied to `room_info` and recorded in `changes`;
    /// * with the `e2e-encryption` feature, encrypted events are decrypted
    ///   and verification events are forwarded to the Olm machine;
    /// * push rules are evaluated, filling `notifications` and the event's
    ///   `push_actions`.
    ///
    /// # Arguments
    ///
    /// * `room` - The room the timeline belongs to.
    /// * `limited` - Forwarded to `Timeline::new`.
    /// * `events` - The raw timeline events, in the order they are processed.
    /// * `ignore_state_events` - When `true`, state events are left untouched.
    /// * `prev_batch` - Forwarded to `Timeline::new`.
    /// * `push_rules` - Rule set used to compute push actions.
    /// * `user_ids` - Set updated from member events: joined/invited users
    ///   are inserted, users with any other membership are removed.
    /// * `room_info` - Room info updated from state events and redactions.
    /// * `changes` - Accumulator for the state changes to persist.
    /// * `notifications` - Per-room notifications, appended to for events
    ///   whose push actions should notify.
    /// * `ambiguity_cache` - Cache updated from member events.
    ///
    /// # Errors
    ///
    /// Propagates errors from computing the push context, from the ambiguity
    /// cache and, with `e2e-encryption`, from decryption and verification
    /// handling. Deserialization failures are only logged.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
    pub(crate) async fn handle_timeline(
        &self,
        room: &Room,
        limited: bool,
        events: Vec<Raw<AnySyncTimelineEvent>>,
        ignore_state_events: bool,
        prev_batch: Option<String>,
        push_rules: &Ruleset,
        user_ids: &mut BTreeSet<OwnedUserId>,
        room_info: &mut RoomInfo,
        changes: &mut StateChanges,
        notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
        ambiguity_cache: &mut AmbiguityCache,
    ) -> Result<Timeline> {
        let mut timeline = Timeline::new(limited, prev_batch);
        let mut push_context = self.get_push_room_context(room, room_info, changes).await?;

        for raw_event in events {
            // Starts as the plain event; may be replaced below by its
            // decrypted (or unable-to-decrypt) counterpart.
            let mut event = TimelineEvent::new(raw_event);

            match event.raw().deserialize() {
                Ok(e) => {
                    #[allow(clippy::single_match)]
                    match &e {
                        AnySyncTimelineEvent::State(s) if !ignore_state_events => {
                            match s {
                                AnySyncStateEvent::RoomMember(member) => {
                                    Box::pin(ambiguity_cache.handle_event(
                                        changes,
                                        room.room_id(),
                                        member,
                                    ))
                                    .await?;

                                    // Keep `user_ids` limited to users that
                                    // are joined or invited.
                                    match member.membership() {
                                        MembershipState::Join | MembershipState::Invite => {
                                            user_ids.insert(member.state_key().to_owned());
                                        }
                                        _ => {
                                            user_ids.remove(member.state_key());
                                        }
                                    }

                                    handle_room_member_event_for_profiles(
                                        room.room_id(),
                                        member,
                                        changes,
                                    );
                                }
                                _ => {
                                    room_info.handle_state_event(s);
                                }
                            }

                            let raw_event: Raw<AnySyncStateEvent> = event.raw().clone().cast();
                            changes.add_state_event(room.room_id(), s.clone(), raw_event);
                        }

                        // Reached only when `ignore_state_events` is set:
                        // state events are deliberately skipped.
                        AnySyncTimelineEvent::State(_) => { }

                        AnySyncTimelineEvent::MessageLike(
                            AnySyncMessageLikeEvent::RoomRedaction(r),
                        ) => {
                            // Falls back to room version 1 when the room
                            // version is not known.
                            let room_version =
                                room_info.room_version().unwrap_or(&RoomVersionId::V1);

                            if let Some(redacts) = r.redacts(room_version) {
                                room_info.handle_redaction(r, event.raw().cast_ref());
                                let raw_event = event.raw().clone().cast();

                                changes.add_redaction(room.room_id(), redacts, raw_event);
                            }
                        }

                        #[cfg(feature = "e2e-encryption")]
                        AnySyncTimelineEvent::MessageLike(e) => match e {
                            AnySyncMessageLikeEvent::RoomEncrypted(
                                SyncMessageLikeEvent::Original(_),
                            ) => {
                                // `None` means there is no Olm machine; the
                                // encrypted event is then kept as is.
                                if let Some(e) = Box::pin(
                                    self.decrypt_sync_room_event(event.raw(), room.room_id()),
                                )
                                .await?
                                {
                                    event = e;
                                }
                            }
                            AnySyncMessageLikeEvent::RoomMessage(
                                SyncMessageLikeEvent::Original(original_event),
                            ) => match &original_event.content.msgtype {
                                MessageType::VerificationRequest(_) => {
                                    Box::pin(self.handle_verification_event(e, room.room_id()))
                                        .await?;
                                }
                                _ => (),
                            },
                            _ if e.event_type().to_string().starts_with("m.key.verification") => {
                                Box::pin(self.handle_verification_event(e, room.room_id())).await?;
                            }
                            _ => (),
                        },

                        #[cfg(not(feature = "e2e-encryption"))]
                        AnySyncTimelineEvent::MessageLike(_) => (),
                    }

                    // The event just handled may have changed the inputs of
                    // the push context, so refresh it (or try again to build
                    // one if none could be built so far).
                    if let Some(context) = &mut push_context {
                        self.update_push_room_context(
                            context,
                            room.own_user_id(),
                            room_info,
                            changes,
                        )
                    } else {
                        push_context = self.get_push_room_context(room, room_info, changes).await?;
                    }

                    if let Some(context) = &push_context {
                        let actions = push_rules.get_actions(event.raw(), context);

                        if actions.iter().any(Action::should_notify) {
                            notifications.entry(room.room_id().to_owned()).or_default().push(
                                Notification {
                                    actions: actions.to_owned(),
                                    event: RawAnySyncOrStrippedTimelineEvent::Sync(
                                        event.raw().clone(),
                                    ),
                                },
                            );
                        }
                        event.push_actions = Some(actions.to_owned());
                    }
                }
                Err(e) => {
                    warn!("Error deserializing event: {e}");
                }
            }

            // Pushed even when deserialization failed.
            timeline.events.push(event);
        }

        Ok(timeline)
    }
551
552 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
564 pub(crate) async fn handle_invited_state(
565 &self,
566 room: &Room,
567 events: &[(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)],
568 push_rules: &Ruleset,
569 room_info: &mut RoomInfo,
570 changes: &mut StateChanges,
571 notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
572 ) -> Result<()> {
573 let mut state_events = BTreeMap::new();
574
575 for (raw_event, event) in events {
576 room_info.handle_stripped_state_event(event);
577 state_events
578 .entry(event.event_type())
579 .or_insert_with(BTreeMap::new)
580 .insert(event.state_key().to_owned(), raw_event.clone());
581 }
582
583 changes.stripped_state.insert(room_info.room_id().to_owned(), state_events.clone());
584
585 if let Some(push_context) = self.get_push_room_context(room, room_info, changes).await? {
588 for event in state_events.values().flat_map(|map| map.values()) {
590 let actions = push_rules.get_actions(event, &push_context);
591 if actions.iter().any(Action::should_notify) {
592 notifications.entry(room.room_id().to_owned()).or_default().push(
593 Notification {
594 actions: actions.to_owned(),
595 event: RawAnySyncOrStrippedTimelineEvent::Stripped(event.clone()),
596 },
597 );
598 }
599 }
600 }
601
602 Ok(())
603 }
604
605 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
611 pub(crate) async fn handle_state(
612 &self,
613 raw_events: &[Raw<AnySyncStateEvent>],
614 events: &[AnySyncStateEvent],
615 room_info: &mut RoomInfo,
616 changes: &mut StateChanges,
617 ambiguity_cache: &mut AmbiguityCache,
618 ) -> StoreResult<BTreeSet<OwnedUserId>> {
619 let mut state_events = BTreeMap::new();
620 let mut user_ids = BTreeSet::new();
621
622 assert_eq!(raw_events.len(), events.len());
623
624 for (raw_event, event) in iter::zip(raw_events, events) {
625 room_info.handle_state_event(event);
626
627 if let AnySyncStateEvent::RoomMember(member) = &event {
628 ambiguity_cache.handle_event(changes, &room_info.room_id, member).await?;
629
630 match member.membership() {
631 MembershipState::Join | MembershipState::Invite => {
632 user_ids.insert(member.state_key().to_owned());
633 }
634 _ => (),
635 }
636
637 handle_room_member_event_for_profiles(&room_info.room_id, member, changes);
638 }
639
640 state_events
641 .entry(event.event_type())
642 .or_insert_with(BTreeMap::new)
643 .insert(event.state_key().to_owned(), raw_event.clone());
644 }
645
646 changes.state.insert((*room_info.room_id).to_owned(), state_events);
647
648 Ok(user_ids)
649 }
650
651 #[instrument(skip_all, fields(?room_id))]
652 pub(crate) async fn handle_room_account_data(
653 &self,
654 room_id: &RoomId,
655 events: &[Raw<AnyRoomAccountDataEvent>],
656 changes: &mut StateChanges,
657 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
658 ) {
659 fn on_room_info<F>(
664 room_id: &RoomId,
665 changes: &mut StateChanges,
666 client: &BaseClient,
667 mut on_room_info: F,
668 ) where
669 F: FnMut(&mut RoomInfo),
670 {
671 if let Some(room_info) = changes.room_infos.get_mut(room_id) {
673 on_room_info(room_info);
675 }
676 else if let Some(room) = client.store.room(room_id) {
678 let mut room_info = room.clone_info();
680
681 on_room_info(&mut room_info);
683
684 changes.add_room(room_info);
686 }
687 }
688
689 fn on_unread_marker(
691 room_id: &RoomId,
692 content: &MarkedUnreadEventContent,
693 room_info: &mut RoomInfo,
694 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
695 ) {
696 if room_info.base_info.is_marked_unread != content.unread {
697 room_info_notable_updates
700 .entry(room_id.to_owned())
701 .or_default()
702 .insert(RoomInfoNotableUpdateReasons::UNREAD_MARKER);
703 }
704
705 room_info.base_info.is_marked_unread = content.unread;
706 }
707
708 for raw_event in events {
710 match raw_event.deserialize() {
711 Ok(event) => {
712 changes.add_room_account_data(room_id, event.clone(), raw_event.clone());
713
714 match event {
715 AnyRoomAccountDataEvent::MarkedUnread(event) => {
716 on_room_info(room_id, changes, self, |room_info| {
717 on_unread_marker(
718 room_id,
719 &event.content,
720 room_info,
721 room_info_notable_updates,
722 );
723 });
724 }
725 AnyRoomAccountDataEvent::UnstableMarkedUnread(event) => {
726 on_room_info(room_id, changes, self, |room_info| {
727 on_unread_marker(
728 room_id,
729 &event.content.0,
730 room_info,
731 room_info_notable_updates,
732 );
733 });
734 }
735 AnyRoomAccountDataEvent::Tag(event) => {
736 on_room_info(room_id, changes, self, |room_info| {
737 room_info.base_info.handle_notable_tags(&event.content.tags);
738 });
739 }
740
741 _ => {}
743 }
744 }
745
746 Err(err) => {
747 warn!("unable to deserialize account data event: {err}");
748 }
749 }
750 }
751 }
752
753 #[cfg(feature = "e2e-encryption")]
754 #[instrument(skip_all)]
755 pub(crate) async fn preprocess_to_device_events(
756 &self,
757 encryption_sync_changes: EncryptionSyncChanges<'_>,
758 changes: &mut StateChanges,
759 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
760 ) -> Result<Vec<Raw<ruma::events::AnyToDeviceEvent>>> {
761 if let Some(o) = self.olm_machine().await.as_ref() {
762 let (events, room_key_updates) =
767 o.receive_sync_changes(encryption_sync_changes).await?;
768
769 for room_key_update in room_key_updates {
770 if let Some(room) = self.get_room(&room_key_update.room_id) {
771 self.decrypt_latest_events(&room, changes, room_info_notable_updates).await;
772 }
773 }
774
775 Ok(events)
776 } else {
777 Ok(encryption_sync_changes.to_device_events)
781 }
782 }
783
784 #[cfg(feature = "e2e-encryption")]
789 async fn decrypt_latest_events(
790 &self,
791 room: &Room,
792 changes: &mut StateChanges,
793 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
794 ) {
795 if let Some((found, found_index)) = self.decrypt_latest_suitable_event(room).await {
799 room.on_latest_event_decrypted(found, found_index, changes, room_info_notable_updates);
800 }
801 }
802
803 #[cfg(feature = "e2e-encryption")]
810 async fn decrypt_latest_suitable_event(
811 &self,
812 room: &Room,
813 ) -> Option<(Box<LatestEvent>, usize)> {
814 let enc_events = room.latest_encrypted_events();
815 let power_levels = room.power_levels().await.ok();
816 let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref());
817
818 for (i, event) in enc_events.iter().enumerate().rev() {
820 let decrypt_sync_room_event =
824 Box::pin(self.decrypt_sync_room_event(event, room.room_id()));
825
826 if let Ok(Some(decrypted)) = decrypt_sync_room_event.await {
827 if let Ok(any_sync_event) = decrypted.raw().deserialize() {
829 match is_suitable_for_latest_event(&any_sync_event, power_levels_info) {
831 PossibleLatestEvent::YesRoomMessage(_)
832 | PossibleLatestEvent::YesPoll(_)
833 | PossibleLatestEvent::YesCallInvite(_)
834 | PossibleLatestEvent::YesCallNotify(_)
835 | PossibleLatestEvent::YesSticker(_)
836 | PossibleLatestEvent::YesKnockedStateEvent(_) => {
837 return Some((Box::new(LatestEvent::new(decrypted)), i));
838 }
839 _ => (),
840 }
841 }
842 }
843 }
844 None
845 }
846
847 pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
851 let room = self.store.get_or_create_room(
852 room_id,
853 RoomState::Knocked,
854 self.room_info_notable_update_sender.clone(),
855 );
856
857 if room.state() != RoomState::Knocked {
858 let _sync_lock = self.sync_lock().lock().await;
859
860 let mut room_info = room.clone_info();
861 room_info.mark_as_knocked();
862 room_info.mark_state_partially_synced();
863 room_info.mark_members_missing(); let mut changes = StateChanges::default();
865 changes.add_room(room_info.clone());
866 self.store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
868 }
869
870 Ok(room)
871 }
872
873 pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
877 let room = self.store.get_or_create_room(
878 room_id,
879 RoomState::Joined,
880 self.room_info_notable_update_sender.clone(),
881 );
882
883 if room.state() != RoomState::Joined {
884 let _sync_lock = self.sync_lock().lock().await;
885
886 let mut room_info = room.clone_info();
887 room_info.mark_as_joined();
888 room_info.mark_state_partially_synced();
889 room_info.mark_members_missing(); let mut changes = StateChanges::default();
891 changes.add_room(room_info.clone());
892 self.store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
894 }
895
896 Ok(room)
897 }
898
899 pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
903 let room = self.store.get_or_create_room(
904 room_id,
905 RoomState::Left,
906 self.room_info_notable_update_sender.clone(),
907 );
908
909 if room.state() != RoomState::Left {
910 let _sync_lock = self.sync_lock().lock().await;
911
912 let mut room_info = room.clone_info();
913 room_info.mark_as_left();
914 room_info.mark_state_partially_synced();
915 room_info.mark_members_missing(); let mut changes = StateChanges::default();
917 changes.add_room(room_info.clone());
918 self.store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
920 }
921
922 Ok(())
923 }
924
    /// Returns the store's sync lock.
    ///
    /// Within this type the lock is held while changes are saved to the
    /// store and applied to the in-memory rooms.
    pub fn sync_lock(&self) -> &Mutex<()> {
        self.store.sync_lock()
    }
929
    /// Processes a `/sync` (v3) response and persists the resulting changes.
    ///
    /// The steps, in order:
    ///
    /// 1. If the response's `next_batch` equals the stored sync token, the
    ///    response is treated as a duplicate and a default `SyncResponse` is
    ///    returned without further work.
    /// 2. With the `e2e-encryption` feature, the to-device events and
    ///    device/key information are preprocessed by the Olm machine.
    /// 3. Joined, left, invited and knocked rooms are processed one after the
    ///    other, accumulating into a single `StateChanges`.
    /// 4. Global account data is applied and presence events are collected.
    /// 5. While holding the sync lock, the changes are saved, the sync token
    ///    is updated and the changes are applied to the in-memory rooms.
    /// 6. In-memory caches are refreshed and member updates are sent to the
    ///    joined rooms seen in this response.
    ///
    /// # Arguments
    ///
    /// * `response` - The sync response to process.
    ///
    /// # Errors
    ///
    /// Propagates errors from to-device preprocessing, push rule retrieval,
    /// state/timeline/invite handling, user tracking and saving the changes.
    ///
    /// # Panics
    ///
    /// Panics if, for a joined room, no room info is found in
    /// `changes.room_infos` right after `changes.add_room` was called for it.
    // NOTE(review): this relies on `add_room` keying the info by the same
    // room ID — confirm against `StateChanges::add_room`.
    #[instrument(skip_all)]
    pub async fn receive_sync_response(
        &self,
        response: api::sync::sync_events::v3::Response,
    ) -> Result<SyncResponse> {
        // Same token as the last processed response: nothing new to handle.
        if self.store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
            info!("Got the same sync response twice");
            return Ok(SyncResponse::default());
        }

        let now = Instant::now();
        let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));

        #[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
        let mut room_info_notable_updates =
            BTreeMap::<OwnedRoomId, RoomInfoNotableUpdateReasons>::new();

        #[cfg(feature = "e2e-encryption")]
        let to_device = self
            .preprocess_to_device_events(
                EncryptionSyncChanges {
                    to_device_events: response.to_device.events,
                    changed_devices: &response.device_lists,
                    one_time_keys_counts: &response.device_one_time_keys_count,
                    unused_fallback_keys: response.device_unused_fallback_key_types.as_deref(),
                    next_batch_token: Some(response.next_batch.clone()),
                },
                &mut changes,
                &mut room_info_notable_updates,
            )
            .await?;

        #[cfg(not(feature = "e2e-encryption"))]
        let to_device = response.to_device.events;

        let mut ambiguity_cache = AmbiguityCache::new(self.store.inner.clone());

        let account_data_processor = AccountDataProcessor::process(&response.account_data.events);

        let push_rules = self.get_push_rules(&account_data_processor).await?;

        let mut new_rooms = RoomUpdates::default();
        let mut notifications = Default::default();

        // Joined rooms and the user IDs collected from their state events;
        // used at the very end to send member updates.
        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
            BTreeMap::new();

        for (room_id, new_info) in response.rooms.join {
            let room = self.store.get_or_create_room(
                &room_id,
                RoomState::Joined,
                self.room_info_notable_update_sender.clone(),
            );

            let mut room_info = room.clone_info();

            room_info.mark_as_joined();
            room_info.update_from_ruma_summary(&new_info.summary);
            room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
            room_info.mark_state_fully_synced();

            let state_events = Self::deserialize_state_events(&new_info.state.events);
            let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
                state_events.into_iter().unzip();

            let mut user_ids = self
                .handle_state(
                    &raw_state_events,
                    &state_events,
                    &mut room_info,
                    &mut changes,
                    &mut ambiguity_cache,
                )
                .await?;

            // Snapshot taken before the timeline is handled, so it only
            // reflects the state section of the response.
            updated_members_in_room.insert(room_id.to_owned(), user_ids.clone());

            for raw in &new_info.ephemeral.events {
                match raw.deserialize() {
                    Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
                        changes.add_receipts(&room_id, event.content);
                    }
                    Ok(_) => {}
                    Err(e) => {
                        let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
                        #[rustfmt::skip]
                        info!(
                            ?room_id, event_id,
                            "Failed to deserialize ephemeral room event: {e}"
                        );
                    }
                }
            }

            if new_info.timeline.limited {
                room_info.mark_members_missing();
            }

            let timeline = self
                .handle_timeline(
                    &room,
                    new_info.timeline.limited,
                    new_info.timeline.events,
                    false,
                    new_info.timeline.prev_batch,
                    &push_rules,
                    &mut user_ids,
                    &mut room_info,
                    &mut changes,
                    &mut notifications,
                    &mut ambiguity_cache,
                )
                .await?;

            // Store the room info in `changes` so that the account data
            // handling below can find and update it there.
            changes.add_room(room_info);

            // The notable updates reported by this call go to a temporary
            // map and are dropped.
            self.handle_room_account_data(
                &room_id,
                &new_info.account_data.events,
                &mut changes,
                &mut Default::default(),
            )
            .await;

            // Fetch the room info back: the account data handling may have
            // modified it inside `changes`.
            let mut room_info = changes.room_infos.get(&room_id).unwrap().clone();

            #[cfg(feature = "e2e-encryption")]
            if room_info.is_encrypted() {
                if let Some(o) = self.olm_machine().await.as_ref() {
                    // The new info is encrypted but the room was not: also
                    // track the active members already known to the store.
                    if !room.is_encrypted() {
                        let user_ids =
                            self.store.get_user_ids(&room_id, RoomMemberships::ACTIVE).await?;
                        o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
                    }

                    // `user_ids` here is the set collected from this
                    // response (the inner binding above is out of scope).
                    o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
                }
            }

            let notification_count = new_info.unread_notifications.into();
            room_info.update_notification_count(notification_count);

            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();

            new_rooms.join.insert(
                room_id,
                JoinedRoomUpdate::new(
                    timeline,
                    new_info.state.events,
                    new_info.account_data.events,
                    new_info.ephemeral.events,
                    notification_count,
                    ambiguity_changes,
                ),
            );

            // Add the room info again, now including the notification count.
            changes.add_room(room_info);
        }

        for (room_id, new_info) in response.rooms.leave {
            let room = self.store.get_or_create_room(
                &room_id,
                RoomState::Left,
                self.room_info_notable_update_sender.clone(),
            );

            let mut room_info = room.clone_info();
            room_info.mark_as_left();
            room_info.mark_state_partially_synced();

            let state_events = Self::deserialize_state_events(&new_info.state.events);
            let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
                state_events.into_iter().unzip();

            let mut user_ids = self
                .handle_state(
                    &raw_state_events,
                    &state_events,
                    &mut room_info,
                    &mut changes,
                    &mut ambiguity_cache,
                )
                .await?;

            let timeline = self
                .handle_timeline(
                    &room,
                    new_info.timeline.limited,
                    new_info.timeline.events,
                    false,
                    new_info.timeline.prev_batch,
                    &push_rules,
                    &mut user_ids,
                    &mut room_info,
                    &mut changes,
                    &mut notifications,
                    &mut ambiguity_cache,
                )
                .await?;

            // Store the room info in `changes` so that the account data
            // handling below can find and update it there.
            changes.add_room(room_info);

            self.handle_room_account_data(
                &room_id,
                &new_info.account_data.events,
                &mut changes,
                &mut Default::default(),
            )
            .await;

            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();

            new_rooms.leave.insert(
                room_id,
                LeftRoomUpdate::new(
                    timeline,
                    new_info.state.events,
                    new_info.account_data.events,
                    ambiguity_changes,
                ),
            );
        }

        for (room_id, new_info) in response.rooms.invite {
            let room = self.store.get_or_create_room(
                &room_id,
                RoomState::Invited,
                self.room_info_notable_update_sender.clone(),
            );

            let invite_state =
                Self::deserialize_stripped_state_events(&new_info.invite_state.events);

            let mut room_info = room.clone_info();
            room_info.mark_as_invited();
            room_info.mark_state_fully_synced();

            self.handle_invited_state(
                &room,
                &invite_state,
                &push_rules,
                &mut room_info,
                &mut changes,
                &mut notifications,
            )
            .await?;

            changes.add_room(room_info);

            new_rooms.invite.insert(room_id, new_info);
        }

        for (room_id, new_info) in response.rooms.knock {
            let room = self.store.get_or_create_room(
                &room_id,
                RoomState::Knocked,
                self.room_info_notable_update_sender.clone(),
            );

            let knock_state = Self::deserialize_stripped_state_events(&new_info.knock_state.events);

            let mut room_info = room.clone_info();
            room_info.mark_as_knocked();
            room_info.mark_state_fully_synced();

            // Knock state is stripped state too, so it goes through the same
            // handler as invite state.
            self.handle_invited_state(
                &room,
                &knock_state,
                &push_rules,
                &mut room_info,
                &mut changes,
                &mut notifications,
            )
            .await?;

            changes.add_room(room_info);

            new_rooms.knocked.insert(room_id, new_info);
        }

        account_data_processor.apply(&mut changes, &self.store).await;

        // Presence events that fail to deserialize are silently dropped.
        changes.presence = response
            .presence
            .events
            .iter()
            .filter_map(|e| {
                let event = e.deserialize().ok()?;
                Some((event.sender, e.clone()))
            })
            .collect();

        changes.ambiguity_maps = ambiguity_cache.cache;

        // Saving, updating the sync token and applying the changes happen
        // together under the sync lock.
        {
            let _sync_lock = self.sync_lock().lock().await;
            self.store.save_changes(&changes).await?;
            *self.store.sync_token.write().await = Some(response.next_batch.clone());
            self.apply_changes(&changes, room_info_notable_updates);
        }

        // Done after the changes were saved, since it is given the store.
        new_rooms.update_in_memory_caches(&self.store).await;

        for (room_id, member_ids) in updated_members_in_room {
            if let Some(room) = self.get_room(&room_id) {
                // A failed send is ignored on purpose.
                let _ =
                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
            }
        }

        info!("Processed a sync response in {:?}", now.elapsed());

        let response = SyncResponse {
            rooms: new_rooms,
            presence: response.presence.events,
            account_data: response.account_data.events,
            to_device,
            notifications,
        };

        Ok(response)
    }
1275
1276 pub(crate) fn apply_changes(
1277 &self,
1278 changes: &StateChanges,
1279 room_info_notable_updates: BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
1280 ) {
1281 if let Some(event) = changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
1282 {
1283 match event.deserialize_as::<IgnoredUserListEvent>() {
1284 Ok(event) => {
1285 let user_ids: Vec<String> =
1286 event.content.ignored_users.keys().map(|id| id.to_string()).collect();
1287
1288 self.ignore_user_list_changes.set(user_ids);
1289 }
1290 Err(error) => {
1291 error!("Failed to deserialize ignored user list event: {error}")
1292 }
1293 }
1294 }
1295
1296 for (room_id, room_info) in &changes.room_infos {
1297 if let Some(room) = self.store.room(room_id) {
1298 let room_info_notable_update_reasons =
1299 room_info_notable_updates.get(room_id).copied().unwrap_or_default();
1300
1301 room.set_room_info(room_info.clone(), room_info_notable_update_reasons)
1302 }
1303 }
1304 }
1305
1306 #[instrument(skip_all, fields(?room_id))]
1318 pub async fn receive_all_members(
1319 &self,
1320 room_id: &RoomId,
1321 request: &api::membership::get_member_events::v3::Request,
1322 response: &api::membership::get_member_events::v3::Response,
1323 ) -> Result<()> {
1324 if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
1325 {
1326 return Err(Error::InvalidReceiveMembersParameters);
1330 }
1331
1332 let Some(room) = self.store.room(room_id) else {
1333 return Ok(());
1335 };
1336
1337 let mut chunk = Vec::with_capacity(response.chunk.len());
1338 let mut changes = StateChanges::default();
1339
1340 #[cfg(feature = "e2e-encryption")]
1341 let mut user_ids = BTreeSet::new();
1342
1343 let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
1344
1345 for raw_event in &response.chunk {
1346 let member = match raw_event.deserialize() {
1347 Ok(ev) => ev,
1348 Err(e) => {
1349 let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
1350 debug!(event_id, "Failed to deserialize member event: {e}");
1351 continue;
1352 }
1353 };
1354
1355 #[cfg(feature = "e2e-encryption")]
1365 match member.membership() {
1366 MembershipState::Join | MembershipState::Invite => {
1367 user_ids.insert(member.state_key().to_owned());
1368 }
1369 _ => (),
1370 }
1371
1372 if let StateEvent::Original(e) = &member {
1373 if let Some(d) = &e.content.displayname {
1374 let display_name = DisplayName::new(d);
1375 ambiguity_map
1376 .entry(display_name)
1377 .or_default()
1378 .insert(member.state_key().clone());
1379 }
1380 }
1381
1382 let sync_member: SyncRoomMemberEvent = member.clone().into();
1383 handle_room_member_event_for_profiles(room_id, &sync_member, &mut changes);
1384
1385 changes
1386 .state
1387 .entry(room_id.to_owned())
1388 .or_default()
1389 .entry(member.event_type())
1390 .or_default()
1391 .insert(member.state_key().to_string(), raw_event.clone().cast());
1392 chunk.push(member);
1393 }
1394
1395 #[cfg(feature = "e2e-encryption")]
1396 if room.is_encrypted() {
1397 if let Some(o) = self.olm_machine().await.as_ref() {
1398 o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
1399 }
1400 }
1401
1402 changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
1403
1404 let _sync_lock = self.sync_lock().lock().await;
1405 let mut room_info = room.clone_info();
1406 room_info.mark_members_synced();
1407 changes.add_room(room_info);
1408
1409 self.store.save_changes(&changes).await?;
1410 self.apply_changes(&changes, Default::default());
1411
1412 let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
1413
1414 Ok(())
1415 }
1416
1417 pub async fn receive_filter_upload(
1433 &self,
1434 filter_name: &str,
1435 response: &api::filter::create_filter::v3::Response,
1436 ) -> Result<()> {
1437 Ok(self
1438 .store
1439 .set_kv_data(
1440 StateStoreDataKey::Filter(filter_name),
1441 StateStoreDataValue::Filter(response.filter_id.clone()),
1442 )
1443 .await?)
1444 }
1445
1446 pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
1458 let filter = self
1459 .store
1460 .get_kv_data(StateStoreDataKey::Filter(filter_name))
1461 .await?
1462 .map(|d| d.into_filter().expect("State store data not a filter"));
1463
1464 Ok(filter)
1465 }
1466
/// Asks the Olm machine to share a room key for `room_id` and returns the
/// to-device requests it produced.
///
/// The recipients are the user IDs the store returns for the room: only
/// joined members when the room's history visibility is `Joined`, all
/// active members otherwise.
///
/// # Errors
///
/// * [`Error::InsufficientData`] if the room is not known to the client.
/// * [`Error::EncryptionNotEnabled`] if the room has no encryption settings.
/// * Errors from the store or from the Olm machine are propagated.
///
/// # Panics
///
/// Panics if there is no Olm machine.
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
    let olm_guard = self.olm_machine().await;
    let Some(olm) = olm_guard.as_ref() else {
        panic!("Olm machine wasn't started");
    };

    let Some(room) = self.get_room(room_id) else {
        return Err(Error::InsufficientData);
    };

    let history_visibility = room.history_visibility_or_default();
    let Some(room_encryption_event) = room.encryption_settings() else {
        return Err(Error::EncryptionNotEnabled);
    };

    // With `Joined` history visibility only joined members are selected as
    // recipients; otherwise all active members are.
    let joined_only = history_visibility == HistoryVisibility::Joined;
    let filter = if joined_only { RoomMemberships::JOIN } else { RoomMemberships::ACTIVE };

    let members = self.store.get_user_ids(room_id, filter).await?;

    let settings = EncryptionSettings::new(
        room_encryption_event,
        history_visibility,
        self.room_key_recipient_strategy.clone(),
    );

    let requests =
        olm.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?;

    Ok(requests)
}
1502
1503 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1509 self.store.room(room_id)
1510 }
1511
1512 pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1520 self.store.forget_room(room_id).await?;
1522
1523 self.event_cache_store().lock().await?.remove_room(room_id).await?;
1525
1526 Ok(())
1527 }
1528
/// Acquires a read lock on the client's Olm machine slot and returns the
/// guard.
///
/// The guarded value is an `Option`; callers in this file treat `None` as
/// "no Olm machine available".
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
    self.olm_machine.read().await
}
1534
1535 pub(crate) async fn get_push_rules(
1541 &self,
1542 account_data_processor: &AccountDataProcessor,
1543 ) -> Result<Ruleset> {
1544 if let Some(event) = account_data_processor
1545 .push_rules()
1546 .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1547 {
1548 Ok(event.content.global)
1549 } else if let Some(event) = self
1550 .store
1551 .get_account_data_event_static::<PushRulesEventContent>()
1552 .await?
1553 .and_then(|ev| ev.deserialize().ok())
1554 {
1555 Ok(event.content.global)
1556 } else if let Some(session_meta) = self.store.session_meta() {
1557 Ok(Ruleset::server_default(&session_meta.user_id))
1558 } else {
1559 Ok(Ruleset::new())
1560 }
1561 }
1562
1563 pub async fn get_push_room_context(
1571 &self,
1572 room: &Room,
1573 room_info: &RoomInfo,
1574 changes: &StateChanges,
1575 ) -> Result<Option<PushConditionRoomCtx>> {
1576 let room_id = room.room_id();
1577 let user_id = room.own_user_id();
1578
1579 let member_count = room_info.active_members_count();
1580
1581 let user_display_name = if let Some(AnySyncStateEvent::RoomMember(member)) =
1583 changes.state.get(room_id).and_then(|events| {
1584 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1585 }) {
1586 member
1587 .as_original()
1588 .and_then(|ev| ev.content.displayname.clone())
1589 .unwrap_or_else(|| user_id.localpart().to_owned())
1590 } else if let Some(AnyStrippedStateEvent::RoomMember(member)) =
1591 changes.stripped_state.get(room_id).and_then(|events| {
1592 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1593 })
1594 {
1595 member.content.displayname.unwrap_or_else(|| user_id.localpart().to_owned())
1596 } else if let Some(member) = Box::pin(room.get_member(user_id)).await? {
1597 member.name().to_owned()
1598 } else {
1599 trace!("Couldn't get push context because of missing own member information");
1600 return Ok(None);
1601 };
1602
1603 let power_levels = if let Some(event) = changes.state.get(room_id).and_then(|types| {
1604 types
1605 .get(&StateEventType::RoomPowerLevels)?
1606 .get("")?
1607 .deserialize_as::<RoomPowerLevelsEvent>()
1608 .ok()
1609 }) {
1610 Some(event.power_levels().into())
1611 } else if let Some(event) = changes.stripped_state.get(room_id).and_then(|types| {
1612 types
1613 .get(&StateEventType::RoomPowerLevels)?
1614 .get("")?
1615 .deserialize_as::<StrippedRoomPowerLevelsEvent>()
1616 .ok()
1617 }) {
1618 Some(event.power_levels().into())
1619 } else {
1620 self.store
1621 .get_state_event_static::<RoomPowerLevelsEventContent>(room_id)
1622 .await?
1623 .and_then(|e| e.deserialize().ok())
1624 .map(|event| event.power_levels().into())
1625 };
1626
1627 Ok(Some(PushConditionRoomCtx {
1628 user_id: user_id.to_owned(),
1629 room_id: room_id.to_owned(),
1630 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
1631 user_display_name,
1632 power_levels,
1633 }))
1634 }
1635
1636 pub fn update_push_room_context(
1640 &self,
1641 push_rules: &mut PushConditionRoomCtx,
1642 user_id: &UserId,
1643 room_info: &RoomInfo,
1644 changes: &StateChanges,
1645 ) {
1646 let room_id = &*room_info.room_id;
1647
1648 push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap_or(UInt::MAX);
1649
1650 if let Some(AnySyncStateEvent::RoomMember(member)) =
1652 changes.state.get(room_id).and_then(|events| {
1653 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1654 })
1655 {
1656 push_rules.user_display_name = member
1657 .as_original()
1658 .and_then(|ev| ev.content.displayname.clone())
1659 .unwrap_or_else(|| user_id.localpart().to_owned())
1660 }
1661
1662 if let Some(AnySyncStateEvent::RoomPowerLevels(event)) =
1663 changes.state.get(room_id).and_then(|types| {
1664 types.get(&StateEventType::RoomPowerLevels)?.get("")?.deserialize().ok()
1665 })
1666 {
1667 push_rules.power_levels = Some(event.power_levels().into());
1668 }
1669 }
1670
1671 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1674 self.ignore_user_list_changes.subscribe()
1675 }
1676
1677 pub(crate) fn deserialize_state_events(
1678 raw_events: &[Raw<AnySyncStateEvent>],
1679 ) -> Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> {
1680 raw_events
1681 .iter()
1682 .filter_map(|raw_event| match raw_event.deserialize() {
1683 Ok(event) => Some((raw_event.clone(), event)),
1684 Err(e) => {
1685 warn!("Couldn't deserialize state event: {e}");
1686 None
1687 }
1688 })
1689 .collect()
1690 }
1691
1692 pub(crate) fn deserialize_stripped_state_events(
1693 raw_events: &[Raw<AnyStrippedStateEvent>],
1694 ) -> Vec<(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)> {
1695 raw_events
1696 .iter()
1697 .filter_map(|raw_event| match raw_event.deserialize() {
1698 Ok(event) => Some((raw_event.clone(), event)),
1699 Err(e) => {
1700 warn!("Couldn't deserialize stripped state event: {e}");
1701 None
1702 }
1703 })
1704 .collect()
1705 }
1706
1707 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1711 self.room_info_notable_update_sender.subscribe()
1712 }
1713}
1714
1715fn handle_room_member_event_for_profiles(
1716 room_id: &RoomId,
1717 event: &SyncStateEvent<RoomMemberEventContent>,
1718 changes: &mut StateChanges,
1719) {
1720 if event.state_key() == event.sender() {
1724 changes
1725 .profiles
1726 .entry(room_id.to_owned())
1727 .or_default()
1728 .insert(event.sender().to_owned(), event.into());
1729 }
1730
1731 if *event.membership() == MembershipState::Invite {
1732 changes
1739 .profiles_to_delete
1740 .entry(room_id.to_owned())
1741 .or_default()
1742 .push(event.state_key().clone());
1743 }
1744}
1745
1746#[cfg(test)]
1747mod tests {
1748 use matrix_sdk_test::{
1749 async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1750 LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder,
1751 };
1752 use ruma::{
1753 api::client as api, event_id, events::room::member::MembershipState, room_id, serde::Raw,
1754 user_id,
1755 };
1756 use serde_json::{json, value::to_raw_value};
1757
1758 use super::BaseClient;
1759 use crate::{
1760 store::{StateStoreExt, StoreConfig},
1761 test_utils::logged_in_base_client,
1762 RoomDisplayName, RoomState, SessionMeta,
1763 };
1764
1765 #[async_test]
1766 async fn test_invite_after_leaving() {
1767 let user_id = user_id!("@alice:example.org");
1768 let room_id = room_id!("!test:example.org");
1769
1770 let client = logged_in_base_client(Some(user_id)).await;
1771
1772 let mut sync_builder = SyncResponseBuilder::new();
1773
1774 let response = sync_builder
1775 .add_left_room(
1776 LeftRoomBuilder::new(room_id).add_timeline_event(
1777 EventFactory::new()
1778 .member(user_id)
1779 .membership(MembershipState::Leave)
1780 .display_name("Alice")
1781 .event_id(event_id!("$994173582443PhrSn:example.org")),
1782 ),
1783 )
1784 .build_sync_response();
1785 client.receive_sync_response(response).await.unwrap();
1786 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1787
1788 let response = sync_builder
1789 .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1790 StrippedStateTestEvent::Custom(json!({
1791 "content": {
1792 "displayname": "Alice",
1793 "membership": "invite",
1794 },
1795 "event_id": "$143273582443PhrSn:example.org",
1796 "origin_server_ts": 1432735824653u64,
1797 "sender": "@example:example.org",
1798 "state_key": user_id,
1799 "type": "m.room.member",
1800 })),
1801 ))
1802 .build_sync_response();
1803 client.receive_sync_response(response).await.unwrap();
1804 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1805 }
1806
1807 #[async_test]
1808 async fn test_invite_displayname() {
1809 let user_id = user_id!("@alice:example.org");
1810 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1811
1812 let client = logged_in_base_client(Some(user_id)).await;
1813
1814 let response = ruma_response_from_json(&json!({
1815 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1816 "device_one_time_keys_count": {
1817 "signed_curve25519": 50u64
1818 },
1819 "device_unused_fallback_key_types": [
1820 "signed_curve25519"
1821 ],
1822 "rooms": {
1823 "invite": {
1824 "!ithpyNKDtmhneaTQja:example.org": {
1825 "invite_state": {
1826 "events": [
1827 {
1828 "content": {
1829 "creator": "@test:example.org",
1830 "room_version": "9"
1831 },
1832 "sender": "@test:example.org",
1833 "state_key": "",
1834 "type": "m.room.create"
1835 },
1836 {
1837 "content": {
1838 "join_rule": "invite"
1839 },
1840 "sender": "@test:example.org",
1841 "state_key": "",
1842 "type": "m.room.join_rules"
1843 },
1844 {
1845 "content": {
1846 "algorithm": "m.megolm.v1.aes-sha2"
1847 },
1848 "sender": "@test:example.org",
1849 "state_key": "",
1850 "type": "m.room.encryption"
1851 },
1852 {
1853 "content": {
1854 "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1855 "displayname": "Kyra",
1856 "membership": "join"
1857 },
1858 "sender": "@test:example.org",
1859 "state_key": "@test:example.org",
1860 "type": "m.room.member"
1861 },
1862 {
1863 "content": {
1864 "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1865 "displayname": "alice",
1866 "is_direct": true,
1867 "membership": "invite"
1868 },
1869 "origin_server_ts": 1650878657984u64,
1870 "sender": "@test:example.org",
1871 "state_key": "@alice:example.org",
1872 "type": "m.room.member",
1873 "unsigned": {
1874 "age": 14u64
1875 },
1876 "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1877 }
1878 ]
1879 }
1880 }
1881 }
1882 }
1883 }));
1884
1885 client.receive_sync_response(response).await.unwrap();
1886
1887 let room = client.get_room(room_id).expect("Room not found");
1888 assert_eq!(room.state(), RoomState::Invited);
1889 assert_eq!(
1890 room.compute_display_name().await.expect("fetching display name failed"),
1891 RoomDisplayName::Calculated("Kyra".to_owned())
1892 );
1893 }
1894
/// Decrypting the latest events of a room that has no latest encrypted
/// events must leave the room, the pending changes and the notable update
/// reasons untouched.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
    use std::collections::BTreeMap;

    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{event_id, events::room::member::MembershipState};

    use crate::{rooms::normal::RoomInfoNotableUpdateReasons, StateChanges};

    // Given a freshly joined room.
    let user_id = user_id!("@u:u.to");
    let room_id = room_id!("!r:u.to");
    let client = logged_in_base_client(Some(user_id)).await;

    let mut sync_builder = SyncResponseBuilder::new();

    let join_event = EventFactory::new()
        .member(user_id)
        .display_name("Alice")
        .membership(MembershipState::Join)
        .event_id(event_id!("$1"));
    let joined_sync = sync_builder
        .add_joined_room(
            matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_timeline_event(join_event),
        )
        .build_sync_response();
    client.receive_sync_response(joined_sync).await.unwrap();

    let room = client.get_room(room_id).expect("Just-created room not found!");

    // Sanity check: there is neither a latest encrypted event nor a latest
    // event yet.
    assert!(room.latest_encrypted_events().is_empty());
    assert!(room.latest_event().is_none());

    // When decryption of the latest events is requested.
    let mut changes = StateChanges::default();
    let mut room_info_notable_updates = BTreeMap::new();
    client.decrypt_latest_events(&room, &mut changes, &mut room_info_notable_updates).await;

    // Then nothing has changed.
    assert!(room.latest_encrypted_events().is_empty());
    assert!(room.latest_event().is_none());
    assert!(changes.room_infos.is_empty());

    let reasons = room_info_notable_updates.get(room_id).copied().unwrap_or_default();
    assert!(!reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
}
1946
1947 #[async_test]
1948 async fn test_deserialization_failure() {
1949 let user_id = user_id!("@alice:example.org");
1950 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1951
1952 let client = BaseClient::with_store_config(StoreConfig::new(
1953 "cross-process-store-locks-holder-name".to_owned(),
1954 ));
1955 client
1956 .set_session_meta(
1957 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1958 #[cfg(feature = "e2e-encryption")]
1959 None,
1960 )
1961 .await
1962 .unwrap();
1963
1964 let response = ruma_response_from_json(&json!({
1965 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1966 "rooms": {
1967 "join": {
1968 "!ithpyNKDtmhneaTQja:example.org": {
1969 "state": {
1970 "events": [
1971 {
1972 "invalid": "invalid",
1973 },
1974 {
1975 "content": {
1976 "name": "The room name"
1977 },
1978 "event_id": "$143273582443PhrSn:example.org",
1979 "origin_server_ts": 1432735824653u64,
1980 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1981 "sender": "@example:example.org",
1982 "state_key": "",
1983 "type": "m.room.name",
1984 "unsigned": {
1985 "age": 1234
1986 }
1987 },
1988 ]
1989 }
1990 }
1991 }
1992 }
1993 }));
1994
1995 client.receive_sync_response(response).await.unwrap();
1996 client
1997 .store()
1998 .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1999 .await
2000 .expect("Failed to fetch state event")
2001 .expect("State event not found")
2002 .deserialize()
2003 .expect("Failed to deserialize state event");
2004 }
2005
2006 #[async_test]
2007 async fn test_invited_members_arent_ignored() {
2008 let user_id = user_id!("@alice:example.org");
2009 let inviter_user_id = user_id!("@bob:example.org");
2010 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2011
2012 let client = BaseClient::with_store_config(StoreConfig::new(
2013 "cross-process-store-locks-holder-name".to_owned(),
2014 ));
2015 client
2016 .set_session_meta(
2017 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2018 #[cfg(feature = "e2e-encryption")]
2019 None,
2020 )
2021 .await
2022 .unwrap();
2023
2024 let mut sync_builder = SyncResponseBuilder::new();
2026 let response = sync_builder
2027 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
2028 .build_sync_response();
2029 client.receive_sync_response(response).await.unwrap();
2030
2031 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
2034
2035 let raw_member_event = json!({
2036 "content": {
2037 "avatar_url": "mxc://localhost/fewjilfewjil42",
2038 "displayname": "Invited Alice",
2039 "membership": "invite"
2040 },
2041 "event_id": "$151800140517rfvjc:localhost",
2042 "origin_server_ts": 151800140,
2043 "room_id": room_id,
2044 "sender": inviter_user_id,
2045 "state_key": user_id,
2046 "type": "m.room.member",
2047 "unsigned": {
2048 "age": 13374242,
2049 }
2050 });
2051 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
2052 to_raw_value(&raw_member_event).unwrap(),
2053 )]);
2054
2055 client.receive_all_members(room_id, &request, &response).await.unwrap();
2057
2058 let room = client.get_room(room_id).unwrap();
2059
2060 let member = room.get_member(user_id).await.expect("ok").expect("exists");
2062
2063 assert_eq!(member.user_id(), user_id);
2064 assert_eq!(member.display_name().unwrap(), "Invited Alice");
2065 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
2066 }
2067
2068 #[async_test]
2069 async fn test_reinvited_members_get_a_display_name() {
2070 let user_id = user_id!("@alice:example.org");
2071 let inviter_user_id = user_id!("@bob:example.org");
2072 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2073
2074 let client = BaseClient::with_store_config(StoreConfig::new(
2075 "cross-process-store-locks-holder-name".to_owned(),
2076 ));
2077 client
2078 .set_session_meta(
2079 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2080 #[cfg(feature = "e2e-encryption")]
2081 None,
2082 )
2083 .await
2084 .unwrap();
2085
2086 let mut sync_builder = SyncResponseBuilder::new();
2088 let response = sync_builder
2089 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
2090 StateTestEvent::Custom(json!({
2091 "content": {
2092 "avatar_url": null,
2093 "displayname": null,
2094 "membership": "leave"
2095 },
2096 "event_id": "$151803140217rkvjc:localhost",
2097 "origin_server_ts": 151800139,
2098 "room_id": room_id,
2099 "sender": user_id,
2100 "state_key": user_id,
2101 "type": "m.room.member",
2102 })),
2103 ))
2104 .build_sync_response();
2105 client.receive_sync_response(response).await.unwrap();
2106
2107 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
2109
2110 let raw_member_event = json!({
2111 "content": {
2112 "avatar_url": "mxc://localhost/fewjilfewjil42",
2113 "displayname": "Invited Alice",
2114 "membership": "invite"
2115 },
2116 "event_id": "$151800140517rfvjc:localhost",
2117 "origin_server_ts": 151800140,
2118 "room_id": room_id,
2119 "sender": inviter_user_id,
2120 "state_key": user_id,
2121 "type": "m.room.member",
2122 "unsigned": {
2123 "age": 13374242,
2124 }
2125 });
2126 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
2127 to_raw_value(&raw_member_event).unwrap(),
2128 )]);
2129
2130 client.receive_all_members(room_id, &request, &response).await.unwrap();
2132
2133 let room = client.get_room(room_id).unwrap();
2134
2135 let member = room.get_member(user_id).await.expect("ok").expect("exists");
2137
2138 assert_eq!(member.user_id(), user_id);
2139 assert_eq!(member.display_name().unwrap(), "Invited Alice");
2140 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
2141 }
2142}