1use std::{
24 collections::{BTreeMap, BTreeSet, HashMap},
25 fmt,
26 ops::Deref,
27 result::Result as StdResult,
28 str::Utf8Error,
29 sync::{Arc, RwLock as StdRwLock},
30};
31
32use eyeball_im::{Vector, VectorDiff};
33use futures_util::Stream;
34use once_cell::sync::OnceCell;
35
36#[cfg(any(test, feature = "testing"))]
37#[macro_use]
38pub mod integration_tests;
39mod observable_map;
40mod traits;
41
42#[cfg(feature = "e2e-encryption")]
43use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
44pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
45use observable_map::ObservableMap;
46use ruma::{
47 events::{
48 presence::PresenceEvent,
49 receipt::ReceiptEventContent,
50 room::{member::StrippedRoomMemberEvent, redaction::SyncRoomRedactionEvent},
51 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52 AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
53 },
54 serde::Raw,
55 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
56};
57use tokio::sync::{broadcast, Mutex, RwLock};
58use tracing::warn;
59
60use crate::{
61 deserialized_responses::DisplayName,
62 event_cache::store as event_cache_store,
63 room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
64 MinimalRoomMemberEvent, Room, RoomStateFilter, SendOutsideWasm, SessionMeta, SyncOutsideWasm,
65};
66
67pub(crate) mod ambiguity_map;
68mod memory_store;
69pub mod migration_helpers;
70mod send_queue;
71
72#[cfg(any(test, feature = "testing"))]
73pub use self::integration_tests::StateStoreIntegrationTests;
74#[cfg(feature = "unstable-msc4274")]
75pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
76pub use self::{
77 memory_store::MemoryStore,
78 send_queue::{
79 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
80 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
81 SentMediaInfo, SentRequestKey, SerializableEventContent,
82 },
83 traits::{
84 ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
85 StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
86 },
87};
88
/// Error type returned by state store operations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An error coming from the storage backend itself, boxed as a trait
    /// object. On non-wasm targets the boxed error must be `Send + Sync`.
    #[cfg(not(target_family = "wasm"))]
    #[error(transparent)]
    Backend(Box<dyn std::error::Error + Send + Sync>),

    /// An error coming from the storage backend itself, boxed as a trait
    /// object. On wasm targets no `Send`/`Sync` bound is required.
    #[cfg(target_family = "wasm")]
    #[error(transparent)]
    Backend(Box<dyn std::error::Error>),
    /// A JSON (de)serialization error, converted from [`serde_json::Error`].
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// A Matrix identifier could not be parsed, converted from
    /// [`ruma::IdParseError`].
    #[error(transparent)]
    Identifier(#[from] ruma::IdParseError),
    /// The store could not be unlocked.
    #[error("The store failed to be unlocked")]
    StoreLocked,
    /// A passphrase was supplied for a store that is not encrypted.
    #[error("The store is not encrypted but was tried to be opened with a passphrase")]
    UnencryptedStore,
    /// Encrypting or decrypting data from the store failed.
    #[error("Error encrypting or decrypting data from the store: {0}")]
    Encryption(#[from] StoreEncryptionError),

    /// Encoding or decoding data from the store failed; the payload is the
    /// UTF-8 error that was raised.
    #[error("Error encoding or decoding data from the store: {0}")]
    Codec(#[from] Utf8Error),

    /// The database uses a format version this code cannot handle. The first
    /// value is the current version of the database, the second is the
    /// latest version.
    #[error(
        "The database format changed in an incompatible way, current \
        version: {0}, latest version: {1}"
    )]
    UnsupportedDatabaseVersion(usize, usize),
    /// Redacting an event failed; the payload is the underlying canonical
    /// JSON redaction error, also exposed as the error source.
    #[error("Redaction failed: {0}")]
    Redaction(#[source] ruma::canonical_json::RedactionError),
}
135
136impl StoreError {
137 #[inline]
141 pub fn backend<E>(error: E) -> Self
142 where
143 E: std::error::Error + SendOutsideWasm + SyncOutsideWasm + 'static,
144 {
145 Self::Backend(Box::new(error))
146 }
147}
148
/// A `Result` type alias whose error type defaults to [`StoreError`].
pub type Result<T, E = StoreError> = std::result::Result<T, E>;
151
/// A state store wrapper that keeps in-memory state (session metadata, the
/// sync token and the known rooms) next to the wrapped [`DynStateStore`].
///
/// Every field is behind an `Arc`, so cloning this type yields another handle
/// onto the same shared state.
#[derive(Clone)]
pub(crate) struct BaseStateStore {
    /// The wrapped state store that persistent operations are forwarded to.
    pub(super) inner: Arc<DynStateStore>,
    /// The session metadata; can be set only once.
    session_meta: Arc<OnceCell<SessionMeta>>,
    /// The settings that were used by the last call to `load_rooms`.
    room_load_settings: Arc<RwLock<RoomLoadSettings>>,
    /// The sync token, as last loaded from the store by `load_sync_token`.
    pub(super) sync_token: Arc<RwLock<Option<String>>>,
    /// All the rooms currently held in memory, keyed by room ID.
    rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
    /// A mutex handed out by `sync_lock()`.
    // NOTE(review): presumably used to serialise sync processing — confirm
    // against the callers.
    sync_lock: Arc<Mutex<()>>,
}
169
170impl BaseStateStore {
171 pub fn new(inner: Arc<DynStateStore>) -> Self {
173 Self {
174 inner,
175 session_meta: Default::default(),
176 room_load_settings: Default::default(),
177 sync_token: Default::default(),
178 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
179 sync_lock: Default::default(),
180 }
181 }
182
183 pub fn sync_lock(&self) -> &Mutex<()> {
185 &self.sync_lock
186 }
187
188 pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
194 self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
195 }
196
197 pub(crate) async fn load_rooms(
200 &self,
201 user_id: &UserId,
202 room_load_settings: RoomLoadSettings,
203 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
204 ) -> Result<()> {
205 *self.room_load_settings.write().await = room_load_settings.clone();
206
207 let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
208
209 let mut rooms = self.rooms.write().unwrap();
210
211 for room_info in room_infos {
212 let new_room = Room::restore(
213 user_id,
214 self.inner.clone(),
215 room_info,
216 room_info_notable_update_sender.clone(),
217 );
218 let new_room_id = new_room.room_id().to_owned();
219
220 rooms.insert(new_room_id, new_room);
221 }
222
223 Ok(())
224 }
225
226 async fn load_and_migrate_room_infos(
229 &self,
230 room_load_settings: RoomLoadSettings,
231 ) -> Result<Vec<RoomInfo>> {
232 let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
233 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
234
235 for room_info in room_infos.iter_mut() {
236 if room_info.apply_migrations(self.inner.clone()).await {
237 migrated_room_infos.push(room_info.clone());
238 }
239 }
240
241 if !migrated_room_infos.is_empty() {
242 let changes = StateChanges {
243 room_infos: migrated_room_infos
244 .into_iter()
245 .map(|room_info| (room_info.room_id.clone(), room_info))
246 .collect(),
247 ..Default::default()
248 };
249
250 if let Err(error) = self.inner.save_changes(&changes).await {
251 warn!("Failed to save migrated room infos: {error}");
252 }
253 }
254
255 Ok(room_infos)
256 }
257
258 pub(crate) async fn load_sync_token(&self) -> Result<()> {
261 let token =
262 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
263 *self.sync_token.write().await = token;
264
265 Ok(())
266 }
267
268 #[cfg(any(feature = "e2e-encryption", test))]
271 pub(crate) async fn derive_from_other(
272 &self,
273 other: &Self,
274 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
275 ) -> Result<()> {
276 let Some(session_meta) = other.session_meta.get() else {
277 return Ok(());
278 };
279
280 let room_load_settings = other.room_load_settings.read().await.clone();
281
282 self.load_rooms(&session_meta.user_id, room_load_settings, room_info_notable_update_sender)
283 .await?;
284 self.load_sync_token().await?;
285 self.set_session_meta(session_meta.clone());
286
287 Ok(())
288 }
289
290 pub fn session_meta(&self) -> Option<&SessionMeta> {
292 self.session_meta.get()
293 }
294
295 pub fn rooms(&self) -> Vec<Room> {
297 self.rooms.read().unwrap().iter().cloned().collect()
298 }
299
300 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
302 self.rooms
303 .read()
304 .unwrap()
305 .iter()
306 .filter(|room| filter.matches(room.state()))
307 .cloned()
308 .collect()
309 }
310
311 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
314 self.rooms.read().unwrap().stream()
315 }
316
317 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
319 self.rooms.read().unwrap().get(room_id).cloned()
320 }
321
322 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
324 self.rooms.read().unwrap().get(room_id).is_some()
325 }
326
327 pub fn get_or_create_room(
330 &self,
331 room_id: &RoomId,
332 room_state: RoomState,
333 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
334 ) -> Room {
335 let user_id =
336 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
337
338 self.rooms
339 .write()
340 .unwrap()
341 .get_or_create(room_id, || {
342 Room::new(
343 user_id,
344 self.inner.clone(),
345 room_id,
346 room_state,
347 room_info_notable_update_sender,
348 )
349 })
350 .clone()
351 }
352
353 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
359 self.inner.remove_room(room_id).await?;
360 self.rooms.write().unwrap().remove(room_id);
361 Ok(())
362 }
363}
364
365#[cfg(not(tarpaulin_include))]
366impl fmt::Debug for BaseStateStore {
367 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
368 f.debug_struct("Store")
369 .field("inner", &self.inner)
370 .field("session_meta", &self.session_meta)
371 .field("sync_token", &self.sync_token)
372 .field("rooms", &self.rooms)
373 .finish_non_exhaustive()
374 }
375}
376
377impl Deref for BaseStateStore {
378 type Target = DynStateStore;
379
380 fn deref(&self) -> &Self::Target {
381 self.inner.deref()
382 }
383}
384
/// Settings selecting which rooms are loaded from the state store.
///
/// The value is handed to the state store's `get_room_infos` when the rooms
/// are loaded.
#[derive(Clone, Debug, Default)]
pub enum RoomLoadSettings {
    /// Load all the rooms. This is the default.
    #[default]
    All,

    /// Load a single room, identified by the given room ID.
    One(OwnedRoomId),
}
435
/// A set of changes to be applied to a state store in one go.
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
    /// The sync token associated with these changes, if any.
    pub sync_token: Option<String>,
    /// Global account data events, keyed by event type.
    pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
    /// Presence events, keyed by the ID of the user that sent them.
    pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,

    /// Minimal member events, keyed by room ID and then by user ID.
    pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,

    /// User IDs, grouped by room ID.
    // NOTE(review): presumably the profiles to remove from the store — confirm
    // against the store implementations.
    pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,

    /// State events, keyed by room ID, then by event type, then by state
    /// key.
    pub state:
        BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
    /// Room account data events, keyed by room ID and then by event type.
    pub room_account_data:
        BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,

    /// Room infos, keyed by room ID.
    pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,

    /// Receipt event contents, keyed by room ID.
    pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,

    /// Redaction events, keyed by room ID and then by the ID of the event
    /// that is redacted.
    pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,

    /// Stripped state events, keyed by room ID, then by event type, then by
    /// state key.
    pub stripped_state: BTreeMap<
        OwnedRoomId,
        BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
    >,

    /// For each room, a map from a display name to the set of user IDs
    /// associated with it.
    pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
}
484
485impl StateChanges {
486 pub fn new(sync_token: String) -> Self {
488 Self { sync_token: Some(sync_token), ..Default::default() }
489 }
490
491 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
493 self.presence.insert(event.sender, raw_event);
494 }
495
496 pub fn add_room(&mut self, room: RoomInfo) {
498 self.room_infos.insert(room.room_id.clone(), room);
499 }
500
501 pub fn add_room_account_data(
504 &mut self,
505 room_id: &RoomId,
506 event: AnyRoomAccountDataEvent,
507 raw_event: Raw<AnyRoomAccountDataEvent>,
508 ) {
509 self.room_account_data
510 .entry(room_id.to_owned())
511 .or_default()
512 .insert(event.event_type(), raw_event);
513 }
514
515 pub fn add_stripped_member(
518 &mut self,
519 room_id: &RoomId,
520 user_id: &UserId,
521 event: Raw<StrippedRoomMemberEvent>,
522 ) {
523 self.stripped_state
524 .entry(room_id.to_owned())
525 .or_default()
526 .entry(StateEventType::RoomMember)
527 .or_default()
528 .insert(user_id.into(), event.cast());
529 }
530
531 pub fn add_state_event(
534 &mut self,
535 room_id: &RoomId,
536 event: AnySyncStateEvent,
537 raw_event: Raw<AnySyncStateEvent>,
538 ) {
539 self.state
540 .entry(room_id.to_owned())
541 .or_default()
542 .entry(event.event_type())
543 .or_default()
544 .insert(event.state_key().to_owned(), raw_event);
545 }
546
547 pub fn add_redaction(
549 &mut self,
550 room_id: &RoomId,
551 redacted_event_id: &EventId,
552 redaction: Raw<SyncRoomRedactionEvent>,
553 ) {
554 self.redactions
555 .entry(room_id.to_owned())
556 .or_default()
557 .insert(redacted_event_id.to_owned(), redaction);
558 }
559
560 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
563 self.receipts.insert(room_id.to_owned(), event);
564 }
565}
566
/// Configuration holding the different stores to use.
///
/// A value created with `StoreConfig::new` uses in-memory stores; each store
/// can then be replaced with the builder-style methods.
#[derive(Clone)]
pub struct StoreConfig {
    /// The crypto store; only present with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store: Arc<DynCryptoStore>,
    /// The state store.
    pub(crate) state_store: Arc<DynStateStore>,
    /// The event cache store, wrapped in its lock type.
    pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
    /// The holder name handed to the event cache store lock whenever it is
    /// created.
    cross_process_store_locks_holder_name: String,
}
589
590#[cfg(not(tarpaulin_include))]
591impl fmt::Debug for StoreConfig {
592 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
593 fmt.debug_struct("StoreConfig").finish()
594 }
595}
596
597impl StoreConfig {
598 #[must_use]
603 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
604 Self {
605 #[cfg(feature = "e2e-encryption")]
606 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
607 state_store: Arc::new(MemoryStore::new()),
608 event_cache_store: event_cache_store::EventCacheStoreLock::new(
609 event_cache_store::MemoryStore::new(),
610 cross_process_store_locks_holder_name.clone(),
611 ),
612 cross_process_store_locks_holder_name,
613 }
614 }
615
616 #[cfg(feature = "e2e-encryption")]
620 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
621 self.crypto_store = store.into_crypto_store();
622 self
623 }
624
625 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
627 self.state_store = store.into_state_store();
628 self
629 }
630
631 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
633 where
634 S: event_cache_store::IntoEventCacheStore,
635 {
636 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
637 event_cache_store,
638 self.cross_process_store_locks_holder_name.clone(),
639 );
640 self
641 }
642}
643
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use matrix_sdk_test::async_test;
    use ruma::{owned_device_id, owned_user_id, room_id, user_id};
    use tokio::sync::broadcast;

    use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
    use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};

    /// The session metadata shared by the tests.
    fn example_session_meta() -> SessionMeta {
        SessionMeta {
            user_id: owned_user_id!("@mnt_io:matrix.org"),
            device_id: owned_device_id!("HELLOYOU"),
        }
    }

    /// Build a memory store in which the joined rooms `!r0` and `!r1` have
    /// been saved through a first, short-lived `BaseStateStore`.
    async fn memory_store_with_two_joined_rooms() -> Arc<MemoryStore> {
        let memory_state_store = Arc::new(MemoryStore::new());

        let store = BaseStateStore::new(memory_state_store.clone());
        let mut changes = StateChanges::default();
        changes.add_room(RoomInfo::new(room_id!("!r0"), RoomState::Joined));
        changes.add_room(RoomInfo::new(room_id!("!r1"), RoomState::Joined));

        store.inner.save_changes(&changes).await.unwrap();

        memory_state_store
    }

    #[async_test]
    async fn test_set_session_meta() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = example_session_meta();

        // Nothing is set at first.
        assert!(store.session_meta.get().is_none());

        store.set_session_meta(session_meta.clone());

        assert_eq!(store.session_meta.get(), Some(&session_meta));
    }

    #[async_test]
    #[should_panic]
    async fn test_set_session_meta_twice() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = example_session_meta();

        store.set_session_meta(session_meta.clone());
        // The second call must panic.
        store.set_session_meta(session_meta);
    }

    #[async_test]
    async fn test_derive_from_other() {
        // The store to derive from.
        let other = BaseStateStore::new(Arc::new(MemoryStore::new()));

        let session_meta = example_session_meta();
        let (room_info_notable_update_sender, _) = broadcast::channel(1);
        let room_id_0 = room_id!("!r0");

        other
            .load_rooms(
                &session_meta.user_id,
                RoomLoadSettings::One(room_id_0.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();
        other.set_session_meta(session_meta.clone());

        // The derived store.
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        store.derive_from_other(&other, &room_info_notable_update_sender).await.unwrap();

        // Both the session metadata and the room load settings are carried
        // over.
        assert_eq!(store.session_meta.get(), Some(&session_meta));
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::One(ref room_id) => {
            assert_eq!(room_id, room_id_0);
        });
    }

    #[test]
    fn test_room_load_settings_default() {
        assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
    }

    #[async_test]
    async fn test_load_all_rooms() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        let memory_state_store = memory_store_with_two_joined_rooms().await;

        // A fresh store on top of the same memory store.
        let store = BaseStateStore::new(memory_state_store.clone());
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // The default settings are in place before loading.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(user_id, RoomLoadSettings::All, &room_info_notable_update_sender)
            .await
            .unwrap();

        // The settings are unchanged after loading.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // Sort the rooms, so that the assertions below are deterministic.
        let mut rooms = store.rooms();
        rooms.sort_by(|a, b| a.room_id().cmp(b.room_id()));

        assert_eq!(rooms.len(), 2);

        assert_eq!(rooms[0].room_id(), room_id_0);
        assert_eq!(rooms[0].own_user_id(), user_id);

        assert_eq!(rooms[1].room_id(), room_id_1);
        assert_eq!(rooms[1].own_user_id(), user_id);
    }

    #[async_test]
    async fn test_load_one_room() {
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        let memory_state_store = memory_store_with_two_joined_rooms().await;

        // A fresh store on top of the same memory store.
        let store = BaseStateStore::new(memory_state_store.clone());
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // The default settings are in place before loading.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(
                user_id,
                RoomLoadSettings::One(room_id_1.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();

        // The settings given to `load_rooms` have been recorded.
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref room_id) => {
                assert_eq!(room_id, room_id_1);
            }
        );

        // Only the requested room has been loaded.
        let rooms = store.rooms();
        assert_eq!(rooms.len(), 1);

        assert_eq!(rooms[0].room_id(), room_id_1);
        assert_eq!(rooms[0].own_user_id(), user_id);
    }
}