1use std::{
24 collections::{BTreeMap, BTreeSet, HashMap},
25 fmt,
26 ops::Deref,
27 result::Result as StdResult,
28 str::Utf8Error,
29 sync::{Arc, RwLock as StdRwLock},
30};
31
32use eyeball_im::{Vector, VectorDiff};
33use futures_util::Stream;
34use once_cell::sync::OnceCell;
35
36#[cfg(any(test, feature = "testing"))]
37#[macro_use]
38pub mod integration_tests;
39mod observable_map;
40mod traits;
41
42#[cfg(feature = "e2e-encryption")]
43use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
44pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
45use observable_map::ObservableMap;
46use ruma::{
47 events::{
48 presence::PresenceEvent,
49 receipt::ReceiptEventContent,
50 room::{member::StrippedRoomMemberEvent, redaction::SyncRoomRedactionEvent},
51 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52 AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
53 },
54 serde::Raw,
55 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
56};
57use tokio::sync::{broadcast, Mutex, RwLock};
58use tracing::warn;
59
60use crate::{
61 deserialized_responses::DisplayName,
62 event_cache::store as event_cache_store,
63 rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
64 MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
65};
66
67pub(crate) mod ambiguity_map;
68mod memory_store;
69pub mod migration_helpers;
70mod send_queue;
71
72#[cfg(any(test, feature = "testing"))]
73pub use self::integration_tests::StateStoreIntegrationTests;
74pub use self::{
75 memory_store::MemoryStore,
76 send_queue::{
77 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
78 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
79 SentMediaInfo, SentRequestKey, SerializableEventContent,
80 },
81 traits::{
82 ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
83 StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
84 },
85};
86
87#[derive(Debug, thiserror::Error)]
89pub enum StoreError {
90 #[error(transparent)]
92 Backend(Box<dyn std::error::Error + Send + Sync>),
93 #[error(transparent)]
95 Json(#[from] serde_json::Error),
96 #[error(transparent)]
99 Identifier(#[from] ruma::IdParseError),
100 #[error("The store failed to be unlocked")]
103 StoreLocked,
104 #[error("The store is not encrypted but was tried to be opened with a passphrase")]
106 UnencryptedStore,
107 #[error("Error encrypting or decrypting data from the store: {0}")]
109 Encryption(#[from] StoreEncryptionError),
110
111 #[error("Error encoding or decoding data from the store: {0}")]
113 Codec(#[from] Utf8Error),
114
115 #[error(
117 "The database format changed in an incompatible way, current \
118 version: {0}, latest version: {1}"
119 )]
120 UnsupportedDatabaseVersion(usize, usize),
121 #[error("Redaction failed: {0}")]
125 Redaction(#[source] ruma::canonical_json::RedactionError),
126}
127
128impl StoreError {
129 #[inline]
133 pub fn backend<E>(error: E) -> Self
134 where
135 E: std::error::Error + Send + Sync + 'static,
136 {
137 Self::Backend(Box::new(error))
138 }
139}
140
141pub type Result<T, E = StoreError> = std::result::Result<T, E>;
143
144#[derive(Clone)]
149pub(crate) struct Store {
150 pub(super) inner: Arc<DynStateStore>,
151 session_meta: Arc<OnceCell<SessionMeta>>,
152 pub(super) sync_token: Arc<RwLock<Option<String>>>,
154 rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
156 sync_lock: Arc<Mutex<()>>,
159}
160
161impl Store {
162 pub fn new(inner: Arc<DynStateStore>) -> Self {
164 Self {
165 inner,
166 session_meta: Default::default(),
167 sync_token: Default::default(),
168 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
169 sync_lock: Default::default(),
170 }
171 }
172
173 pub fn sync_lock(&self) -> &Mutex<()> {
175 &self.sync_lock
176 }
177
178 async fn load_room_infos(&self) -> Result<Vec<RoomInfo>> {
182 let mut room_infos = self.inner.get_room_infos().await?;
183 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
184
185 for room_info in room_infos.iter_mut() {
186 if room_info.apply_migrations(self.inner.clone()).await {
187 migrated_room_infos.push(room_info.clone());
188 }
189 }
190
191 if !migrated_room_infos.is_empty() {
192 let changes = StateChanges {
193 room_infos: migrated_room_infos
194 .into_iter()
195 .map(|room_info| (room_info.room_id.clone(), room_info))
196 .collect(),
197 ..Default::default()
198 };
199
200 if let Err(error) = self.inner.save_changes(&changes).await {
201 warn!("Failed to save migrated room infos: {error}");
202 }
203 }
204
205 Ok(room_infos)
206 }
207
208 pub async fn set_session_meta(
215 &self,
216 session_meta: SessionMeta,
217 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
218 ) -> Result<()> {
219 {
220 let room_infos = self.load_room_infos().await?;
221
222 let mut rooms = self.rooms.write().unwrap();
223
224 for room_info in room_infos {
225 let new_room = Room::restore(
226 &session_meta.user_id,
227 self.inner.clone(),
228 room_info,
229 room_info_notable_update_sender.clone(),
230 );
231 let new_room_id = new_room.room_id().to_owned();
232
233 rooms.insert(new_room_id, new_room);
234 }
235 }
236
237 let token =
238 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
239 *self.sync_token.write().await = token;
240
241 self.session_meta.set(session_meta).expect("Session Meta was already set");
242
243 Ok(())
244 }
245
246 pub fn session_meta(&self) -> Option<&SessionMeta> {
248 self.session_meta.get()
249 }
250
251 pub fn rooms(&self) -> Vec<Room> {
253 self.rooms.read().unwrap().iter().cloned().collect()
254 }
255
256 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
258 self.rooms
259 .read()
260 .unwrap()
261 .iter()
262 .filter(|room| filter.matches(room.state()))
263 .cloned()
264 .collect()
265 }
266
267 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
270 self.rooms.read().unwrap().stream()
271 }
272
273 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
275 self.rooms.read().unwrap().get(room_id).cloned()
276 }
277
278 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
280 self.rooms.read().unwrap().get(room_id).is_some()
281 }
282
283 pub fn get_or_create_room(
286 &self,
287 room_id: &RoomId,
288 room_type: RoomState,
289 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
290 ) -> Room {
291 let user_id =
292 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
293
294 self.rooms
295 .write()
296 .unwrap()
297 .get_or_create(room_id, || {
298 Room::new(
299 user_id,
300 self.inner.clone(),
301 room_id,
302 room_type,
303 room_info_notable_update_sender,
304 )
305 })
306 .clone()
307 }
308
309 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
315 self.inner.remove_room(room_id).await?;
316 self.rooms.write().unwrap().remove(room_id);
317 Ok(())
318 }
319}
320
321#[cfg(not(tarpaulin_include))]
322impl fmt::Debug for Store {
323 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324 f.debug_struct("Store")
325 .field("inner", &self.inner)
326 .field("session_meta", &self.session_meta)
327 .field("sync_token", &self.sync_token)
328 .field("rooms", &self.rooms)
329 .finish_non_exhaustive()
330 }
331}
332
333impl Deref for Store {
334 type Target = DynStateStore;
335
336 fn deref(&self) -> &Self::Target {
337 self.inner.deref()
338 }
339}
340
341#[derive(Clone, Debug, Default)]
343pub struct StateChanges {
344 pub sync_token: Option<String>,
346 pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
348 pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,
350
351 pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,
354
355 pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,
359
360 pub state:
363 BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
364 pub room_account_data:
366 BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
367
368 pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,
370
371 pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,
373
374 pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,
377
378 pub stripped_state: BTreeMap<
381 OwnedRoomId,
382 BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
383 >,
384
385 pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
388}
389
390impl StateChanges {
391 pub fn new(sync_token: String) -> Self {
393 Self { sync_token: Some(sync_token), ..Default::default() }
394 }
395
396 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
398 self.presence.insert(event.sender, raw_event);
399 }
400
401 pub fn add_room(&mut self, room: RoomInfo) {
403 self.room_infos.insert(room.room_id.clone(), room);
404 }
405
406 pub fn add_room_account_data(
409 &mut self,
410 room_id: &RoomId,
411 event: AnyRoomAccountDataEvent,
412 raw_event: Raw<AnyRoomAccountDataEvent>,
413 ) {
414 self.room_account_data
415 .entry(room_id.to_owned())
416 .or_default()
417 .insert(event.event_type(), raw_event);
418 }
419
420 pub fn add_stripped_member(
423 &mut self,
424 room_id: &RoomId,
425 user_id: &UserId,
426 event: Raw<StrippedRoomMemberEvent>,
427 ) {
428 self.stripped_state
429 .entry(room_id.to_owned())
430 .or_default()
431 .entry(StateEventType::RoomMember)
432 .or_default()
433 .insert(user_id.into(), event.cast());
434 }
435
436 pub fn add_state_event(
439 &mut self,
440 room_id: &RoomId,
441 event: AnySyncStateEvent,
442 raw_event: Raw<AnySyncStateEvent>,
443 ) {
444 self.state
445 .entry(room_id.to_owned())
446 .or_default()
447 .entry(event.event_type())
448 .or_default()
449 .insert(event.state_key().to_owned(), raw_event);
450 }
451
452 pub fn add_redaction(
454 &mut self,
455 room_id: &RoomId,
456 redacted_event_id: &EventId,
457 redaction: Raw<SyncRoomRedactionEvent>,
458 ) {
459 self.redactions
460 .entry(room_id.to_owned())
461 .or_default()
462 .insert(redacted_event_id.to_owned(), redaction);
463 }
464
465 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
468 self.receipts.insert(room_id.to_owned(), event);
469 }
470}
471
472#[derive(Clone)]
487pub struct StoreConfig {
488 #[cfg(feature = "e2e-encryption")]
489 pub(crate) crypto_store: Arc<DynCryptoStore>,
490 pub(crate) state_store: Arc<DynStateStore>,
491 pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
492 cross_process_store_locks_holder_name: String,
493}
494
495#[cfg(not(tarpaulin_include))]
496impl fmt::Debug for StoreConfig {
497 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
498 fmt.debug_struct("StoreConfig").finish()
499 }
500}
501
502impl StoreConfig {
503 #[must_use]
508 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
509 Self {
510 #[cfg(feature = "e2e-encryption")]
511 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
512 state_store: Arc::new(MemoryStore::new()),
513 event_cache_store: event_cache_store::EventCacheStoreLock::new(
514 event_cache_store::MemoryStore::new(),
515 cross_process_store_locks_holder_name.clone(),
516 ),
517 cross_process_store_locks_holder_name,
518 }
519 }
520
521 #[cfg(feature = "e2e-encryption")]
525 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
526 self.crypto_store = store.into_crypto_store();
527 self
528 }
529
530 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
532 self.state_store = store.into_state_store();
533 self
534 }
535
536 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
538 where
539 S: event_cache_store::IntoEventCacheStore,
540 {
541 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
542 event_cache_store,
543 self.cross_process_store_locks_holder_name.clone(),
544 );
545 self
546 }
547}