1use std::{
24 borrow::Borrow,
25 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26 fmt,
27 ops::Deref,
28 result::Result as StdResult,
29 str::{FromStr, Utf8Error},
30 sync::{Arc, RwLock as StdRwLock},
31};
32
33use eyeball_im::{Vector, VectorDiff};
34use futures_util::Stream;
35use matrix_sdk_common::ROOM_VERSION_RULES_FALLBACK;
36use once_cell::sync::OnceCell;
37
38#[cfg(any(test, feature = "testing"))]
39#[macro_use]
40pub mod integration_tests;
41mod observable_map;
42mod traits;
43
44use matrix_sdk_common::locks::Mutex as SyncMutex;
45#[cfg(feature = "e2e-encryption")]
46use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
47pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
48use observable_map::ObservableMap;
49use ruma::{
50 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
51 events::{
52 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
53 AnySyncStateEvent, EmptyStateKey, GlobalAccountDataEventType, RedactContent,
54 RedactedStateEventContent, RoomAccountDataEventType, StateEventType, StaticEventContent,
55 StaticStateEventContent, StrippedStateEvent, SyncStateEvent,
56 presence::PresenceEvent,
57 receipt::ReceiptEventContent,
58 room::{
59 create::RoomCreateEventContent,
60 member::{RoomMemberEventContent, StrippedRoomMemberEvent},
61 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
62 redaction::SyncRoomRedactionEvent,
63 },
64 },
65 serde::Raw,
66};
67use serde::de::DeserializeOwned;
68use tokio::sync::{Mutex, RwLock, broadcast};
69use tracing::warn;
70pub use traits::compare_thread_subscription_bump_stamps;
71
72use crate::{
73 MinimalRoomMemberEvent, Room, RoomCreateWithCreatorEventContent, RoomStateFilter, SessionMeta,
74 deserialized_responses::DisplayName,
75 event_cache::store as event_cache_store,
76 media::store as media_store,
77 room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
78};
79
80pub(crate) mod ambiguity_map;
81mod memory_store;
82pub mod migration_helpers;
83mod send_queue;
84
85#[cfg(any(test, feature = "testing"))]
86pub use self::integration_tests::StateStoreIntegrationTests;
87#[cfg(feature = "unstable-msc4274")]
88pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
89pub use self::{
90 memory_store::MemoryStore,
91 send_queue::{
92 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
93 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
94 SentMediaInfo, SentRequestKey, SerializableEventContent,
95 },
96 traits::{
97 ComposerDraft, ComposerDraftType, DraftAttachment, DraftAttachmentContent, DraftThumbnail,
98 DynStateStore, IntoStateStore, StateStore, StateStoreDataKey, StateStoreDataValue,
99 StateStoreExt, SupportedVersionsResponse, ThreadSubscriptionCatchupToken, TtlStoreValue,
100 WellKnownResponse,
101 },
102};
103
104#[derive(Debug, thiserror::Error)]
106pub enum StoreError {
107 #[error(transparent)]
109 Backend(Box<dyn std::error::Error + Send + Sync>),
110
111 #[error(transparent)]
113 Json(#[from] serde_json::Error),
114
115 #[error(transparent)]
118 Identifier(#[from] ruma::IdParseError),
119
120 #[error("The store failed to be unlocked")]
123 StoreLocked,
124
125 #[error("The store is not encrypted but was tried to be opened with a passphrase")]
127 UnencryptedStore,
128
129 #[error("Error encrypting or decrypting data from the store: {0}")]
131 Encryption(#[from] StoreEncryptionError),
132
133 #[error("Error encoding or decoding data from the store: {0}")]
135 Codec(#[from] Utf8Error),
136
137 #[error(
139 "The database format changed in an incompatible way, current \
140 version: {0}, latest version: {1}"
141 )]
142 UnsupportedDatabaseVersion(usize, usize),
143
144 #[error("Redaction failed: {0}")]
148 Redaction(#[source] ruma::canonical_json::RedactionError),
149
150 #[error("The store contains invalid data: {details}")]
152 InvalidData {
153 details: String,
155 },
156}
157
158impl StoreError {
159 #[inline]
163 pub fn backend<E>(error: E) -> Self
164 where
165 E: std::error::Error + Send + Sync + 'static,
166 {
167 Self::Backend(Box::new(error))
168 }
169}
170
171pub type Result<T, E = StoreError> = std::result::Result<T, E>;
173
174#[derive(Clone)]
179pub(crate) struct BaseStateStore {
180 pub(super) inner: Arc<DynStateStore>,
181 session_meta: Arc<OnceCell<SessionMeta>>,
182 room_load_settings: Arc<RwLock<RoomLoadSettings>>,
183 pub(super) sync_token: Arc<RwLock<Option<String>>>,
185 rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
187 lock: Arc<Mutex<()>>,
190
191 pub(crate) already_logged_missing_room: Arc<SyncMutex<HashSet<OwnedRoomId>>>,
194}
195
196impl BaseStateStore {
197 pub fn new(inner: Arc<DynStateStore>) -> Self {
199 Self {
200 inner,
201 session_meta: Default::default(),
202 room_load_settings: Default::default(),
203 sync_token: Default::default(),
204 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
205 lock: Default::default(),
206 already_logged_missing_room: Default::default(),
207 }
208 }
209
210 pub fn lock(&self) -> &Mutex<()> {
212 &self.lock
213 }
214
215 pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
221 self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
222 }
223
224 pub(crate) async fn load_rooms(
227 &self,
228 user_id: &UserId,
229 room_load_settings: RoomLoadSettings,
230 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
231 ) -> Result<()> {
232 *self.room_load_settings.write().await = room_load_settings.clone();
233
234 let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
235
236 let mut rooms = self.rooms.write().unwrap();
237
238 for room_info in room_infos {
239 let new_room = Room::restore(
240 user_id,
241 self.inner.clone(),
242 room_info,
243 room_info_notable_update_sender.clone(),
244 );
245 let new_room_id = new_room.room_id().to_owned();
246
247 rooms.insert(new_room_id, new_room);
248 }
249
250 Ok(())
251 }
252
253 async fn load_and_migrate_room_infos(
256 &self,
257 room_load_settings: RoomLoadSettings,
258 ) -> Result<Vec<RoomInfo>> {
259 let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
260 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
261
262 for room_info in room_infos.iter_mut() {
263 if room_info.apply_migrations(self.inner.clone()).await {
264 migrated_room_infos.push(room_info.clone());
265 }
266 }
267
268 if !migrated_room_infos.is_empty() {
269 let changes = StateChanges {
270 room_infos: migrated_room_infos
271 .into_iter()
272 .map(|room_info| (room_info.room_id.clone(), room_info))
273 .collect(),
274 ..Default::default()
275 };
276
277 if let Err(error) = self.inner.save_changes(&changes).await {
278 warn!("Failed to save migrated room infos: {error}");
279 }
280 }
281
282 Ok(room_infos)
283 }
284
285 pub(crate) async fn load_sync_token(&self) -> Result<()> {
288 let token =
289 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
290 *self.sync_token.write().await = token;
291
292 Ok(())
293 }
294
295 #[cfg(any(feature = "e2e-encryption", test))]
298 pub(crate) async fn derive_from_other(
299 &self,
300 other: &Self,
301 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
302 ) -> Result<()> {
303 let Some(session_meta) = other.session_meta.get() else {
304 return Ok(());
305 };
306
307 let room_load_settings = other.room_load_settings.read().await.clone();
308
309 self.load_rooms(&session_meta.user_id, room_load_settings, room_info_notable_update_sender)
310 .await?;
311 self.load_sync_token().await?;
312 self.set_session_meta(session_meta.clone());
313
314 Ok(())
315 }
316
317 pub fn session_meta(&self) -> Option<&SessionMeta> {
319 self.session_meta.get()
320 }
321
322 pub fn rooms(&self) -> Vec<Room> {
324 self.rooms.read().unwrap().iter().cloned().collect()
325 }
326
327 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
329 self.rooms
330 .read()
331 .unwrap()
332 .iter()
333 .filter(|room| filter.matches(room.state()))
334 .cloned()
335 .collect()
336 }
337
338 pub fn rooms_stream(
341 &self,
342 ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
343 self.rooms.read().unwrap().stream()
344 }
345
346 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
348 self.rooms.read().unwrap().get(room_id).cloned()
349 }
350
351 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
353 self.rooms.read().unwrap().get(room_id).is_some()
354 }
355
356 pub fn get_or_create_room(
359 &self,
360 room_id: &RoomId,
361 room_state: RoomState,
362 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
363 ) -> Room {
364 let user_id =
365 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
366
367 self.rooms
368 .write()
369 .unwrap()
370 .get_or_create(room_id, || {
371 Room::new(
372 user_id,
373 self.inner.clone(),
374 room_id,
375 room_state,
376 room_info_notable_update_sender,
377 )
378 })
379 .clone()
380 }
381
382 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
388 self.inner.remove_room(room_id).await?;
389 self.rooms.write().unwrap().remove(room_id);
390 Ok(())
391 }
392}
393
394#[cfg(not(tarpaulin_include))]
395impl fmt::Debug for BaseStateStore {
396 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397 f.debug_struct("Store")
398 .field("inner", &self.inner)
399 .field("session_meta", &self.session_meta)
400 .field("sync_token", &self.sync_token)
401 .field("rooms", &self.rooms)
402 .finish_non_exhaustive()
403 }
404}
405
406impl Deref for BaseStateStore {
407 type Target = DynStateStore;
408
409 fn deref(&self) -> &Self::Target {
410 self.inner.deref()
411 }
412}
413
414#[derive(Clone, Debug, Default)]
449pub enum RoomLoadSettings {
450 #[default]
455 All,
456
457 One(OwnedRoomId),
463}
464
/// The subscription status of a thread.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadSubscriptionStatus {
    /// The thread is subscribed to.
    Subscribed {
        /// Whether the subscription is automatic (`true`) or manual
        /// (`false`).
        automatic: bool,
    },

    /// The thread is not subscribed to.
    Unsubscribed,
}

/// Parses the textual form produced by [`ThreadSubscriptionStatus::as_str`].
///
/// Any other input, including a different letter case, is rejected with
/// `Err(())`.
impl FromStr for ThreadSubscriptionStatus {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let status = match s {
            "automatic" => Self::Subscribed { automatic: true },
            "manual" => Self::Subscribed { automatic: false },
            "unsubscribed" => Self::Unsubscribed,
            _ => return Err(()),
        };

        Ok(status)
    }
}

impl ThreadSubscriptionStatus {
    /// Returns the textual form of this status: `"automatic"`, `"manual"` or
    /// `"unsubscribed"`.
    ///
    /// The result can be parsed back with [`FromStr`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Subscribed { automatic: true } => "automatic",
            Self::Subscribed { automatic: false } => "manual",
            Self::Unsubscribed => "unsubscribed",
        }
    }
}
516
517#[derive(Clone, Copy, Debug, PartialEq, Eq)]
519pub struct StoredThreadSubscription {
520 pub status: ThreadSubscriptionStatus,
522
523 pub bump_stamp: Option<u64>,
531}
532
533#[derive(Clone, Debug, Default)]
535pub struct StateChanges {
536 pub sync_token: Option<String>,
538 pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
540 pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,
542
543 pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,
546
547 pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,
551
552 pub state:
555 BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
556 pub room_account_data:
558 BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
559
560 pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,
562
563 pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,
565
566 pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,
569
570 pub stripped_state: BTreeMap<
573 OwnedRoomId,
574 BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
575 >,
576
577 pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
580}
581
582impl StateChanges {
583 pub fn new(sync_token: String) -> Self {
585 Self { sync_token: Some(sync_token), ..Default::default() }
586 }
587
588 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
590 self.presence.insert(event.sender, raw_event);
591 }
592
593 pub fn add_room(&mut self, room: RoomInfo) {
595 self.room_infos.insert(room.room_id.clone(), room);
596 }
597
598 pub fn add_room_account_data(
601 &mut self,
602 room_id: &RoomId,
603 event: AnyRoomAccountDataEvent,
604 raw_event: Raw<AnyRoomAccountDataEvent>,
605 ) {
606 self.room_account_data
607 .entry(room_id.to_owned())
608 .or_default()
609 .insert(event.event_type(), raw_event);
610 }
611
612 pub fn add_stripped_member(
615 &mut self,
616 room_id: &RoomId,
617 user_id: &UserId,
618 event: Raw<StrippedRoomMemberEvent>,
619 ) {
620 self.stripped_state
621 .entry(room_id.to_owned())
622 .or_default()
623 .entry(StateEventType::RoomMember)
624 .or_default()
625 .insert(user_id.into(), event.cast());
626 }
627
628 pub fn add_state_event(
631 &mut self,
632 room_id: &RoomId,
633 event: AnySyncStateEvent,
634 raw_event: Raw<AnySyncStateEvent>,
635 ) {
636 self.state
637 .entry(room_id.to_owned())
638 .or_default()
639 .entry(event.event_type())
640 .or_default()
641 .insert(event.state_key().to_owned(), raw_event);
642 }
643
644 pub fn add_redaction(
646 &mut self,
647 room_id: &RoomId,
648 redacted_event_id: &EventId,
649 redaction: Raw<SyncRoomRedactionEvent>,
650 ) {
651 self.redactions
652 .entry(room_id.to_owned())
653 .or_default()
654 .insert(redacted_event_id.to_owned(), redaction);
655 }
656
657 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
660 self.receipts.insert(room_id.to_owned(), event);
661 }
662
663 pub(crate) fn state_static_for_key<C, K>(
667 &self,
668 room_id: &RoomId,
669 state_key: &K,
670 ) -> Option<&Raw<SyncStateEvent<C>>>
671 where
672 C: StaticEventContent<IsPrefix = ruma::events::False>
673 + StaticStateEventContent
674 + RedactContent,
675 C::Redacted: RedactedStateEventContent,
676 C::StateKey: Borrow<K>,
677 K: AsRef<str> + ?Sized,
678 {
679 self.state
680 .get(room_id)?
681 .get(&C::TYPE.into())?
682 .get(state_key.as_ref())
683 .map(Raw::cast_ref_unchecked)
684 }
685
686 pub(crate) fn stripped_state_static_for_key<C, K>(
690 &self,
691 room_id: &RoomId,
692 state_key: &K,
693 ) -> Option<&Raw<StrippedStateEvent<C::PossiblyRedacted>>>
694 where
695 C: StaticEventContent<IsPrefix = ruma::events::False> + StaticStateEventContent,
696 C::StateKey: Borrow<K>,
697 K: AsRef<str> + ?Sized,
698 {
699 self.stripped_state
700 .get(room_id)?
701 .get(&C::TYPE.into())?
702 .get(state_key.as_ref())
703 .map(Raw::cast_ref_unchecked)
704 }
705
706 pub(crate) fn any_state_static_for_key<C, K>(
711 &self,
712 room_id: &RoomId,
713 state_key: &K,
714 ) -> Option<StrippedStateEvent<C::PossiblyRedacted>>
715 where
716 C: StaticEventContent<IsPrefix = ruma::events::False>
717 + StaticStateEventContent
718 + RedactContent,
719 C::Redacted: RedactedStateEventContent,
720 C::PossiblyRedacted: StaticEventContent + DeserializeOwned,
721 C::StateKey: Borrow<K>,
722 K: AsRef<str> + ?Sized,
723 {
724 self.state_static_for_key::<C, K>(room_id, state_key)
725 .map(Raw::cast_ref)
726 .or_else(|| self.stripped_state_static_for_key::<C, K>(room_id, state_key))?
727 .deserialize()
728 .ok()
729 }
730
731 pub(crate) fn member(
734 &self,
735 room_id: &RoomId,
736 user_id: &UserId,
737 ) -> Option<StrippedRoomMemberEvent> {
738 self.any_state_static_for_key::<RoomMemberEventContent, _>(room_id, user_id)
739 }
740
741 pub(crate) fn create(&self, room_id: &RoomId) -> Option<RoomCreateWithCreatorEventContent> {
744 self.any_state_static_for_key::<RoomCreateEventContent, _>(room_id, &EmptyStateKey)
745 .map(|event| {
746 RoomCreateWithCreatorEventContent::from_event_content(event.content, event.sender)
747 })
748 .or_else(|| self.room_infos.get(room_id)?.create().cloned())
750 }
751
752 pub(crate) fn power_levels(&self, room_id: &RoomId) -> Option<RoomPowerLevels> {
755 let power_levels_content = self
756 .any_state_static_for_key::<RoomPowerLevelsEventContent, _>(room_id, &EmptyStateKey)?;
757
758 let create_content = self.create(room_id)?;
759 let rules = create_content.room_version.rules().unwrap_or(ROOM_VERSION_RULES_FALLBACK);
760 let creators = create_content.creators();
761
762 Some(power_levels_content.power_levels(&rules.authorization, creators))
763 }
764}
765
766#[derive(Clone)]
781pub struct StoreConfig {
782 #[cfg(feature = "e2e-encryption")]
783 pub(crate) crypto_store: Arc<DynCryptoStore>,
784 pub(crate) state_store: Arc<DynStateStore>,
785 pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
786 pub(crate) media_store: media_store::MediaStoreLock,
787 cross_process_store_locks_holder_name: String,
788}
789
790#[cfg(not(tarpaulin_include))]
791impl fmt::Debug for StoreConfig {
792 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
793 fmt.debug_struct("StoreConfig").finish()
794 }
795}
796
797impl StoreConfig {
798 #[must_use]
803 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
804 Self {
805 #[cfg(feature = "e2e-encryption")]
806 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
807 state_store: Arc::new(MemoryStore::new()),
808 event_cache_store: event_cache_store::EventCacheStoreLock::new(
809 event_cache_store::MemoryStore::new(),
810 cross_process_store_locks_holder_name.clone(),
811 ),
812 media_store: media_store::MediaStoreLock::new(
813 media_store::MemoryMediaStore::new(),
814 cross_process_store_locks_holder_name.clone(),
815 ),
816 cross_process_store_locks_holder_name,
817 }
818 }
819
820 #[cfg(feature = "e2e-encryption")]
824 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
825 self.crypto_store = store.into_crypto_store();
826 self
827 }
828
829 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
831 self.state_store = store.into_state_store();
832 self
833 }
834
835 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
837 where
838 S: event_cache_store::IntoEventCacheStore,
839 {
840 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
841 event_cache_store,
842 self.cross_process_store_locks_holder_name.clone(),
843 );
844 self
845 }
846
847 pub fn media_store<S>(mut self, media_store: S) -> Self
849 where
850 S: media_store::IntoMediaStore,
851 {
852 self.media_store = media_store::MediaStoreLock::new(
853 media_store,
854 self.cross_process_store_locks_holder_name.clone(),
855 );
856 self
857 }
858}
859
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use matrix_sdk_test::async_test;
    use ruma::{RoomId, owned_device_id, owned_user_id, room_id, user_id};
    use tokio::sync::broadcast;

    use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
    use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};

    /// The session meta used by the session-related tests.
    fn session_meta() -> SessionMeta {
        SessionMeta {
            user_id: owned_user_id!("@mnt_io:matrix.org"),
            device_id: owned_device_id!("HELLOYOU"),
        }
    }

    /// Returns a memory store in which a joined room info has been saved for
    /// each of the given room IDs.
    async fn memory_store_with_joined_rooms(room_ids: &[&RoomId]) -> Arc<MemoryStore> {
        let memory_state_store = Arc::new(MemoryStore::new());

        // Save through a throw-away `BaseStateStore`, so that the stores
        // created by the tests start without any room in memory.
        let store = BaseStateStore::new(memory_state_store.clone());
        let mut changes = StateChanges::default();

        for &room_id in room_ids {
            changes.add_room(RoomInfo::new(room_id, RoomState::Joined));
        }

        store.inner.save_changes(&changes).await.unwrap();

        memory_state_store
    }

    #[async_test]
    async fn test_set_session_meta() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = session_meta();

        assert!(store.session_meta.get().is_none());

        store.set_session_meta(session_meta.clone());

        assert_eq!(store.session_meta.get(), Some(&session_meta));
    }

    #[async_test]
    // Pin the panic message, so that the test only passes when the second
    // `set_session_meta` call is what panics.
    #[should_panic(expected = "`SessionMeta` was already set")]
    async fn test_set_session_meta_twice() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = session_meta();

        store.set_session_meta(session_meta.clone());
        // Kaboom.
        store.set_session_meta(session_meta);
    }

    #[async_test]
    async fn test_derive_from_other() {
        // The first store, fully set up.
        let other = BaseStateStore::new(Arc::new(MemoryStore::new()));

        let session_meta = session_meta();
        let (room_info_notable_update_sender, _) = broadcast::channel(1);
        let room_id_0 = room_id!("!r0");

        other
            .load_rooms(
                &session_meta.user_id,
                RoomLoadSettings::One(room_id_0.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();
        other.set_session_meta(session_meta.clone());

        // The second store, derived from the first one.
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        store.derive_from_other(&other, &room_info_notable_update_sender).await.unwrap();

        // The session meta and the room load settings have been carried over.
        assert_eq!(store.session_meta.get(), Some(&session_meta));
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::One(ref room_id) => {
            assert_eq!(room_id, room_id_0);
        });
    }

    #[test]
    fn test_room_load_settings_default() {
        assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
    }

    #[async_test]
    async fn test_load_all_rooms() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        let memory_state_store = memory_store_with_joined_rooms(&[room_id_0, room_id_1]).await;

        let store = BaseStateStore::new(memory_state_store);
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // The default settings are in place before any load.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(user_id, RoomLoadSettings::All, &room_info_notable_update_sender)
            .await
            .unwrap();

        // The settings are unchanged after the load.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // `rooms()` gives no ordering guarantee: sort before comparing.
        let mut rooms = store.rooms();
        rooms.sort_by(|a, b| a.room_id().cmp(b.room_id()));

        assert_eq!(rooms.len(), 2);

        assert_eq!(rooms[0].room_id(), room_id_0);
        assert_eq!(rooms[0].own_user_id(), user_id);

        assert_eq!(rooms[1].room_id(), room_id_1);
        assert_eq!(rooms[1].own_user_id(), user_id);
    }

    #[async_test]
    async fn test_load_one_room() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        let memory_state_store = memory_store_with_joined_rooms(&[room_id_0, room_id_1]).await;

        let store = BaseStateStore::new(memory_state_store);
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // The default settings are in place before any load.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(
                user_id,
                RoomLoadSettings::One(room_id_1.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();

        // The settings passed to `load_rooms` have been remembered.
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref room_id) => {
                assert_eq!(room_id, room_id_1);
            }
        );

        // Only the requested room has been loaded.
        let rooms = store.rooms();
        assert_eq!(rooms.len(), 1);

        assert_eq!(rooms[0].room_id(), room_id_1);
        assert_eq!(rooms[0].own_user_id(), user_id);
    }
}