matrix_sdk_base/event_cache/store/
mod.rs1use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
23
24#[cfg(any(test, feature = "testing"))]
25#[macro_use]
26pub mod integration_tests;
27pub mod media;
28mod memory_store;
29mod traits;
30
31use matrix_sdk_common::store_locks::{
32 BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockStoreError,
33};
34pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
35use ruma::{
36 events::{relation::RelationType, AnySyncTimelineEvent},
37 serde::Raw,
38 OwnedEventId,
39};
40use tracing::trace;
41
42#[cfg(any(test, feature = "testing"))]
43pub use self::integration_tests::EventCacheStoreIntegrationTests;
44pub use self::{
45 memory_store::MemoryStore,
46 traits::{DynEventCacheStore, EventCacheStore, IntoEventCacheStore, DEFAULT_CHUNK_CAPACITY},
47};
48
49#[derive(Clone)]
51pub struct EventCacheStoreLock {
52 cross_process_lock: Arc<CrossProcessStoreLock<LockableEventCacheStore>>,
54
55 store: Arc<DynEventCacheStore>,
59}
60
61#[cfg(not(tarpaulin_include))]
62impl fmt::Debug for EventCacheStoreLock {
63 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
64 formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
65 }
66}
67
68impl EventCacheStoreLock {
69 pub fn new<S>(store: S, holder: String) -> Self
74 where
75 S: IntoEventCacheStore,
76 {
77 let store = store.into_event_cache_store();
78
79 Self {
80 cross_process_lock: Arc::new(CrossProcessStoreLock::new(
81 LockableEventCacheStore(store.clone()),
82 "default".to_owned(),
83 holder,
84 )),
85 store,
86 }
87 }
88
89 pub async fn lock(&self) -> Result<EventCacheStoreLockGuard<'_>, LockStoreError> {
91 let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?;
92
93 Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
94 }
95}
96
97pub struct EventCacheStoreLockGuard<'a> {
101 #[allow(unused)]
103 cross_process_lock_guard: CrossProcessStoreLockGuard,
104
105 store: &'a DynEventCacheStore,
107}
108
109#[cfg(not(tarpaulin_include))]
110impl fmt::Debug for EventCacheStoreLockGuard<'_> {
111 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
112 formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
113 }
114}
115
116impl Deref for EventCacheStoreLockGuard<'_> {
117 type Target = DynEventCacheStore;
118
119 fn deref(&self) -> &Self::Target {
120 self.store
121 }
122}
123
124#[derive(Debug, thiserror::Error)]
126pub enum EventCacheStoreError {
127 #[error(transparent)]
129 #[cfg(not(target_family = "wasm"))]
130 Backend(Box<dyn std::error::Error + Send + Sync>),
131
132 #[error(transparent)]
134 #[cfg(target_family = "wasm")]
135 Backend(Box<dyn std::error::Error>),
136
137 #[error("The event cache store failed to be unlocked")]
140 Locked,
141
142 #[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
144 Unencrypted,
145
146 #[error("Error encrypting or decrypting data from the event cache store: {0}")]
148 Encryption(#[from] StoreEncryptionError),
149
150 #[error("Error encoding or decoding data from the event cache store: {0}")]
152 Codec(#[from] Utf8Error),
153
154 #[error("Error serializing or deserializing data from the event cache store: {0}")]
156 Serialization(#[from] serde_json::Error),
157
158 #[error(
160 "The database format of the event cache store changed in an incompatible way, \
161 current version: {0}, latest version: {1}"
162 )]
163 UnsupportedDatabaseVersion(usize, usize),
164
165 #[error("The store contains invalid data: {details}")]
167 InvalidData {
168 details: String,
170 },
171}
172
173impl EventCacheStoreError {
174 #[inline]
178 #[cfg(not(target_family = "wasm"))]
179 pub fn backend<E>(error: E) -> Self
180 where
181 E: std::error::Error + Send + Sync + 'static,
182 {
183 Self::Backend(Box::new(error))
184 }
185
186 #[inline]
190 #[cfg(target_family = "wasm")]
191 pub fn backend<E>(error: E) -> Self
192 where
193 E: std::error::Error + 'static,
194 {
195 Self::Backend(Box::new(error))
196 }
197}
198
199pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
201
202#[derive(Clone, Debug)]
205struct LockableEventCacheStore(Arc<DynEventCacheStore>);
206
207impl BackingStore for LockableEventCacheStore {
208 type LockError = EventCacheStoreError;
209
210 async fn try_lock(
211 &self,
212 lease_duration_ms: u32,
213 key: &str,
214 holder: &str,
215 ) -> std::result::Result<bool, Self::LockError> {
216 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
217 }
218}
219
220pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
226 #[derive(serde::Deserialize)]
227 struct RelatesTo {
228 event_id: OwnedEventId,
229 rel_type: String,
230 }
231
232 #[derive(serde::Deserialize)]
233 struct EventContent {
234 #[serde(rename = "m.relates_to")]
235 rel: Option<RelatesTo>,
236 }
237
238 match event.get_field::<EventContent>("content") {
239 Ok(event_content) => {
240 event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
241 }
242 Err(err) => {
243 trace!("when extracting relation data from an event: {err}");
244 None
245 }
246 }
247}
248
249pub fn compute_filters_string(filters: Option<&[RelationType]>) -> Option<Vec<String>> {
254 filters.map(|filter| {
255 filter
256 .iter()
257 .map(|f| {
258 if *f == RelationType::Replacement {
259 "m.replace".to_owned()
260 } else {
261 f.to_string()
262 }
263 })
264 .collect()
265 })
266}