1use std::{
24 borrow::Borrow,
25 collections::{BTreeMap, BTreeSet, HashMap},
26 fmt,
27 ops::Deref,
28 result::Result as StdResult,
29 str::{FromStr, Utf8Error},
30 sync::{Arc, RwLock as StdRwLock},
31};
32
33use eyeball_im::{Vector, VectorDiff};
34use futures_util::Stream;
35use matrix_sdk_common::ROOM_VERSION_RULES_FALLBACK;
36use once_cell::sync::OnceCell;
37
38#[cfg(any(test, feature = "testing"))]
39#[macro_use]
40pub mod integration_tests;
41mod observable_map;
42mod traits;
43
44#[cfg(feature = "e2e-encryption")]
45use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
46pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
47use observable_map::ObservableMap;
48use ruma::{
49 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
50 events::{
51 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52 AnySyncStateEvent, EmptyStateKey, GlobalAccountDataEventType, RedactContent,
53 RedactedStateEventContent, RoomAccountDataEventType, StateEventType, StaticEventContent,
54 StaticStateEventContent, StrippedStateEvent, SyncStateEvent,
55 presence::PresenceEvent,
56 receipt::ReceiptEventContent,
57 room::{
58 create::RoomCreateEventContent,
59 member::{RoomMemberEventContent, StrippedRoomMemberEvent},
60 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
61 redaction::SyncRoomRedactionEvent,
62 },
63 },
64 serde::Raw,
65};
66use serde::de::DeserializeOwned;
67use tokio::sync::{Mutex, RwLock, broadcast};
68use tracing::warn;
69pub use traits::compare_thread_subscription_bump_stamps;
70
71use crate::{
72 MinimalRoomMemberEvent, Room, RoomCreateWithCreatorEventContent, RoomStateFilter, SessionMeta,
73 deserialized_responses::DisplayName,
74 event_cache::store as event_cache_store,
75 room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
76};
77
78pub(crate) mod ambiguity_map;
79mod memory_store;
80pub mod migration_helpers;
81mod send_queue;
82
83#[cfg(any(test, feature = "testing"))]
84pub use self::integration_tests::StateStoreIntegrationTests;
85#[cfg(feature = "unstable-msc4274")]
86pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
87pub use self::{
88 memory_store::MemoryStore,
89 send_queue::{
90 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
91 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
92 SentMediaInfo, SentRequestKey, SerializableEventContent,
93 },
94 traits::{
95 ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerInfo, StateStore,
96 StateStoreDataKey, StateStoreDataValue, StateStoreExt, ThreadSubscriptionCatchupToken,
97 WellKnownResponse,
98 },
99};
100
101#[derive(Debug, thiserror::Error)]
103pub enum StoreError {
104 #[error(transparent)]
106 Backend(Box<dyn std::error::Error + Send + Sync>),
107
108 #[error(transparent)]
110 Json(#[from] serde_json::Error),
111
112 #[error(transparent)]
115 Identifier(#[from] ruma::IdParseError),
116
117 #[error("The store failed to be unlocked")]
120 StoreLocked,
121
122 #[error("The store is not encrypted but was tried to be opened with a passphrase")]
124 UnencryptedStore,
125
126 #[error("Error encrypting or decrypting data from the store: {0}")]
128 Encryption(#[from] StoreEncryptionError),
129
130 #[error("Error encoding or decoding data from the store: {0}")]
132 Codec(#[from] Utf8Error),
133
134 #[error(
136 "The database format changed in an incompatible way, current \
137 version: {0}, latest version: {1}"
138 )]
139 UnsupportedDatabaseVersion(usize, usize),
140
141 #[error("Redaction failed: {0}")]
145 Redaction(#[source] ruma::canonical_json::RedactionError),
146
147 #[error("The store contains invalid data: {details}")]
149 InvalidData {
150 details: String,
152 },
153}
154
155impl StoreError {
156 #[inline]
160 pub fn backend<E>(error: E) -> Self
161 where
162 E: std::error::Error + Send + Sync + 'static,
163 {
164 Self::Backend(Box::new(error))
165 }
166}
167
168pub type Result<T, E = StoreError> = std::result::Result<T, E>;
170
171#[derive(Clone)]
176pub(crate) struct BaseStateStore {
177 pub(super) inner: Arc<DynStateStore>,
178 session_meta: Arc<OnceCell<SessionMeta>>,
179 room_load_settings: Arc<RwLock<RoomLoadSettings>>,
180 pub(super) sync_token: Arc<RwLock<Option<String>>>,
182 rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
184 sync_lock: Arc<Mutex<()>>,
187}
188
189impl BaseStateStore {
190 pub fn new(inner: Arc<DynStateStore>) -> Self {
192 Self {
193 inner,
194 session_meta: Default::default(),
195 room_load_settings: Default::default(),
196 sync_token: Default::default(),
197 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
198 sync_lock: Default::default(),
199 }
200 }
201
202 pub fn sync_lock(&self) -> &Mutex<()> {
204 &self.sync_lock
205 }
206
207 pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
213 self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
214 }
215
216 pub(crate) async fn load_rooms(
219 &self,
220 user_id: &UserId,
221 room_load_settings: RoomLoadSettings,
222 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
223 ) -> Result<()> {
224 *self.room_load_settings.write().await = room_load_settings.clone();
225
226 let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
227
228 let mut rooms = self.rooms.write().unwrap();
229
230 for room_info in room_infos {
231 let new_room = Room::restore(
232 user_id,
233 self.inner.clone(),
234 room_info,
235 room_info_notable_update_sender.clone(),
236 );
237 let new_room_id = new_room.room_id().to_owned();
238
239 rooms.insert(new_room_id, new_room);
240 }
241
242 Ok(())
243 }
244
245 async fn load_and_migrate_room_infos(
248 &self,
249 room_load_settings: RoomLoadSettings,
250 ) -> Result<Vec<RoomInfo>> {
251 let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
252 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
253
254 for room_info in room_infos.iter_mut() {
255 if room_info.apply_migrations(self.inner.clone()).await {
256 migrated_room_infos.push(room_info.clone());
257 }
258 }
259
260 if !migrated_room_infos.is_empty() {
261 let changes = StateChanges {
262 room_infos: migrated_room_infos
263 .into_iter()
264 .map(|room_info| (room_info.room_id.clone(), room_info))
265 .collect(),
266 ..Default::default()
267 };
268
269 if let Err(error) = self.inner.save_changes(&changes).await {
270 warn!("Failed to save migrated room infos: {error}");
271 }
272 }
273
274 Ok(room_infos)
275 }
276
277 pub(crate) async fn load_sync_token(&self) -> Result<()> {
280 let token =
281 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
282 *self.sync_token.write().await = token;
283
284 Ok(())
285 }
286
287 #[cfg(any(feature = "e2e-encryption", test))]
290 pub(crate) async fn derive_from_other(
291 &self,
292 other: &Self,
293 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
294 ) -> Result<()> {
295 let Some(session_meta) = other.session_meta.get() else {
296 return Ok(());
297 };
298
299 let room_load_settings = other.room_load_settings.read().await.clone();
300
301 self.load_rooms(&session_meta.user_id, room_load_settings, room_info_notable_update_sender)
302 .await?;
303 self.load_sync_token().await?;
304 self.set_session_meta(session_meta.clone());
305
306 Ok(())
307 }
308
309 pub fn session_meta(&self) -> Option<&SessionMeta> {
311 self.session_meta.get()
312 }
313
314 pub fn rooms(&self) -> Vec<Room> {
316 self.rooms.read().unwrap().iter().cloned().collect()
317 }
318
319 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
321 self.rooms
322 .read()
323 .unwrap()
324 .iter()
325 .filter(|room| filter.matches(room.state()))
326 .cloned()
327 .collect()
328 }
329
330 pub fn rooms_stream(
333 &self,
334 ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
335 self.rooms.read().unwrap().stream()
336 }
337
338 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
340 self.rooms.read().unwrap().get(room_id).cloned()
341 }
342
343 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
345 self.rooms.read().unwrap().get(room_id).is_some()
346 }
347
348 pub fn get_or_create_room(
351 &self,
352 room_id: &RoomId,
353 room_state: RoomState,
354 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
355 ) -> Room {
356 let user_id =
357 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
358
359 self.rooms
360 .write()
361 .unwrap()
362 .get_or_create(room_id, || {
363 Room::new(
364 user_id,
365 self.inner.clone(),
366 room_id,
367 room_state,
368 room_info_notable_update_sender,
369 )
370 })
371 .clone()
372 }
373
374 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
380 self.inner.remove_room(room_id).await?;
381 self.rooms.write().unwrap().remove(room_id);
382 Ok(())
383 }
384}
385
386#[cfg(not(tarpaulin_include))]
387impl fmt::Debug for BaseStateStore {
388 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
389 f.debug_struct("Store")
390 .field("inner", &self.inner)
391 .field("session_meta", &self.session_meta)
392 .field("sync_token", &self.sync_token)
393 .field("rooms", &self.rooms)
394 .finish_non_exhaustive()
395 }
396}
397
398impl Deref for BaseStateStore {
399 type Target = DynStateStore;
400
401 fn deref(&self) -> &Self::Target {
402 self.inner.deref()
403 }
404}
405
406#[derive(Clone, Debug, Default)]
441pub enum RoomLoadSettings {
442 #[default]
447 All,
448
449 One(OwnedRoomId),
455}
456
457#[derive(Clone, Copy, Debug, PartialEq, Eq)]
463pub enum ThreadSubscriptionStatus {
464 Subscribed {
466 automatic: bool,
469 },
470
471 Unsubscribed,
473}
474
475impl FromStr for ThreadSubscriptionStatus {
476 type Err = ();
477
478 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
479 match s {
480 "automatic" => Ok(ThreadSubscriptionStatus::Subscribed { automatic: true }),
481 "manual" => Ok(ThreadSubscriptionStatus::Subscribed { automatic: false }),
482 "unsubscribed" => Ok(ThreadSubscriptionStatus::Unsubscribed),
483 _ => Err(()),
484 }
485 }
486}
487
488impl ThreadSubscriptionStatus {
489 pub fn as_str(&self) -> &'static str {
496 match self {
497 ThreadSubscriptionStatus::Subscribed { automatic } => {
498 if *automatic {
499 "automatic"
500 } else {
501 "manual"
502 }
503 }
504 ThreadSubscriptionStatus::Unsubscribed => "unsubscribed",
505 }
506 }
507}
508
509#[derive(Clone, Copy, Debug, PartialEq, Eq)]
511pub struct StoredThreadSubscription {
512 pub status: ThreadSubscriptionStatus,
514
515 pub bump_stamp: Option<u64>,
523}
524
525#[derive(Clone, Debug, Default)]
527pub struct StateChanges {
528 pub sync_token: Option<String>,
530 pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
532 pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,
534
535 pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,
538
539 pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,
543
544 pub state:
547 BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
548 pub room_account_data:
550 BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
551
552 pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,
554
555 pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,
557
558 pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,
561
562 pub stripped_state: BTreeMap<
565 OwnedRoomId,
566 BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
567 >,
568
569 pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
572}
573
574impl StateChanges {
575 pub fn new(sync_token: String) -> Self {
577 Self { sync_token: Some(sync_token), ..Default::default() }
578 }
579
580 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
582 self.presence.insert(event.sender, raw_event);
583 }
584
585 pub fn add_room(&mut self, room: RoomInfo) {
587 self.room_infos.insert(room.room_id.clone(), room);
588 }
589
590 pub fn add_room_account_data(
593 &mut self,
594 room_id: &RoomId,
595 event: AnyRoomAccountDataEvent,
596 raw_event: Raw<AnyRoomAccountDataEvent>,
597 ) {
598 self.room_account_data
599 .entry(room_id.to_owned())
600 .or_default()
601 .insert(event.event_type(), raw_event);
602 }
603
604 pub fn add_stripped_member(
607 &mut self,
608 room_id: &RoomId,
609 user_id: &UserId,
610 event: Raw<StrippedRoomMemberEvent>,
611 ) {
612 self.stripped_state
613 .entry(room_id.to_owned())
614 .or_default()
615 .entry(StateEventType::RoomMember)
616 .or_default()
617 .insert(user_id.into(), event.cast());
618 }
619
620 pub fn add_state_event(
623 &mut self,
624 room_id: &RoomId,
625 event: AnySyncStateEvent,
626 raw_event: Raw<AnySyncStateEvent>,
627 ) {
628 self.state
629 .entry(room_id.to_owned())
630 .or_default()
631 .entry(event.event_type())
632 .or_default()
633 .insert(event.state_key().to_owned(), raw_event);
634 }
635
636 pub fn add_redaction(
638 &mut self,
639 room_id: &RoomId,
640 redacted_event_id: &EventId,
641 redaction: Raw<SyncRoomRedactionEvent>,
642 ) {
643 self.redactions
644 .entry(room_id.to_owned())
645 .or_default()
646 .insert(redacted_event_id.to_owned(), redaction);
647 }
648
649 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
652 self.receipts.insert(room_id.to_owned(), event);
653 }
654
655 pub(crate) fn state_static_for_key<C, K>(
659 &self,
660 room_id: &RoomId,
661 state_key: &K,
662 ) -> Option<&Raw<SyncStateEvent<C>>>
663 where
664 C: StaticEventContent<IsPrefix = ruma::events::False>
665 + StaticStateEventContent
666 + RedactContent,
667 C::Redacted: RedactedStateEventContent,
668 C::StateKey: Borrow<K>,
669 K: AsRef<str> + ?Sized,
670 {
671 self.state
672 .get(room_id)?
673 .get(&C::TYPE.into())?
674 .get(state_key.as_ref())
675 .map(Raw::cast_ref_unchecked)
676 }
677
678 pub(crate) fn stripped_state_static_for_key<C, K>(
682 &self,
683 room_id: &RoomId,
684 state_key: &K,
685 ) -> Option<&Raw<StrippedStateEvent<C::PossiblyRedacted>>>
686 where
687 C: StaticEventContent<IsPrefix = ruma::events::False> + StaticStateEventContent,
688 C::StateKey: Borrow<K>,
689 K: AsRef<str> + ?Sized,
690 {
691 self.stripped_state
692 .get(room_id)?
693 .get(&C::TYPE.into())?
694 .get(state_key.as_ref())
695 .map(Raw::cast_ref_unchecked)
696 }
697
698 pub(crate) fn any_state_static_for_key<C, K>(
703 &self,
704 room_id: &RoomId,
705 state_key: &K,
706 ) -> Option<StrippedStateEvent<C::PossiblyRedacted>>
707 where
708 C: StaticEventContent<IsPrefix = ruma::events::False>
709 + StaticStateEventContent
710 + RedactContent,
711 C::Redacted: RedactedStateEventContent,
712 C::PossiblyRedacted: StaticEventContent + DeserializeOwned,
713 C::StateKey: Borrow<K>,
714 K: AsRef<str> + ?Sized,
715 {
716 self.state_static_for_key::<C, K>(room_id, state_key)
717 .map(Raw::cast_ref)
718 .or_else(|| self.stripped_state_static_for_key::<C, K>(room_id, state_key))?
719 .deserialize()
720 .ok()
721 }
722
723 pub(crate) fn member(
726 &self,
727 room_id: &RoomId,
728 user_id: &UserId,
729 ) -> Option<StrippedRoomMemberEvent> {
730 self.any_state_static_for_key::<RoomMemberEventContent, _>(room_id, user_id)
731 }
732
733 pub(crate) fn create(&self, room_id: &RoomId) -> Option<RoomCreateWithCreatorEventContent> {
736 self.any_state_static_for_key::<RoomCreateEventContent, _>(room_id, &EmptyStateKey)
737 .map(|event| {
738 RoomCreateWithCreatorEventContent::from_event_content(event.content, event.sender)
739 })
740 .or_else(|| self.room_infos.get(room_id)?.create().cloned())
742 }
743
744 pub(crate) fn power_levels(&self, room_id: &RoomId) -> Option<RoomPowerLevels> {
747 let power_levels_content = self
748 .any_state_static_for_key::<RoomPowerLevelsEventContent, _>(room_id, &EmptyStateKey)?;
749
750 let create_content = self.create(room_id)?;
751 let rules = create_content.room_version.rules().unwrap_or(ROOM_VERSION_RULES_FALLBACK);
752 let creators = create_content.creators();
753
754 Some(power_levels_content.power_levels(&rules.authorization, creators))
755 }
756}
757
758#[derive(Clone)]
773pub struct StoreConfig {
774 #[cfg(feature = "e2e-encryption")]
775 pub(crate) crypto_store: Arc<DynCryptoStore>,
776 pub(crate) state_store: Arc<DynStateStore>,
777 pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
778 cross_process_store_locks_holder_name: String,
779}
780
781#[cfg(not(tarpaulin_include))]
782impl fmt::Debug for StoreConfig {
783 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
784 fmt.debug_struct("StoreConfig").finish()
785 }
786}
787
788impl StoreConfig {
789 #[must_use]
794 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
795 Self {
796 #[cfg(feature = "e2e-encryption")]
797 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
798 state_store: Arc::new(MemoryStore::new()),
799 event_cache_store: event_cache_store::EventCacheStoreLock::new(
800 event_cache_store::MemoryStore::new(),
801 cross_process_store_locks_holder_name.clone(),
802 ),
803 cross_process_store_locks_holder_name,
804 }
805 }
806
807 #[cfg(feature = "e2e-encryption")]
811 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
812 self.crypto_store = store.into_crypto_store();
813 self
814 }
815
816 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
818 self.state_store = store.into_state_store();
819 self
820 }
821
822 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
824 where
825 S: event_cache_store::IntoEventCacheStore,
826 {
827 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
828 event_cache_store,
829 self.cross_process_store_locks_holder_name.clone(),
830 );
831 self
832 }
833}
834
835#[cfg(test)]
836mod tests {
837 use std::sync::Arc;
838
839 use assert_matches::assert_matches;
840 use matrix_sdk_test::async_test;
841 use ruma::{owned_device_id, owned_user_id, room_id, user_id};
842 use tokio::sync::broadcast;
843
844 use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
845 use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};
846
847 #[async_test]
848 async fn test_set_session_meta() {
849 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
850
851 let session_meta = SessionMeta {
852 user_id: owned_user_id!("@mnt_io:matrix.org"),
853 device_id: owned_device_id!("HELLOYOU"),
854 };
855
856 assert!(store.session_meta.get().is_none());
857
858 store.set_session_meta(session_meta.clone());
859
860 assert_eq!(store.session_meta.get(), Some(&session_meta));
861 }
862
863 #[async_test]
864 #[should_panic]
865 async fn test_set_session_meta_twice() {
866 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
867
868 let session_meta = SessionMeta {
869 user_id: owned_user_id!("@mnt_io:matrix.org"),
870 device_id: owned_device_id!("HELLOYOU"),
871 };
872
873 store.set_session_meta(session_meta.clone());
874 store.set_session_meta(session_meta);
876 }
877
878 #[async_test]
879 async fn test_derive_from_other() {
880 let other = BaseStateStore::new(Arc::new(MemoryStore::new()));
882
883 let session_meta = SessionMeta {
884 user_id: owned_user_id!("@mnt_io:matrix.org"),
885 device_id: owned_device_id!("HELLOYOU"),
886 };
887 let (room_info_notable_update_sender, _) = broadcast::channel(1);
888 let room_id_0 = room_id!("!r0");
889
890 other
891 .load_rooms(
892 &session_meta.user_id,
893 RoomLoadSettings::One(room_id_0.to_owned()),
894 &room_info_notable_update_sender,
895 )
896 .await
897 .unwrap();
898 other.set_session_meta(session_meta.clone());
899
900 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
902 store.derive_from_other(&other, &room_info_notable_update_sender).await.unwrap();
903
904 assert_eq!(store.session_meta.get(), Some(&session_meta));
906 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::One(ref room_id) => {
908 assert_eq!(room_id, room_id_0);
909 });
910 }
911
912 #[test]
913 fn test_room_load_settings_default() {
914 assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
915 }
916
917 #[async_test]
918 async fn test_load_all_rooms() {
919 let room_id_0 = room_id!("!r0");
920 let room_id_1 = room_id!("!r1");
921 let user_id = user_id!("@mnt_io:matrix.org");
922
923 let memory_state_store = Arc::new(MemoryStore::new());
924
925 {
927 let store = BaseStateStore::new(memory_state_store.clone());
928 let mut changes = StateChanges::default();
929 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
930 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
931
932 store.inner.save_changes(&changes).await.unwrap();
933 }
934
935 {
937 let store = BaseStateStore::new(memory_state_store.clone());
938 let (room_info_notable_update_sender, _) = broadcast::channel(2);
939
940 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
942
943 store
945 .load_rooms(user_id, RoomLoadSettings::All, &room_info_notable_update_sender)
946 .await
947 .unwrap();
948
949 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
951
952 let mut rooms = store.rooms();
954 rooms.sort_by(|a, b| a.room_id().cmp(b.room_id()));
955
956 assert_eq!(rooms.len(), 2);
957
958 assert_eq!(rooms[0].room_id(), room_id_0);
959 assert_eq!(rooms[0].own_user_id(), user_id);
960
961 assert_eq!(rooms[1].room_id(), room_id_1);
962 assert_eq!(rooms[1].own_user_id(), user_id);
963 }
964 }
965
966 #[async_test]
967 async fn test_load_one_room() {
968 let room_id_0 = room_id!("!r0");
969 let room_id_1 = room_id!("!r1");
970 let user_id = user_id!("@mnt_io:matrix.org");
971
972 let memory_state_store = Arc::new(MemoryStore::new());
973
974 {
976 let store = BaseStateStore::new(memory_state_store.clone());
977 let mut changes = StateChanges::default();
978 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
979 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
980
981 store.inner.save_changes(&changes).await.unwrap();
982 }
983
984 {
986 let store = BaseStateStore::new(memory_state_store.clone());
987 let (room_info_notable_update_sender, _) = broadcast::channel(2);
988
989 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
991
992 store
994 .load_rooms(
995 user_id,
996 RoomLoadSettings::One(room_id_1.to_owned()),
997 &room_info_notable_update_sender,
998 )
999 .await
1000 .unwrap();
1001
1002 assert_matches!(
1004 *store.room_load_settings.read().await,
1005 RoomLoadSettings::One(ref room_id) => {
1006 assert_eq!(room_id, room_id_1);
1007 }
1008 );
1009
1010 let rooms = store.rooms();
1012 assert_eq!(rooms.len(), 1);
1013
1014 assert_eq!(rooms[0].room_id(), room_id_1);
1015 assert_eq!(rooms[0].own_user_id(), user_id);
1016 }
1017 }
1018}