matrix_sdk_base/event_cache/store/
traits.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt, sync::Arc};
16
17use async_trait::async_trait;
18use matrix_sdk_common::{
19    linked_chunk::{
20        ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId, Position, RawChunk, Update,
21    },
22    AsyncTraitDeps,
23};
24use ruma::{events::relation::RelationType, EventId, MxcUri, OwnedEventId, RoomId};
25
26use super::{
27    media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy},
28    EventCacheStoreError,
29};
30use crate::{
31    event_cache::{Event, Gap},
32    media::MediaRequestParameters,
33};
34
/// The default capacity for linked chunks, to be used when manipulating them
/// in conjunction with an `EventCacheStore` implementation.
// TODO: move back?
pub const DEFAULT_CHUNK_CAPACITY: usize = 128;
39
40/// An abstract trait that can be used to implement different store backends
41/// for the event cache of the SDK.
42#[cfg_attr(target_family = "wasm", async_trait(?Send))]
43#[cfg_attr(not(target_family = "wasm"), async_trait)]
44pub trait EventCacheStore: AsyncTraitDeps {
45    /// The error type used by this event cache store.
46    type Error: fmt::Debug + Into<EventCacheStoreError>;
47
48    /// Try to take a lock using the given store.
49    async fn try_take_leased_lock(
50        &self,
51        lease_duration_ms: u32,
52        key: &str,
53        holder: &str,
54    ) -> Result<bool, Self::Error>;
55
56    /// An [`Update`] reflects an operation that has happened inside a linked
57    /// chunk. The linked chunk is used by the event cache to store the events
58    /// in-memory. This method aims at forwarding this update inside this store.
59    async fn handle_linked_chunk_updates(
60        &self,
61        linked_chunk_id: LinkedChunkId<'_>,
62        updates: Vec<Update<Event, Gap>>,
63    ) -> Result<(), Self::Error>;
64
65    /// Remove all data tied to a given room from the cache.
66    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
67        // Right now, this means removing all the linked chunk. If implementations
68        // override this behavior, they should *also* include this code.
69        self.handle_linked_chunk_updates(LinkedChunkId::Room(room_id), vec![Update::Clear]).await
70    }
71
72    /// Return all the raw components of a linked chunk, so the caller may
73    /// reconstruct the linked chunk later.
74    #[doc(hidden)]
75    async fn load_all_chunks(
76        &self,
77        linked_chunk_id: LinkedChunkId<'_>,
78    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error>;
79
80    /// Load the last chunk of the `LinkedChunk` holding all events of the room
81    /// identified by `room_id`.
82    ///
83    /// This is used to iteratively load events for the `EventCache`.
84    async fn load_last_chunk(
85        &self,
86        linked_chunk_id: LinkedChunkId<'_>,
87    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error>;
88
89    /// Load the chunk before the chunk identified by `before_chunk_identifier`
90    /// of the `LinkedChunk` holding all events of the room identified by
91    /// `room_id`
92    ///
93    /// This is used to iteratively load events for the `EventCache`.
94    async fn load_previous_chunk(
95        &self,
96        linked_chunk_id: LinkedChunkId<'_>,
97        before_chunk_identifier: ChunkIdentifier,
98    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error>;
99
100    /// Clear persisted events for all the rooms.
101    ///
102    /// This will empty and remove all the linked chunks stored previously,
103    /// using the above [`Self::handle_linked_chunk_updates`] methods. It
104    /// must *also* delete all the events' content, if they were stored in a
105    /// separate table.
106    ///
107    /// ⚠ This is meant only for super specific use cases, where there shouldn't
108    /// be any live in-memory linked chunks. In general, prefer using
109    /// `EventCache::clear_all_rooms()` from the common SDK crate.
110    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error>;
111
112    /// Given a set of event IDs, return the duplicated events along with their
113    /// position if there are any.
114    async fn filter_duplicated_events(
115        &self,
116        linked_chunk_id: LinkedChunkId<'_>,
117        events: Vec<OwnedEventId>,
118    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error>;
119
120    /// Find an event by its ID in a room.
121    async fn find_event(
122        &self,
123        room_id: &RoomId,
124        event_id: &EventId,
125    ) -> Result<Option<Event>, Self::Error>;
126
127    /// Find all the events that relate to a given event.
128    ///
129    /// Note: it doesn't process relations recursively: for instance, if
130    /// requesting only thread events, it will NOT return the aggregated
131    /// events affecting the returned events. It is the responsibility of
132    /// the caller to do so, if needed.
133    ///
134    /// An additional filter can be provided to only retrieve related events for
135    /// a certain relationship.
136    async fn find_event_relations(
137        &self,
138        room_id: &RoomId,
139        event_id: &EventId,
140        filter: Option<&[RelationType]>,
141    ) -> Result<Vec<Event>, Self::Error>;
142
143    /// Save an event, that might or might not be part of an existing linked
144    /// chunk.
145    ///
146    /// If the event has no event id, it will not be saved, and the function
147    /// must return an Ok result early.
148    ///
149    /// If the event was already stored with the same id, it must be replaced,
150    /// without causing an error.
151    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error>;
152
153    /// Add a media file's content in the media store.
154    ///
155    /// # Arguments
156    ///
157    /// * `request` - The `MediaRequest` of the file.
158    ///
159    /// * `content` - The content of the file.
160    async fn add_media_content(
161        &self,
162        request: &MediaRequestParameters,
163        content: Vec<u8>,
164        ignore_policy: IgnoreMediaRetentionPolicy,
165    ) -> Result<(), Self::Error>;
166
167    /// Replaces the given media's content key with another one.
168    ///
169    /// This should be used whenever a temporary (local) MXID has been used, and
170    /// it must now be replaced with its actual remote counterpart (after
171    /// uploading some content, or creating an empty MXC URI).
172    ///
173    /// ⚠ No check is performed to ensure that the media formats are consistent,
174    /// i.e. it's possible to update with a thumbnail key a media that was
175    /// keyed as a file before. The caller is responsible of ensuring that
176    /// the replacement makes sense, according to their use case.
177    ///
178    /// This should not raise an error when the `from` parameter points to an
179    /// unknown media, and it should silently continue in this case.
180    ///
181    /// # Arguments
182    ///
183    /// * `from` - The previous `MediaRequest` of the file.
184    ///
185    /// * `to` - The new `MediaRequest` of the file.
186    async fn replace_media_key(
187        &self,
188        from: &MediaRequestParameters,
189        to: &MediaRequestParameters,
190    ) -> Result<(), Self::Error>;
191
192    /// Get a media file's content out of the media store.
193    ///
194    /// # Arguments
195    ///
196    /// * `request` - The `MediaRequest` of the file.
197    async fn get_media_content(
198        &self,
199        request: &MediaRequestParameters,
200    ) -> Result<Option<Vec<u8>>, Self::Error>;
201
202    /// Remove a media file's content from the media store.
203    ///
204    /// # Arguments
205    ///
206    /// * `request` - The `MediaRequest` of the file.
207    async fn remove_media_content(
208        &self,
209        request: &MediaRequestParameters,
210    ) -> Result<(), Self::Error>;
211
212    /// Get a media file's content associated to an `MxcUri` from the
213    /// media store.
214    ///
215    /// In theory, there could be several files stored using the same URI and a
216    /// different `MediaFormat`. This API is meant to be used with a media file
217    /// that has only been stored with a single format.
218    ///
219    /// If there are several media files for a given URI in different formats,
220    /// this API will only return one of them. Which one is left as an
221    /// implementation detail.
222    ///
223    /// # Arguments
224    ///
225    /// * `uri` - The `MxcUri` of the media file.
226    async fn get_media_content_for_uri(&self, uri: &MxcUri)
227        -> Result<Option<Vec<u8>>, Self::Error>;
228
229    /// Remove all the media files' content associated to an `MxcUri` from the
230    /// media store.
231    ///
232    /// This should not raise an error when the `uri` parameter points to an
233    /// unknown media, and it should return an Ok result in this case.
234    ///
235    /// # Arguments
236    ///
237    /// * `uri` - The `MxcUri` of the media files.
238    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error>;
239
240    /// Set the `MediaRetentionPolicy` to use for deciding whether to store or
241    /// keep media content.
242    ///
243    /// # Arguments
244    ///
245    /// * `policy` - The `MediaRetentionPolicy` to use.
246    async fn set_media_retention_policy(
247        &self,
248        policy: MediaRetentionPolicy,
249    ) -> Result<(), Self::Error>;
250
251    /// Get the current `MediaRetentionPolicy`.
252    fn media_retention_policy(&self) -> MediaRetentionPolicy;
253
254    /// Set whether the current [`MediaRetentionPolicy`] should be ignored for
255    /// the media.
256    ///
257    /// The change will be taken into account in the next cleanup.
258    ///
259    /// # Arguments
260    ///
261    /// * `request` - The `MediaRequestParameters` of the file.
262    ///
263    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
264    ///   ignored.
265    async fn set_ignore_media_retention_policy(
266        &self,
267        request: &MediaRequestParameters,
268        ignore_policy: IgnoreMediaRetentionPolicy,
269    ) -> Result<(), Self::Error>;
270
271    /// Clean up the media cache with the current `MediaRetentionPolicy`.
272    ///
273    /// If there is already an ongoing cleanup, this is a noop.
274    async fn clean_up_media_cache(&self) -> Result<(), Self::Error>;
275}
276
/// Wrapper around a store `T` whose own [`EventCacheStore`] implementation
/// reports every error as an [`EventCacheStoreError`] instead of `T::Error`.
///
/// The wrapper is `#[repr(transparent)]`: the `IntoEventCacheStore`
/// implementation for `Arc<T>` relies on that to reinterpret a pointer to `T`
/// as a pointer to `EraseEventCacheStoreError<T>`.
#[repr(transparent)]
struct EraseEventCacheStoreError<T>(T);
279
280#[cfg(not(tarpaulin_include))]
281impl<T: fmt::Debug> fmt::Debug for EraseEventCacheStoreError<T> {
282    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283        self.0.fmt(f)
284    }
285}
286
287#[cfg_attr(target_family = "wasm", async_trait(?Send))]
288#[cfg_attr(not(target_family = "wasm"), async_trait)]
289impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
290    type Error = EventCacheStoreError;
291
292    async fn try_take_leased_lock(
293        &self,
294        lease_duration_ms: u32,
295        key: &str,
296        holder: &str,
297    ) -> Result<bool, Self::Error> {
298        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
299    }
300
301    async fn handle_linked_chunk_updates(
302        &self,
303        linked_chunk_id: LinkedChunkId<'_>,
304        updates: Vec<Update<Event, Gap>>,
305    ) -> Result<(), Self::Error> {
306        self.0.handle_linked_chunk_updates(linked_chunk_id, updates).await.map_err(Into::into)
307    }
308
309    async fn load_all_chunks(
310        &self,
311        linked_chunk_id: LinkedChunkId<'_>,
312    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
313        self.0.load_all_chunks(linked_chunk_id).await.map_err(Into::into)
314    }
315
316    async fn load_last_chunk(
317        &self,
318        linked_chunk_id: LinkedChunkId<'_>,
319    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
320        self.0.load_last_chunk(linked_chunk_id).await.map_err(Into::into)
321    }
322
323    async fn load_previous_chunk(
324        &self,
325        linked_chunk_id: LinkedChunkId<'_>,
326        before_chunk_identifier: ChunkIdentifier,
327    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
328        self.0
329            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
330            .await
331            .map_err(Into::into)
332    }
333
334    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
335        self.0.clear_all_linked_chunks().await.map_err(Into::into)
336    }
337
338    async fn filter_duplicated_events(
339        &self,
340        linked_chunk_id: LinkedChunkId<'_>,
341        events: Vec<OwnedEventId>,
342    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
343        self.0.filter_duplicated_events(linked_chunk_id, events).await.map_err(Into::into)
344    }
345
346    async fn find_event(
347        &self,
348        room_id: &RoomId,
349        event_id: &EventId,
350    ) -> Result<Option<Event>, Self::Error> {
351        self.0.find_event(room_id, event_id).await.map_err(Into::into)
352    }
353
354    async fn find_event_relations(
355        &self,
356        room_id: &RoomId,
357        event_id: &EventId,
358        filter: Option<&[RelationType]>,
359    ) -> Result<Vec<Event>, Self::Error> {
360        self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into)
361    }
362
363    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
364        self.0.save_event(room_id, event).await.map_err(Into::into)
365    }
366
367    async fn add_media_content(
368        &self,
369        request: &MediaRequestParameters,
370        content: Vec<u8>,
371        ignore_policy: IgnoreMediaRetentionPolicy,
372    ) -> Result<(), Self::Error> {
373        self.0.add_media_content(request, content, ignore_policy).await.map_err(Into::into)
374    }
375
376    async fn replace_media_key(
377        &self,
378        from: &MediaRequestParameters,
379        to: &MediaRequestParameters,
380    ) -> Result<(), Self::Error> {
381        self.0.replace_media_key(from, to).await.map_err(Into::into)
382    }
383
384    async fn get_media_content(
385        &self,
386        request: &MediaRequestParameters,
387    ) -> Result<Option<Vec<u8>>, Self::Error> {
388        self.0.get_media_content(request).await.map_err(Into::into)
389    }
390
391    async fn remove_media_content(
392        &self,
393        request: &MediaRequestParameters,
394    ) -> Result<(), Self::Error> {
395        self.0.remove_media_content(request).await.map_err(Into::into)
396    }
397
398    async fn get_media_content_for_uri(
399        &self,
400        uri: &MxcUri,
401    ) -> Result<Option<Vec<u8>>, Self::Error> {
402        self.0.get_media_content_for_uri(uri).await.map_err(Into::into)
403    }
404
405    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
406        self.0.remove_media_content_for_uri(uri).await.map_err(Into::into)
407    }
408
409    async fn set_media_retention_policy(
410        &self,
411        policy: MediaRetentionPolicy,
412    ) -> Result<(), Self::Error> {
413        self.0.set_media_retention_policy(policy).await.map_err(Into::into)
414    }
415
416    fn media_retention_policy(&self) -> MediaRetentionPolicy {
417        self.0.media_retention_policy()
418    }
419
420    async fn set_ignore_media_retention_policy(
421        &self,
422        request: &MediaRequestParameters,
423        ignore_policy: IgnoreMediaRetentionPolicy,
424    ) -> Result<(), Self::Error> {
425        self.0.set_ignore_media_retention_policy(request, ignore_policy).await.map_err(Into::into)
426    }
427
428    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
429        self.0.clean_up_media_cache().await.map_err(Into::into)
430    }
431}
432
/// A type-erased [`EventCacheStore`].
///
/// The associated error type is fixed to [`EventCacheStoreError`], so that
/// stores with different concrete error types can all be used through this
/// single trait object type.
pub type DynEventCacheStore = dyn EventCacheStore<Error = EventCacheStoreError>;
435
/// A type that can be type-erased into `Arc<dyn EventCacheStore>`.
///
/// This trait is not meant to be implemented directly outside
/// `matrix-sdk-base`, but it is automatically implemented for every `'static`
/// type that implements `EventCacheStore`, as well as for an `Arc` of such a
/// type.
pub trait IntoEventCacheStore {
    // Consumes `self` and returns it as a shared, type-erased store.
    #[doc(hidden)]
    fn into_event_cache_store(self) -> Arc<DynEventCacheStore>;
}
445
446impl<T> IntoEventCacheStore for T
447where
448    T: EventCacheStore + Sized + 'static,
449{
450    fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
451        Arc::new(EraseEventCacheStoreError(self))
452    }
453}
454
455// Turns a given `Arc<T>` into `Arc<DynEventCacheStore>` by attaching the
456// `EventCacheStore` impl vtable of `EraseEventCacheStoreError<T>`.
457impl<T> IntoEventCacheStore for Arc<T>
458where
459    T: EventCacheStore + 'static,
460{
461    fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
462        let ptr: *const T = Arc::into_raw(self);
463        let ptr_erased = ptr as *const EraseEventCacheStoreError<T>;
464        // SAFETY: EraseEventCacheStoreError is repr(transparent) so T and
465        //         EraseEventCacheStoreError<T> have the same layout and ABI
466        unsafe { Arc::from_raw(ptr_erased) }
467    }
468}