matrix_sdk_base/store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! The state store holds the overall state for rooms, users, their profiles
//! and their timelines. It is an overall cache for faster and more convenient
//! access, reachable through `Store`.
18//!
19//! Implementing the `StateStore` trait, you can plug any storage backend
20//! into the store for the actual storage. By default this brings an in-memory
21//! store.
22
23use std::{
24    collections::{BTreeMap, BTreeSet, HashMap},
25    fmt,
26    ops::Deref,
27    result::Result as StdResult,
28    str::Utf8Error,
29    sync::{Arc, RwLock as StdRwLock},
30};
31
32use eyeball_im::{Vector, VectorDiff};
33use futures_util::Stream;
34use once_cell::sync::OnceCell;
35
36#[cfg(any(test, feature = "testing"))]
37#[macro_use]
38pub mod integration_tests;
39mod observable_map;
40mod traits;
41
42#[cfg(feature = "e2e-encryption")]
43use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
44pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
45use observable_map::ObservableMap;
46use ruma::{
47    events::{
48        presence::PresenceEvent,
49        receipt::ReceiptEventContent,
50        room::{member::StrippedRoomMemberEvent, redaction::SyncRoomRedactionEvent},
51        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52        AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
53    },
54    serde::Raw,
55    EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
56};
57use tokio::sync::{broadcast, Mutex, RwLock};
58use tracing::warn;
59
60use crate::{
61    deserialized_responses::DisplayName,
62    event_cache::store as event_cache_store,
63    rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
64    MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
65};
66
67pub(crate) mod ambiguity_map;
68mod memory_store;
69pub mod migration_helpers;
70mod send_queue;
71
72#[cfg(any(test, feature = "testing"))]
73pub use self::integration_tests::StateStoreIntegrationTests;
74pub use self::{
75    memory_store::MemoryStore,
76    send_queue::{
77        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
78        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
79        SentMediaInfo, SentRequestKey, SerializableEventContent,
80    },
81    traits::{
82        ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
83        StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
84    },
85};
86
/// State store specific error type.
///
/// Each variant either wraps the error of a lower layer (backend, JSON,
/// identifier parsing, encryption, UTF-8 decoding, redaction) or describes a
/// store-level failure on its own.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An error happened in the underlying database backend.
    ///
    /// The wrapped error is boxed and type-erased; [`StoreError::backend`] is
    /// a shorthand for building this variant.
    #[error(transparent)]
    Backend(Box<dyn std::error::Error + Send + Sync>),
    /// An error happened while serializing or deserializing some data.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// An error happened while deserializing a Matrix identifier, e.g. a user
    /// ID.
    #[error(transparent)]
    Identifier(#[from] ruma::IdParseError),
    /// The store is locked with a passphrase and an incorrect passphrase was
    /// given.
    #[error("The store failed to be unlocked")]
    StoreLocked,
    /// An attempt was made to unlock an unencrypted store with a passphrase.
    #[error("The store is not encrypted but was tried to be opened with a passphrase")]
    UnencryptedStore,
    /// The store failed to encrypt or decrypt some data.
    #[error("Error encrypting or decrypting data from the store: {0}")]
    Encryption(#[from] StoreEncryptionError),

    /// The store failed to encode or decode some data.
    ///
    /// Wraps the UTF-8 error raised while interpreting bytes as a string.
    #[error("Error encoding or decoding data from the store: {0}")]
    Codec(#[from] Utf8Error),

    /// The database format has changed in a backwards incompatible way.
    ///
    /// The first field is the current version of the database, the second
    /// field is the latest version.
    #[error(
        "The database format changed in an incompatible way, current \
        version: {0}, latest version: {1}"
    )]
    UnsupportedDatabaseVersion(usize, usize),
    /// Redacting an event in the store has failed.
    ///
    /// This should never happen.
    #[error("Redaction failed: {0}")]
    Redaction(#[source] ruma::canonical_json::RedactionError),
}
127
128impl StoreError {
129    /// Create a new [`Backend`][Self::Backend] error.
130    ///
131    /// Shorthand for `StoreError::Backend(Box::new(error))`.
132    #[inline]
133    pub fn backend<E>(error: E) -> Self
134    where
135        E: std::error::Error + Send + Sync + 'static,
136    {
137        Self::Backend(Box::new(error))
138    }
139}
140
141/// A `StateStore` specific result type.
142pub type Result<T, E = StoreError> = std::result::Result<T, E>;
143
/// A state store wrapper for the SDK.
///
/// This adds additional higher level store functionality on top of a
/// `StateStore` implementation.
///
/// Every field lives behind an `Arc`, so cloning a `Store` yields another
/// handle onto the same shared state.
#[derive(Clone)]
pub(crate) struct Store {
    /// The wrapped `StateStore` implementation.
    pub(super) inner: Arc<DynStateStore>,
    /// The meta of the current session. Stored in a `OnceCell`, so it can be
    /// set at most once.
    session_meta: Arc<OnceCell<SessionMeta>>,
    /// The current sync token that should be used for the next sync call.
    pub(super) sync_token: Arc<RwLock<Option<String>>>,
    /// All rooms the store knows about.
    rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
    /// A lock to synchronize access to the store, such that data by the sync is
    /// never overwritten.
    sync_lock: Arc<Mutex<()>>,
}
160
161impl Store {
162    /// Create a new store, wrapping the given `StateStore`
163    pub fn new(inner: Arc<DynStateStore>) -> Self {
164        Self {
165            inner,
166            session_meta: Default::default(),
167            sync_token: Default::default(),
168            rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
169            sync_lock: Default::default(),
170        }
171    }
172
173    /// Get access to the syncing lock.
174    pub fn sync_lock(&self) -> &Mutex<()> {
175        &self.sync_lock
176    }
177
178    /// Load the room infos from the inner `StateStore`.
179    ///
180    /// Applies migrations to the room infos if needed.
181    async fn load_room_infos(&self) -> Result<Vec<RoomInfo>> {
182        let mut room_infos = self.inner.get_room_infos().await?;
183        let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
184
185        for room_info in room_infos.iter_mut() {
186            if room_info.apply_migrations(self.inner.clone()).await {
187                migrated_room_infos.push(room_info.clone());
188            }
189        }
190
191        if !migrated_room_infos.is_empty() {
192            let changes = StateChanges {
193                room_infos: migrated_room_infos
194                    .into_iter()
195                    .map(|room_info| (room_info.room_id.clone(), room_info))
196                    .collect(),
197                ..Default::default()
198            };
199
200            if let Err(error) = self.inner.save_changes(&changes).await {
201                warn!("Failed to save migrated room infos: {error}");
202            }
203        }
204
205        Ok(room_infos)
206    }
207
208    /// Set the meta of the session.
209    ///
210    /// Restores the state of this `Store` from the given `SessionMeta` and the
211    /// inner `StateStore`.
212    ///
213    /// This method panics if it is called twice.
214    pub async fn set_session_meta(
215        &self,
216        session_meta: SessionMeta,
217        room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
218    ) -> Result<()> {
219        {
220            let room_infos = self.load_room_infos().await?;
221
222            let mut rooms = self.rooms.write().unwrap();
223
224            for room_info in room_infos {
225                let new_room = Room::restore(
226                    &session_meta.user_id,
227                    self.inner.clone(),
228                    room_info,
229                    room_info_notable_update_sender.clone(),
230                );
231                let new_room_id = new_room.room_id().to_owned();
232
233                rooms.insert(new_room_id, new_room);
234            }
235        }
236
237        let token =
238            self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
239        *self.sync_token.write().await = token;
240
241        self.session_meta.set(session_meta).expect("Session Meta was already set");
242
243        Ok(())
244    }
245
246    /// The current [`SessionMeta`] containing our user ID and device ID.
247    pub fn session_meta(&self) -> Option<&SessionMeta> {
248        self.session_meta.get()
249    }
250
251    /// Get all the rooms this store knows about.
252    pub fn rooms(&self) -> Vec<Room> {
253        self.rooms.read().unwrap().iter().cloned().collect()
254    }
255
256    /// Get all the rooms this store knows about, filtered by state.
257    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
258        self.rooms
259            .read()
260            .unwrap()
261            .iter()
262            .filter(|room| filter.matches(room.state()))
263            .cloned()
264            .collect()
265    }
266
267    /// Get a stream of all the rooms changes, in addition to the existing
268    /// rooms.
269    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
270        self.rooms.read().unwrap().stream()
271    }
272
273    /// Get the room with the given room id.
274    pub fn room(&self, room_id: &RoomId) -> Option<Room> {
275        self.rooms.read().unwrap().get(room_id).cloned()
276    }
277
278    /// Check if a room exists.
279    pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
280        self.rooms.read().unwrap().get(room_id).is_some()
281    }
282
283    /// Lookup the `Room` for the given `RoomId`, or create one, if it didn't
284    /// exist yet in the store
285    pub fn get_or_create_room(
286        &self,
287        room_id: &RoomId,
288        room_type: RoomState,
289        room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
290    ) -> Room {
291        let user_id =
292            &self.session_meta.get().expect("Creating room while not being logged in").user_id;
293
294        self.rooms
295            .write()
296            .unwrap()
297            .get_or_create(room_id, || {
298                Room::new(
299                    user_id,
300                    self.inner.clone(),
301                    room_id,
302                    room_type,
303                    room_info_notable_update_sender,
304                )
305            })
306            .clone()
307    }
308
309    /// Forget the room with the given room ID.
310    ///
311    /// # Arguments
312    ///
313    /// * `room_id` - The id of the room that should be forgotten.
314    pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
315        self.inner.remove_room(room_id).await?;
316        self.rooms.write().unwrap().remove(room_id);
317        Ok(())
318    }
319}
320
321#[cfg(not(tarpaulin_include))]
322impl fmt::Debug for Store {
323    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324        f.debug_struct("Store")
325            .field("inner", &self.inner)
326            .field("session_meta", &self.session_meta)
327            .field("sync_token", &self.sync_token)
328            .field("rooms", &self.rooms)
329            .finish_non_exhaustive()
330    }
331}
332
333impl Deref for Store {
334    type Target = DynStateStore;
335
336    fn deref(&self) -> &Self::Target {
337        self.inner.deref()
338    }
339}
340
/// Store state changes and pass them to the StateStore.
///
/// All collections start out empty via `Default`.
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
    /// The sync token that relates to this update.
    pub sync_token: Option<String>,
    /// A mapping of global account data event type to the raw
    /// `AnyGlobalAccountDataEvent`.
    pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
    /// A mapping of `UserId` to the raw `PresenceEvent`.
    pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,

    /// A mapping of `RoomId` to a map of users and their
    /// `MinimalRoomMemberEvent`.
    pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,

    /// A mapping of room profiles to delete.
    ///
    /// These are deleted *before* other room profiles are inserted.
    pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,

    /// A mapping of `RoomId` to a map of event type to a map of state key to
    /// the raw `AnySyncStateEvent`.
    pub state:
        BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
    /// A mapping of `RoomId` to a map of room account data event type to the
    /// raw `AnyRoomAccountDataEvent`.
    pub room_account_data:
        BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,

    /// A map of `OwnedRoomId` to `RoomInfo`.
    pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,

    /// A map of `RoomId` to `ReceiptEventContent`.
    pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,

    /// A map of `RoomId` to maps of `OwnedEventId` to be redacted by
    /// `SyncRoomRedactionEvent`.
    pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,

    /// A mapping of `RoomId` to a map of event type to a map of state key to
    /// `AnyStrippedStateEvent`.
    pub stripped_state: BTreeMap<
        OwnedRoomId,
        BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
    >,

    /// A map from room id to a map of a display name and a set of user ids that
    /// share that display name in the given room.
    pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
}
389
390impl StateChanges {
391    /// Create a new `StateChanges` struct with the given sync_token.
392    pub fn new(sync_token: String) -> Self {
393        Self { sync_token: Some(sync_token), ..Default::default() }
394    }
395
396    /// Update the `StateChanges` struct with the given `PresenceEvent`.
397    pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
398        self.presence.insert(event.sender, raw_event);
399    }
400
401    /// Update the `StateChanges` struct with the given `RoomInfo`.
402    pub fn add_room(&mut self, room: RoomInfo) {
403        self.room_infos.insert(room.room_id.clone(), room);
404    }
405
406    /// Update the `StateChanges` struct with the given room with a new
407    /// `AnyBasicEvent`.
408    pub fn add_room_account_data(
409        &mut self,
410        room_id: &RoomId,
411        event: AnyRoomAccountDataEvent,
412        raw_event: Raw<AnyRoomAccountDataEvent>,
413    ) {
414        self.room_account_data
415            .entry(room_id.to_owned())
416            .or_default()
417            .insert(event.event_type(), raw_event);
418    }
419
420    /// Update the `StateChanges` struct with the given room with a new
421    /// `StrippedMemberEvent`.
422    pub fn add_stripped_member(
423        &mut self,
424        room_id: &RoomId,
425        user_id: &UserId,
426        event: Raw<StrippedRoomMemberEvent>,
427    ) {
428        self.stripped_state
429            .entry(room_id.to_owned())
430            .or_default()
431            .entry(StateEventType::RoomMember)
432            .or_default()
433            .insert(user_id.into(), event.cast());
434    }
435
436    /// Update the `StateChanges` struct with the given room with a new
437    /// `AnySyncStateEvent`.
438    pub fn add_state_event(
439        &mut self,
440        room_id: &RoomId,
441        event: AnySyncStateEvent,
442        raw_event: Raw<AnySyncStateEvent>,
443    ) {
444        self.state
445            .entry(room_id.to_owned())
446            .or_default()
447            .entry(event.event_type())
448            .or_default()
449            .insert(event.state_key().to_owned(), raw_event);
450    }
451
452    /// Redact an event in the room
453    pub fn add_redaction(
454        &mut self,
455        room_id: &RoomId,
456        redacted_event_id: &EventId,
457        redaction: Raw<SyncRoomRedactionEvent>,
458    ) {
459        self.redactions
460            .entry(room_id.to_owned())
461            .or_default()
462            .insert(redacted_event_id.to_owned(), redaction);
463    }
464
465    /// Update the `StateChanges` struct with the given room with a new
466    /// `Receipts`.
467    pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
468        self.receipts.insert(room_id.to_owned(), event);
469    }
470}
471
/// Configuration for the various stores.
///
/// By default, this always includes a state store and an event cache store.
/// When the `e2e-encryption` feature is enabled, this also includes a crypto
/// store.
///
/// # Examples
///
/// ```
/// # use matrix_sdk_base::store::StoreConfig;
///
/// let store_config =
///     StoreConfig::new("cross-process-store-locks-holder-name".to_owned());
/// ```
#[derive(Clone)]
pub struct StoreConfig {
    /// The crypto store; only present when the `e2e-encryption` feature is
    /// enabled.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store: Arc<DynCryptoStore>,
    /// The state store.
    pub(crate) state_store: Arc<DynStateStore>,
    /// The event cache store, wrapped in an `EventCacheStoreLock`.
    pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
    /// The holder name handed to `EventCacheStoreLock::new` every time an
    /// event cache store is set on this config.
    cross_process_store_locks_holder_name: String,
}
494
495#[cfg(not(tarpaulin_include))]
496impl fmt::Debug for StoreConfig {
497    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
498        fmt.debug_struct("StoreConfig").finish()
499    }
500}
501
502impl StoreConfig {
503    /// Create a new default `StoreConfig`.
504    ///
505    /// To learn more about `cross_process_store_locks_holder_name`, please read
506    /// [`CrossProcessStoreLock::new`](matrix_sdk_common::store_locks::CrossProcessStoreLock::new).
507    #[must_use]
508    pub fn new(cross_process_store_locks_holder_name: String) -> Self {
509        Self {
510            #[cfg(feature = "e2e-encryption")]
511            crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
512            state_store: Arc::new(MemoryStore::new()),
513            event_cache_store: event_cache_store::EventCacheStoreLock::new(
514                event_cache_store::MemoryStore::new(),
515                cross_process_store_locks_holder_name.clone(),
516            ),
517            cross_process_store_locks_holder_name,
518        }
519    }
520
521    /// Set a custom implementation of a `CryptoStore`.
522    ///
523    /// The crypto store must be opened before being set.
524    #[cfg(feature = "e2e-encryption")]
525    pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
526        self.crypto_store = store.into_crypto_store();
527        self
528    }
529
530    /// Set a custom implementation of a `StateStore`.
531    pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
532        self.state_store = store.into_state_store();
533        self
534    }
535
536    /// Set a custom implementation of an `EventCacheStore`.
537    pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
538    where
539        S: event_cache_store::IntoEventCacheStore,
540    {
541        self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
542            event_cache_store,
543            self.cross_process_store_locks_holder_name.clone(),
544        );
545        self
546    }
547}