Skip to main content

matrix_sdk_base/store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! The state store holds the overall state for rooms, users and their
//! profiles and their timelines. It is an overall cache for faster access
//! and convenience, accessible through `Store`.
//!
//! By implementing the `StateStore` trait, you can plug any storage backend
//! into the store for the actual storage. By default, an in-memory store is
//! used.
22
23use std::{
24    borrow::Borrow,
25    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26    fmt,
27    ops::Deref,
28    result::Result as StdResult,
29    str::{FromStr, Utf8Error},
30    sync::{Arc, OnceLock, RwLock as StdRwLock},
31};
32
33use eyeball_im::{Vector, VectorDiff};
34use futures_util::Stream;
35use matrix_sdk_common::ROOM_VERSION_RULES_FALLBACK;
36
37#[cfg(any(test, feature = "testing"))]
38#[macro_use]
39pub mod integration_tests;
40mod observable_map;
41mod traits;
42
43use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, locks::Mutex as SyncMutex};
44#[cfg(feature = "e2e-encryption")]
45use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
46pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
47use observable_map::ObservableMap;
48use ruma::{
49    EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
50    events::{
51        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52        AnySyncStateEvent, EmptyStateKey, GlobalAccountDataEventType, RedactContent,
53        RedactedStateEventContent, RoomAccountDataEventType, StateEventType, StaticEventContent,
54        StaticStateEventContent, StrippedStateEvent, SyncStateEvent,
55        presence::PresenceEvent,
56        receipt::ReceiptEventContent,
57        room::{
58            create::RoomCreateEventContent,
59            member::{RoomMemberEventContent, StrippedRoomMemberEvent},
60            power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
61            redaction::SyncRoomRedactionEvent,
62        },
63    },
64    serde::Raw,
65};
66use serde::de::DeserializeOwned;
67use tokio::sync::{Mutex, RwLock, broadcast};
68use tracing::warn;
69pub use traits::compare_thread_subscription_bump_stamps;
70
71use crate::{
72    MinimalRoomMemberEvent, Room, RoomCreateWithCreatorEventContent, RoomStateFilter, SessionMeta,
73    deserialized_responses::DisplayName,
74    event_cache::store as event_cache_store,
75    media::store as media_store,
76    room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
77};
78
79pub(crate) mod ambiguity_map;
80mod memory_store;
81pub mod migration_helpers;
82mod send_queue;
83
84#[cfg(any(test, feature = "testing"))]
85pub use self::integration_tests::StateStoreIntegrationTests;
86#[cfg(feature = "unstable-msc4274")]
87pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
88pub use self::{
89    memory_store::MemoryStore,
90    send_queue::{
91        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
92        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
93        SentMediaInfo, SentRequestKey, SerializableEventContent,
94    },
95    traits::{
96        ComposerDraft, ComposerDraftType, DraftAttachment, DraftAttachmentContent, DraftThumbnail,
97        DynStateStore, IncorrectMutexGuardError, IntoStateStore, SaveLockedStateStore, StateStore,
98        StateStoreDataKey, StateStoreDataValue, StateStoreExt, SupportedVersionsResponse,
99        ThreadSubscriptionCatchupToken, WellKnownResponse,
100    },
101};
102
/// State store specific error type.
///
/// Variants carrying a `#[from]` source can be produced with the `?` operator
/// directly from the underlying error; the others must be built explicitly
/// (see [`StoreError::backend`] for the [`Backend`][Self::Backend] variant).
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An error happened in the underlying database backend.
    ///
    /// The backend error is boxed and type-erased.
    #[error(transparent)]
    Backend(Box<dyn std::error::Error + Send + Sync>),

    /// An error happened while serializing or deserializing some data.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// An error happened while deserializing a Matrix identifier, e.g. a user
    /// ID.
    #[error(transparent)]
    Identifier(#[from] ruma::IdParseError),

    /// The store is locked with a passphrase and an incorrect passphrase was
    /// given.
    #[error("The store failed to be unlocked")]
    StoreLocked,

    /// An attempt was made to unlock an unencrypted store with a passphrase.
    #[error("The store is not encrypted but was tried to be opened with a passphrase")]
    UnencryptedStore,

    /// The store failed to encrypt or decrypt some data.
    #[error("Error encrypting or decrypting data from the store: {0}")]
    Encryption(#[from] StoreEncryptionError),

    /// The store failed to encode or decode some data.
    #[error("Error encoding or decoding data from the store: {0}")]
    Codec(#[from] Utf8Error),

    /// The database format has changed in a backwards incompatible way.
    ///
    /// The first field is the current version of the database, the second
    /// one is the latest version.
    #[error(
        "The database format changed in an incompatible way, current \
        version: {0}, latest version: {1}"
    )]
    UnsupportedDatabaseVersion(usize, usize),

    /// Redacting an event in the store has failed.
    ///
    /// This should never happen.
    #[error("Redaction failed: {0}")]
    Redaction(#[source] ruma::canonical_json::RedactionError),

    /// The store contains invalid data.
    #[error("The store contains invalid data: {details}")]
    InvalidData {
        /// Details about which data is invalid, and how.
        details: String,
    },
}
156
157impl StoreError {
158    /// Create a new [`Backend`][Self::Backend] error.
159    ///
160    /// Shorthand for `StoreError::Backend(Box::new(error))`.
161    #[inline]
162    pub fn backend<E>(error: E) -> Self
163    where
164        E: std::error::Error + Send + Sync + 'static,
165    {
166        Self::Backend(Box::new(error))
167    }
168}
169
170/// A `StateStore` specific result type.
171pub type Result<T, E = StoreError> = std::result::Result<T, E>;
172
/// A state store wrapper for the SDK.
///
/// This adds additional higher level store functionality on top of a
/// `StateStore` implementation.
#[derive(Clone)]
pub(crate) struct BaseStateStore {
    /// The wrapped `StateStore` implementation, which this type dereferences
    /// to.
    pub(super) inner: SaveLockedStateStore,

    /// The session metadata. It can be set only once, see
    /// [`BaseStateStore::set_session_meta`].
    session_meta: Arc<OnceLock<SessionMeta>>,

    /// The settings that were given to the last call to
    /// [`BaseStateStore::load_rooms`].
    room_load_settings: Arc<RwLock<RoomLoadSettings>>,

    /// A sender that is used to communicate changes to room information. Each
    /// tick contains the room ID and the reasons that have generated this tick.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// The current sync token that should be used for the next sync call.
    pub(super) sync_token: Arc<RwLock<Option<String>>>,

    /// All rooms the store knows about.
    rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,

    /// Which rooms have already logged a log line about missing room info, in
    /// the context of response processors?
    pub(crate) already_logged_missing_room: Arc<SyncMutex<HashSet<OwnedRoomId>>>,
}
197
198impl BaseStateStore {
199    /// Create a new store, wrapping the given `StateStore`
200    pub fn new(inner: Arc<DynStateStore>) -> Self {
201        // Create the channel to receive `RoomInfoNotableUpdate`.
202        //
203        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
204        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
205        // trigger such amount of updates, it's a safe value.
206        //
207        // Also, note that it must not be zero, because (i) it will panic,
208        // (ii) a new user has no room, but can create rooms; remember that the
209        // channel's capacity is immutable.
210        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
211            broadcast::channel(500);
212
213        Self {
214            inner: SaveLockedStateStore::new(inner),
215            session_meta: Default::default(),
216            room_load_settings: Default::default(),
217            room_info_notable_update_sender,
218            sync_token: Default::default(),
219            rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
220            already_logged_missing_room: Default::default(),
221        }
222    }
223
224    /// Get access to the syncing lock.
225    pub fn lock(&self) -> &Mutex<()> {
226        self.inner.lock()
227    }
228
229    /// Set the [`SessionMeta`] into [`BaseStateStore::session_meta`].
230    ///
231    /// # Panics
232    ///
233    /// Panics if called twice.
234    pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
235        self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
236    }
237
238    /// Loads rooms from the given [`DynStateStore`] (in
239    /// [`BaseStateStore::new`]) into [`BaseStateStore::rooms`].
240    pub(crate) async fn load_rooms(
241        &self,
242        user_id: &UserId,
243        room_load_settings: RoomLoadSettings,
244    ) -> Result<()> {
245        *self.room_load_settings.write().await = room_load_settings.clone();
246
247        let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
248
249        let mut rooms = self.rooms.write().unwrap();
250
251        for room_info in room_infos {
252            let new_room = Room::restore(
253                user_id,
254                self.inner.clone(),
255                room_info,
256                self.room_info_notable_update_sender.clone(),
257            );
258            let new_room_id = new_room.room_id().to_owned();
259
260            rooms.insert(new_room_id, new_room);
261        }
262
263        Ok(())
264    }
265
266    /// Load room infos from the [`StateStore`] and applies migrations onto
267    /// them.
268    async fn load_and_migrate_room_infos(
269        &self,
270        room_load_settings: RoomLoadSettings,
271    ) -> Result<Vec<RoomInfo>> {
272        let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
273        let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
274
275        for room_info in room_infos.iter_mut() {
276            if room_info.apply_migrations(self.inner.clone()).await {
277                migrated_room_infos.push(room_info.clone());
278            }
279        }
280
281        if !migrated_room_infos.is_empty() {
282            let changes = StateChanges {
283                room_infos: migrated_room_infos
284                    .into_iter()
285                    .map(|room_info| (room_info.room_id.clone(), room_info))
286                    .collect(),
287                ..Default::default()
288            };
289
290            if let Err(error) = self.inner.save_changes(&changes).await {
291                warn!("Failed to save migrated room infos: {error}");
292            }
293        }
294
295        Ok(room_infos)
296    }
297
298    /// Load sync token from the [`StateStore`], and put it in
299    /// [`BaseStateStore::sync_token`].
300    pub(crate) async fn load_sync_token(&self) -> Result<()> {
301        let token =
302            self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
303        *self.sync_token.write().await = token;
304
305        Ok(())
306    }
307
308    /// Restore the session meta, sync token and rooms from an existing
309    /// [`BaseStateStore`].
310    #[cfg(any(feature = "e2e-encryption", test))]
311    pub(crate) async fn derive_from_other(&self, other: &Self) -> Result<()> {
312        let Some(session_meta) = other.session_meta.get() else {
313            return Ok(());
314        };
315
316        let room_load_settings = other.room_load_settings.read().await.clone();
317
318        self.load_rooms(&session_meta.user_id, room_load_settings).await?;
319        self.load_sync_token().await?;
320        self.set_session_meta(session_meta.clone());
321
322        Ok(())
323    }
324
325    /// The current [`SessionMeta`] containing our user ID and device ID.
326    pub fn session_meta(&self) -> Option<&SessionMeta> {
327        self.session_meta.get()
328    }
329
330    /// Get all the rooms this store knows about.
331    pub fn rooms(&self) -> Vec<Room> {
332        self.rooms.read().unwrap().iter().cloned().collect()
333    }
334
335    /// Get all the rooms this store knows about, filtered by state.
336    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
337        self.rooms
338            .read()
339            .unwrap()
340            .iter()
341            .filter(|room| filter.matches(room.state()))
342            .cloned()
343            .collect()
344    }
345
346    /// Get a stream of all the rooms changes, in addition to the existing
347    /// rooms.
348    pub fn rooms_stream(
349        &self,
350    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
351        self.rooms.read().unwrap().stream()
352    }
353
354    /// Get the room with the given room id.
355    pub fn room(&self, room_id: &RoomId) -> Option<Room> {
356        self.rooms.read().unwrap().get(room_id).cloned()
357    }
358
359    /// Check if a room exists.
360    pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
361        self.rooms.read().unwrap().get(room_id).is_some()
362    }
363
364    /// Lookup the `Room` for the given `RoomId`, or create one, if it didn't
365    /// exist yet in the store
366    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
367        let user_id =
368            &self.session_meta.get().expect("Creating room while not being logged in").user_id;
369
370        self.rooms
371            .write()
372            .unwrap()
373            .get_or_create(room_id, || {
374                Room::new(
375                    user_id,
376                    self.inner.clone(),
377                    room_id,
378                    room_state,
379                    self.room_info_notable_update_sender.clone(),
380                )
381            })
382            .clone()
383    }
384
385    /// Forget the room with the given room ID.
386    ///
387    /// # Arguments
388    ///
389    /// * `room_id` - The id of the room that should be forgotten.
390    pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
391        self.inner.remove_room(room_id).await?;
392        self.rooms.write().unwrap().remove(room_id);
393        Ok(())
394    }
395}
396
397#[cfg(not(tarpaulin_include))]
398impl fmt::Debug for BaseStateStore {
399    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400        f.debug_struct("Store")
401            .field("inner", &self.inner)
402            .field("session_meta", &self.session_meta)
403            .field("sync_token", &self.sync_token)
404            .field("rooms", &self.rooms)
405            .finish_non_exhaustive()
406    }
407}
408
/// Give direct access to the wrapped [`SaveLockedStateStore`] through a
/// `BaseStateStore`.
impl Deref for BaseStateStore {
    /// The wrapped store.
    type Target = SaveLockedStateStore;

    /// Borrow the wrapped [`SaveLockedStateStore`].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
416
/// Configure how many rooms will be restored when restoring the session with
/// `BaseStateStore::load_rooms`.
///
/// <div class="warning">
///
/// # ⚠️ Be careful!
///
/// When loading a single room with [`RoomLoadSettings::One`], the in-memory
/// state may not reflect the store state (in the databases). Thus, when one
/// gets a room that exists in the store state but _not_ in the in-memory
/// state, it will be created from scratch and, when saved, will override the
/// data in the store state (in the databases). This can lead to weird
/// behaviours.
///
/// This option is expected to be used as follows:
///
/// 1. Create a `BaseStateStore` with a [`StateStore`] based on SQLite for
///    example,
/// 2. Restore a session and load one room from the [`StateStore`] (in the case
///    of dealing with a notification for example),
/// 3. Derive the `BaseStateStore`, with `BaseStateStore::derive_from_other`,
///    into another one with an in-memory [`StateStore`], such as
///    [`MemoryStore`],
/// 4. Work on this derived `BaseStateStore`.
///
/// Now, all operations happen in the [`MemoryStore`], not on the original store
/// (SQLite in this example), thus protecting the original data.
///
/// From a higher-level point of view, this is what
/// [`BaseClient::clone_with_in_memory_state_store`] does.
///
/// </div>
///
/// [`BaseClient::clone_with_in_memory_state_store`]: crate::BaseClient::clone_with_in_memory_state_store
#[derive(Clone, Debug, Default)]
pub enum RoomLoadSettings {
    /// Load all rooms from the [`StateStore`] into the in-memory state store
    /// `BaseStateStore`.
    ///
    /// This is the default variant.
    #[default]
    All,

    /// Load a single room, identified by its room ID, from the [`StateStore`]
    /// into the in-memory state store `BaseStateStore`.
    ///
    /// Please, be careful with this option. Read the documentation of
    /// [`RoomLoadSettings`].
    One(OwnedRoomId),
}
467
/// Whether the user is subscribed to a thread or not.
///
/// Unsubscriptions are kept in the database too, because the bump stamp
/// stored next to the status (in `StoredThreadSubscription`) is needed to
/// order subscriptions and unsubscriptions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadSubscriptionStatus {
    /// The user is subscribed to the related thread.
    Subscribed {
        /// `true` if the subscription was made automatically by a client,
        /// `false` if it results from a manual user choice.
        automatic: bool,
    },

    /// The user has been unsubscribed from the related thread.
    Unsubscribed,
}

impl FromStr for ThreadSubscriptionStatus {
    type Err = ();

    /// Parse a status from the string produced by
    /// [`ThreadSubscriptionStatus::as_str`].
    ///
    /// Any other string is rejected with `Err(())`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let status = match s {
            "automatic" => Self::Subscribed { automatic: true },
            "manual" => Self::Subscribed { automatic: false },
            "unsubscribed" => Self::Unsubscribed,
            _ => return Err(()),
        };

        Ok(status)
    }
}

impl ThreadSubscriptionStatus {
    /// Get the static string representing this status, suitable for being
    /// stored into a persistent format.
    ///
    /// Note: this is serialized in some databases implementations, so make sure
    /// to not change it lightly, and keep it in sync with
    /// [`Self::from_str`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Subscribed { automatic: true } => "automatic",
            Self::Subscribed { automatic: false } => "manual",
            Self::Unsubscribed => "unsubscribed",
        }
    }
}
519
/// A thread subscription, as saved in the state store.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StoredThreadSubscription {
    /// Current status of the subscription.
    pub status: ThreadSubscriptionStatus,

    /// An optional bump stamp, as defined in the MSC; the higher the value, the
    /// more recent the thread subscription information is, and should be
    /// remembered.
    ///
    /// If not set, this means it's a user-provided thread subscription, for
    /// which we're waiting validation from a server (e.g. through a remote
    /// echo via sync).
    pub bump_stamp: Option<u64>,
}
535
/// Store state changes and pass them to the StateStore.
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
    /// The sync token that relates to this update.
    pub sync_token: Option<String>,
    /// A mapping of global account data event type to the raw
    /// `AnyGlobalAccountDataEvent`.
    pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
    /// A mapping of `UserId` to the raw `PresenceEvent`.
    pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,

    /// A mapping of `RoomId` to a map of users and their
    /// `MinimalRoomMemberEvent`.
    pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,

    /// A mapping of room profiles to delete.
    ///
    /// These are deleted *before* other room profiles are inserted.
    pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,

    /// A mapping of `RoomId` to a map of state event type to a map of state
    /// key to the raw `AnySyncStateEvent`.
    pub state:
        BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
    /// A mapping of `RoomId` to a map of room account data event type to the
    /// raw `AnyRoomAccountDataEvent`.
    pub room_account_data:
        BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,

    /// A map of `OwnedRoomId` to `RoomInfo`.
    pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,

    /// A map of `RoomId` to `ReceiptEventContent`.
    pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,

    /// A map of `RoomId` to maps of `OwnedEventId` to be redacted by
    /// `SyncRoomRedactionEvent`.
    pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,

    /// A mapping of `RoomId` to a map of state event type to a map of state
    /// key to the raw `AnyStrippedStateEvent`.
    pub stripped_state: BTreeMap<
        OwnedRoomId,
        BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
    >,

    /// A map from room id to a map of a display name and a set of user ids that
    /// share that display name in the given room.
    pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
}
584
585impl StateChanges {
586    /// Create a new `StateChanges` struct with the given sync_token.
587    pub fn new(sync_token: String) -> Self {
588        Self { sync_token: Some(sync_token), ..Default::default() }
589    }
590
591    /// Update the `StateChanges` struct with the given `PresenceEvent`.
592    pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
593        self.presence.insert(event.sender, raw_event);
594    }
595
596    /// Update the `StateChanges` struct with the given `RoomInfo`.
597    pub fn add_room(&mut self, room: RoomInfo) {
598        self.room_infos.insert(room.room_id.clone(), room);
599    }
600
601    /// Update the `StateChanges` struct with the given room with a new
602    /// `AnyBasicEvent`.
603    pub fn add_room_account_data(
604        &mut self,
605        room_id: &RoomId,
606        event: AnyRoomAccountDataEvent,
607        raw_event: Raw<AnyRoomAccountDataEvent>,
608    ) {
609        self.room_account_data
610            .entry(room_id.to_owned())
611            .or_default()
612            .insert(event.event_type(), raw_event);
613    }
614
615    /// Update the `StateChanges` struct with the given room with a new
616    /// `StrippedMemberEvent`.
617    pub fn add_stripped_member(
618        &mut self,
619        room_id: &RoomId,
620        user_id: &UserId,
621        event: Raw<StrippedRoomMemberEvent>,
622    ) {
623        self.stripped_state
624            .entry(room_id.to_owned())
625            .or_default()
626            .entry(StateEventType::RoomMember)
627            .or_default()
628            .insert(user_id.into(), event.cast());
629    }
630
631    /// Update the `StateChanges` struct with the given room with a new
632    /// `AnySyncStateEvent`.
633    pub fn add_state_event(
634        &mut self,
635        room_id: &RoomId,
636        event: AnySyncStateEvent,
637        raw_event: Raw<AnySyncStateEvent>,
638    ) {
639        self.state
640            .entry(room_id.to_owned())
641            .or_default()
642            .entry(event.event_type())
643            .or_default()
644            .insert(event.state_key().to_owned(), raw_event);
645    }
646
647    /// Redact an event in the room
648    pub fn add_redaction(
649        &mut self,
650        room_id: &RoomId,
651        redacted_event_id: &EventId,
652        redaction: Raw<SyncRoomRedactionEvent>,
653    ) {
654        self.redactions
655            .entry(room_id.to_owned())
656            .or_default()
657            .insert(redacted_event_id.to_owned(), redaction);
658    }
659
660    /// Update the `StateChanges` struct with the given room with a new
661    /// `Receipts`.
662    pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
663        self.receipts.insert(room_id.to_owned(), event);
664    }
665
666    /// Get a specific state event of statically-known type with the given state
667    /// key in the given room, if it is present in the `state` map of these
668    /// `StateChanges`.
669    pub(crate) fn state_static_for_key<C, K>(
670        &self,
671        room_id: &RoomId,
672        state_key: &K,
673    ) -> Option<&Raw<SyncStateEvent<C>>>
674    where
675        C: StaticEventContent<IsPrefix = ruma::events::False>
676            + StaticStateEventContent
677            + RedactContent,
678        C::Redacted: RedactedStateEventContent,
679        C::StateKey: Borrow<K>,
680        K: AsRef<str> + ?Sized,
681    {
682        self.state
683            .get(room_id)?
684            .get(&C::TYPE.into())?
685            .get(state_key.as_ref())
686            .map(Raw::cast_ref_unchecked)
687    }
688
689    /// Get a specific stripped state event of statically-known type with the
690    /// given state key in the given room, if it is present in the
691    /// `stripped_state` map of these `StateChanges`.
692    pub(crate) fn stripped_state_static_for_key<C, K>(
693        &self,
694        room_id: &RoomId,
695        state_key: &K,
696    ) -> Option<&Raw<StrippedStateEvent<C::PossiblyRedacted>>>
697    where
698        C: StaticEventContent<IsPrefix = ruma::events::False> + StaticStateEventContent,
699        C::StateKey: Borrow<K>,
700        K: AsRef<str> + ?Sized,
701    {
702        self.stripped_state
703            .get(room_id)?
704            .get(&C::TYPE.into())?
705            .get(state_key.as_ref())
706            .map(Raw::cast_ref_unchecked)
707    }
708
709    /// Get a specific state event of statically-known type with the given state
710    /// key in the given room, if it is present in the `state` or
711    /// `stripped_state` map of these `StateChanges` and it deserializes
712    /// successfully.
713    pub(crate) fn any_state_static_for_key<C, K>(
714        &self,
715        room_id: &RoomId,
716        state_key: &K,
717    ) -> Option<StrippedStateEvent<C::PossiblyRedacted>>
718    where
719        C: StaticEventContent<IsPrefix = ruma::events::False>
720            + StaticStateEventContent
721            + RedactContent,
722        C::Redacted: RedactedStateEventContent,
723        C::PossiblyRedacted: StaticEventContent + DeserializeOwned,
724        C::StateKey: Borrow<K>,
725        K: AsRef<str> + ?Sized,
726    {
727        self.state_static_for_key::<C, K>(room_id, state_key)
728            .map(Raw::cast_ref)
729            .or_else(|| self.stripped_state_static_for_key::<C, K>(room_id, state_key))?
730            .deserialize()
731            .ok()
732    }
733
734    /// Get the member for the given user in the given room from an event
735    /// contained in these `StateChanges`, if any.
736    pub(crate) fn member(
737        &self,
738        room_id: &RoomId,
739        user_id: &UserId,
740    ) -> Option<StrippedRoomMemberEvent> {
741        self.any_state_static_for_key::<RoomMemberEventContent, _>(room_id, user_id)
742    }
743
744    /// Get the create event for the given room from an event contained in these
745    /// `StateChanges`, if any.
746    pub(crate) fn create(&self, room_id: &RoomId) -> Option<RoomCreateWithCreatorEventContent> {
747        self.any_state_static_for_key::<RoomCreateEventContent, _>(room_id, &EmptyStateKey)
748            .map(|event| {
749                RoomCreateWithCreatorEventContent::from_event_content(event.content, event.sender)
750            })
751            // Fallback to the content in the room info.
752            .or_else(|| self.room_infos.get(room_id)?.create().cloned())
753    }
754
755    /// Get the power levels for the given room from an event contained in these
756    /// `StateChanges`, if any.
757    pub(crate) fn power_levels(&self, room_id: &RoomId) -> Option<RoomPowerLevels> {
758        let power_levels_content = self
759            .any_state_static_for_key::<RoomPowerLevelsEventContent, _>(room_id, &EmptyStateKey)?;
760
761        let create_content = self.create(room_id)?;
762        let rules = create_content.room_version.rules().unwrap_or(ROOM_VERSION_RULES_FALLBACK);
763        let creators = create_content.creators();
764
765        Some(power_levels_content.power_levels(&rules.authorization, creators))
766    }
767}
768
/// Configuration for the various stores.
///
/// By default, this always includes a state store, an event cache store and a
/// media store. When the `e2e-encryption` feature is enabled, this also
/// includes a crypto store.
///
/// # Examples
///
/// ```
/// # use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
/// # use matrix_sdk_base::store::StoreConfig;
/// #
/// let store_config = StoreConfig::new(CrossProcessLockConfig::MultiProcess {
///     holder_name: "cross-process-store-locks-holder-name".to_owned(),
/// });
/// ```
#[derive(Clone)]
pub struct StoreConfig {
    /// The crypto store, only with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store: Arc<DynCryptoStore>,
    /// The state store.
    pub(crate) state_store: Arc<DynStateStore>,
    /// The event cache store, wrapped in its lock.
    pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
    /// The media store, wrapped in its lock.
    pub(crate) media_store: media_store::MediaStoreLock,
    /// The cross-process lock configuration, reused every time an event cache
    /// store or a media store is set.
    cross_process_lock_config: CrossProcessLockConfig,
}
794
795#[cfg(not(tarpaulin_include))]
796impl fmt::Debug for StoreConfig {
797    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
798        fmt.debug_struct("StoreConfig").finish()
799    }
800}
801
802impl StoreConfig {
803    /// Create a new default `StoreConfig`.
804    ///
805    /// To learn more about `cross_process_lock_config`, please read
806    /// [`CrossProcessLock::new`](matrix_sdk_common::cross_process_lock::CrossProcessLock::new).
807    #[must_use]
808    pub fn new(cross_process_lock_config: CrossProcessLockConfig) -> Self {
809        Self {
810            #[cfg(feature = "e2e-encryption")]
811            crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
812            state_store: Arc::new(MemoryStore::new()),
813            event_cache_store: event_cache_store::EventCacheStoreLock::new(
814                event_cache_store::MemoryStore::new(),
815                cross_process_lock_config.clone(),
816            ),
817            media_store: media_store::MediaStoreLock::new(
818                media_store::MemoryMediaStore::new(),
819                cross_process_lock_config.clone(),
820            ),
821            cross_process_lock_config,
822        }
823    }
824
825    /// Set a custom implementation of a `CryptoStore`.
826    ///
827    /// The crypto store must be opened before being set.
828    #[cfg(feature = "e2e-encryption")]
829    pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
830        self.crypto_store = store.into_crypto_store();
831        self
832    }
833
834    /// Set a custom implementation of a `StateStore`.
835    pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
836        self.state_store = store.into_state_store();
837        self
838    }
839
840    /// Set a custom implementation of an `EventCacheStore`.
841    pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
842    where
843        S: event_cache_store::IntoEventCacheStore,
844    {
845        self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
846            event_cache_store,
847            self.cross_process_lock_config.clone(),
848        );
849        self
850    }
851
852    /// Set a custom implementation of an `MediaStore`.
853    pub fn media_store<S>(mut self, media_store: S) -> Self
854    where
855        S: media_store::IntoMediaStore,
856    {
857        self.media_store =
858            media_store::MediaStoreLock::new(media_store, self.cross_process_lock_config.clone());
859        self
860    }
861}
862
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use matrix_sdk_test::async_test;
    use ruma::{owned_device_id, owned_user_id, room_id, user_id};

    use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
    use crate::{RoomInfo, RoomState, SessionMeta, StateChanges, StateStore};

    /// Build the session metadata shared by the tests of this module.
    fn sample_session_meta() -> SessionMeta {
        SessionMeta {
            user_id: owned_user_id!("@mnt_io:matrix.org"),
            device_id: owned_device_id!("HELLOYOU"),
        }
    }

    /// Build a `BaseStateStore` on top of a brand new in-memory state store.
    fn fresh_store() -> BaseStateStore {
        BaseStateStore::new(Arc::new(MemoryStore::new()))
    }

    /// Create an in-memory state store in which the joined rooms `!r0` and
    /// `!r1` have been saved.
    async fn backend_with_two_joined_rooms() -> Arc<MemoryStore> {
        let backend = Arc::new(MemoryStore::new());

        // The rooms are saved through a throw-away `BaseStateStore`, so that
        // the stores under test start from persisted data only.
        let seeding_store = BaseStateStore::new(backend.clone());

        let mut changes = StateChanges::default();
        changes.add_room(RoomInfo::new(room_id!("!r0"), RoomState::Joined));
        changes.add_room(RoomInfo::new(room_id!("!r1"), RoomState::Joined));

        seeding_store.inner.save_changes(&changes).await.unwrap();

        backend
    }

    #[async_test]
    async fn test_set_session_meta() {
        let store = fresh_store();
        let expected = sample_session_meta();

        // Nothing is set initially.
        assert!(store.session_meta.get().is_none());

        store.set_session_meta(expected.clone());

        assert_eq!(store.session_meta.get(), Some(&expected));
    }

    #[async_test]
    #[should_panic]
    async fn test_set_session_meta_twice() {
        let store = fresh_store();
        let meta = sample_session_meta();

        store.set_session_meta(meta.clone());
        // Setting it a second time must panic.
        store.set_session_meta(meta);
    }

    #[async_test]
    async fn test_derive_from_other() {
        // The store to derive from.
        let origin = fresh_store();

        let meta = sample_session_meta();
        let only_room = room_id!("!r0");

        origin
            .load_rooms(&meta.user_id, RoomLoadSettings::One(only_room.to_owned()))
            .await
            .unwrap();
        origin.set_session_meta(meta.clone());

        // The store that derives from `origin`.
        let derived = fresh_store();
        derived.derive_from_other(&origin).await.unwrap();

        // `SessionMeta` is derived.
        assert_eq!(derived.session_meta.get(), Some(&meta));

        // `RoomLoadSettings` is derived.
        assert_matches!(
            *derived.room_load_settings.read().await,
            RoomLoadSettings::One(ref room_id) => {
                assert_eq!(room_id, only_room);
            }
        );

        // The `RoomInfoNotableUpdate` is not derived. Every one has its own channel.
        let shares_channel = derived
            .room_info_notable_update_sender
            .same_channel(&origin.room_info_notable_update_sender);
        assert!(!shares_channel);
    }

    #[test]
    fn test_room_load_settings_default() {
        assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
    }

    #[async_test]
    async fn test_load_all_rooms() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        // Initial state: two joined rooms are persisted.
        let backend = backend_with_two_joined_rooms().await;

        // Check a `BaseStateStore` is able to load all rooms.
        let store = BaseStateStore::new(backend);

        // Default value.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // Load rooms.
        store.load_rooms(user_id, RoomLoadSettings::All).await.unwrap();

        // Check the last room load settings.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // Check the loaded rooms, in a deterministic order.
        let mut rooms = store.rooms();
        rooms.sort_by(|left, right| left.room_id().cmp(right.room_id()));

        assert_eq!(rooms.len(), 2);

        for (room, expected_room_id) in rooms.iter().zip([room_id_0, room_id_1]) {
            assert_eq!(room.room_id(), expected_room_id);
            assert_eq!(room.own_user_id(), user_id);
        }
    }

    #[async_test]
    async fn test_load_one_room() {
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        // Initial state: two joined rooms are persisted.
        let backend = backend_with_two_joined_rooms().await;

        // Check a `BaseStateStore` is able to load one room.
        let store = BaseStateStore::new(backend);

        // Default value.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // Load rooms.
        let settings = RoomLoadSettings::One(room_id_1.to_owned());
        store.load_rooms(user_id, settings).await.unwrap();

        // Check the last room load settings.
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref room_id) => {
                assert_eq!(room_id, room_id_1);
            }
        );

        // Check the loaded rooms: only the requested one is there.
        let rooms = store.rooms();
        assert_eq!(rooms.len(), 1);

        let room = &rooms[0];
        assert_eq!(room.room_id(), room_id_1);
        assert_eq!(room.own_user_id(), user_id);
    }
}