matrix_sdk_base/event_cache/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    num::NonZeroUsize,
18    sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23    linked_chunk::{
24        relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator,
25        ChunkMetadata, LinkedChunkId, OwnedLinkedChunkId, Position, RawChunk, Update,
26    },
27    ring_buffer::RingBuffer,
28    store_locks::memory_store_helper::try_take_leased_lock,
29};
30use ruma::{
31    events::relation::RelationType,
32    time::{Instant, SystemTime},
33    EventId, MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
34};
35use tracing::error;
36
37use super::{
38    compute_filters_string, extract_event_relation,
39    media::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService},
40    EventCacheStore, EventCacheStoreError, Result,
41};
42use crate::{
43    event_cache::{Event, Gap},
44    media::{MediaRequestParameters, UniqueKey as _},
45};
46
47/// In-memory, non-persistent implementation of the `EventCacheStore`.
48///
49/// Default if no other is configured at startup.
50#[derive(Debug, Clone)]
51pub struct MemoryStore {
52    inner: Arc<StdRwLock<MemoryStoreInner>>,
53    media_service: MediaService,
54}
55
56#[derive(Debug)]
57struct MemoryStoreInner {
58    media: RingBuffer<MediaContent>,
59    leases: HashMap<String, (String, Instant)>,
60    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
61    media_retention_policy: Option<MediaRetentionPolicy>,
62    last_media_cleanup_time: SystemTime,
63}
64
65/// A media content in the `MemoryStore`.
66#[derive(Debug)]
67struct MediaContent {
68    /// The URI of the content.
69    uri: OwnedMxcUri,
70
71    /// The unique key of the content.
72    key: String,
73
74    /// The bytes of the content.
75    data: Vec<u8>,
76
77    /// Whether we should ignore the [`MediaRetentionPolicy`] for this content.
78    ignore_policy: bool,
79
80    /// The time of the last access of the content.
81    last_access: SystemTime,
82}
83
/// The size given to the media [`RingBuffer`] when the store is created.
const NUMBER_OF_MEDIAS: NonZeroUsize = match NonZeroUsize::new(20) {
    Some(size) => size,
    // Unreachable: the literal above is not zero.
    None => panic!("NUMBER_OF_MEDIAS must not be zero"),
};
85
86impl Default for MemoryStore {
87    fn default() -> Self {
88        // Given that the store is empty, we won't need to clean it up right away.
89        let last_media_cleanup_time = SystemTime::now();
90        let media_service = MediaService::new();
91        media_service.restore(None, Some(last_media_cleanup_time));
92
93        Self {
94            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
95                media: RingBuffer::new(NUMBER_OF_MEDIAS),
96                leases: Default::default(),
97                events: RelationalLinkedChunk::new(),
98                media_retention_policy: None,
99                last_media_cleanup_time,
100            })),
101            media_service,
102        }
103    }
104}
105
106impl MemoryStore {
107    /// Create a new empty MemoryStore
108    pub fn new() -> Self {
109        Self::default()
110    }
111}
112
113#[cfg_attr(target_family = "wasm", async_trait(?Send))]
114#[cfg_attr(not(target_family = "wasm"), async_trait)]
115impl EventCacheStore for MemoryStore {
116    type Error = EventCacheStoreError;
117
118    async fn try_take_leased_lock(
119        &self,
120        lease_duration_ms: u32,
121        key: &str,
122        holder: &str,
123    ) -> Result<bool, Self::Error> {
124        let mut inner = self.inner.write().unwrap();
125
126        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
127    }
128
129    async fn handle_linked_chunk_updates(
130        &self,
131        linked_chunk_id: LinkedChunkId<'_>,
132        updates: Vec<Update<Event, Gap>>,
133    ) -> Result<(), Self::Error> {
134        let mut inner = self.inner.write().unwrap();
135        inner.events.apply_updates(linked_chunk_id, updates);
136
137        Ok(())
138    }
139
140    async fn load_all_chunks(
141        &self,
142        linked_chunk_id: LinkedChunkId<'_>,
143    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
144        let inner = self.inner.read().unwrap();
145        inner
146            .events
147            .load_all_chunks(linked_chunk_id)
148            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
149    }
150
151    async fn load_all_chunks_metadata(
152        &self,
153        linked_chunk_id: LinkedChunkId<'_>,
154    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
155        let inner = self.inner.read().unwrap();
156        inner
157            .events
158            .load_all_chunks_metadata(linked_chunk_id)
159            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
160    }
161
162    async fn load_last_chunk(
163        &self,
164        linked_chunk_id: LinkedChunkId<'_>,
165    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
166        let inner = self.inner.read().unwrap();
167        inner
168            .events
169            .load_last_chunk(linked_chunk_id)
170            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
171    }
172
173    async fn load_previous_chunk(
174        &self,
175        linked_chunk_id: LinkedChunkId<'_>,
176        before_chunk_identifier: ChunkIdentifier,
177    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
178        let inner = self.inner.read().unwrap();
179        inner
180            .events
181            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
182            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
183    }
184
185    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
186        self.inner.write().unwrap().events.clear();
187        Ok(())
188    }
189
190    async fn filter_duplicated_events(
191        &self,
192        linked_chunk_id: LinkedChunkId<'_>,
193        mut events: Vec<OwnedEventId>,
194    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
195        if events.is_empty() {
196            return Ok(Vec::new());
197        }
198
199        let inner = self.inner.read().unwrap();
200
201        let mut duplicated_events = Vec::new();
202
203        for (event, position) in
204            inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
205        {
206            if let Some(known_event_id) = event.event_id() {
207                // This event is a duplicate!
208                if let Some(index) =
209                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
210                {
211                    duplicated_events.push((events.remove(index), position));
212                }
213            }
214        }
215
216        Ok(duplicated_events)
217    }
218
219    async fn find_event(
220        &self,
221        room_id: &RoomId,
222        event_id: &EventId,
223    ) -> Result<Option<Event>, Self::Error> {
224        let inner = self.inner.read().unwrap();
225
226        let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned());
227
228        let event = inner
229            .events
230            .items(&target_linked_chunk_id)
231            .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
232
233        Ok(event)
234    }
235
236    async fn find_event_relations(
237        &self,
238        room_id: &RoomId,
239        event_id: &EventId,
240        filters: Option<&[RelationType]>,
241    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
242        let inner = self.inner.read().unwrap();
243
244        let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned());
245
246        let filters = compute_filters_string(filters);
247
248        let related_events = inner
249            .events
250            .items(&target_linked_chunk_id)
251            .filter_map(|(event, pos)| {
252                // Must have a relation.
253                let (related_to, rel_type) = extract_event_relation(event.raw())?;
254
255                // Must relate to the target item.
256                if related_to != event_id {
257                    return None;
258                }
259
260                // Must not be filtered out.
261                if let Some(filters) = &filters {
262                    filters.contains(&rel_type).then_some((event.clone(), pos))
263                } else {
264                    Some((event.clone(), pos))
265                }
266            })
267            .collect();
268
269        Ok(related_events)
270    }
271
272    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
273        if event.event_id().is_none() {
274            error!(%room_id, "Trying to save an event with no ID");
275            return Ok(());
276        }
277        self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
278        Ok(())
279    }
280
281    async fn add_media_content(
282        &self,
283        request: &MediaRequestParameters,
284        data: Vec<u8>,
285        ignore_policy: IgnoreMediaRetentionPolicy,
286    ) -> Result<()> {
287        self.media_service.add_media_content(self, request, data, ignore_policy).await
288    }
289
290    async fn replace_media_key(
291        &self,
292        from: &MediaRequestParameters,
293        to: &MediaRequestParameters,
294    ) -> Result<(), Self::Error> {
295        let expected_key = from.unique_key();
296
297        let mut inner = self.inner.write().unwrap();
298
299        if let Some(media_content) =
300            inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
301        {
302            media_content.uri = to.uri().to_owned();
303            media_content.key = to.unique_key();
304        }
305
306        Ok(())
307    }
308
309    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
310        self.media_service.get_media_content(self, request).await
311    }
312
313    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
314        let expected_key = request.unique_key();
315
316        let mut inner = self.inner.write().unwrap();
317
318        let Some(index) =
319            inner.media.iter().position(|media_content| media_content.key == expected_key)
320        else {
321            return Ok(());
322        };
323
324        inner.media.remove(index);
325
326        Ok(())
327    }
328
329    async fn get_media_content_for_uri(
330        &self,
331        uri: &MxcUri,
332    ) -> Result<Option<Vec<u8>>, Self::Error> {
333        self.media_service.get_media_content_for_uri(self, uri).await
334    }
335
336    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
337        let mut inner = self.inner.write().unwrap();
338
339        let positions = inner
340            .media
341            .iter()
342            .enumerate()
343            .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
344            .collect::<Vec<_>>();
345
346        // Iterate in reverse-order so that positions stay valid after first removals.
347        for position in positions.into_iter().rev() {
348            inner.media.remove(position);
349        }
350
351        Ok(())
352    }
353
354    async fn set_media_retention_policy(
355        &self,
356        policy: MediaRetentionPolicy,
357    ) -> Result<(), Self::Error> {
358        self.media_service.set_media_retention_policy(self, policy).await
359    }
360
361    fn media_retention_policy(&self) -> MediaRetentionPolicy {
362        self.media_service.media_retention_policy()
363    }
364
365    async fn set_ignore_media_retention_policy(
366        &self,
367        request: &MediaRequestParameters,
368        ignore_policy: IgnoreMediaRetentionPolicy,
369    ) -> Result<(), Self::Error> {
370        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
371    }
372
373    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
374        self.media_service.clean_up_media_cache(self).await
375    }
376}
377
378#[cfg_attr(target_family = "wasm", async_trait(?Send))]
379#[cfg_attr(not(target_family = "wasm"), async_trait)]
380impl EventCacheStoreMedia for MemoryStore {
381    type Error = EventCacheStoreError;
382
383    async fn media_retention_policy_inner(
384        &self,
385    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
386        Ok(self.inner.read().unwrap().media_retention_policy)
387    }
388
389    async fn set_media_retention_policy_inner(
390        &self,
391        policy: MediaRetentionPolicy,
392    ) -> Result<(), Self::Error> {
393        self.inner.write().unwrap().media_retention_policy = Some(policy);
394        Ok(())
395    }
396
397    async fn add_media_content_inner(
398        &self,
399        request: &MediaRequestParameters,
400        data: Vec<u8>,
401        last_access: SystemTime,
402        policy: MediaRetentionPolicy,
403        ignore_policy: IgnoreMediaRetentionPolicy,
404    ) -> Result<(), Self::Error> {
405        // Avoid duplication. Let's try to remove it first.
406        self.remove_media_content(request).await?;
407
408        let ignore_policy = ignore_policy.is_yes();
409
410        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
411            // Do not store it.
412            return Ok(());
413        }
414
415        // Now, let's add it.
416        let mut inner = self.inner.write().unwrap();
417        inner.media.push(MediaContent {
418            uri: request.uri().to_owned(),
419            key: request.unique_key(),
420            data,
421            ignore_policy,
422            last_access,
423        });
424
425        Ok(())
426    }
427
428    async fn set_ignore_media_retention_policy_inner(
429        &self,
430        request: &MediaRequestParameters,
431        ignore_policy: IgnoreMediaRetentionPolicy,
432    ) -> Result<(), Self::Error> {
433        let mut inner = self.inner.write().unwrap();
434        let expected_key = request.unique_key();
435
436        if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
437        {
438            media_content.ignore_policy = ignore_policy.is_yes();
439        }
440
441        Ok(())
442    }
443
444    async fn get_media_content_inner(
445        &self,
446        request: &MediaRequestParameters,
447        current_time: SystemTime,
448    ) -> Result<Option<Vec<u8>>, Self::Error> {
449        let mut inner = self.inner.write().unwrap();
450        let expected_key = request.unique_key();
451
452        // First get the content out of the buffer, we are going to put it back at the
453        // end.
454        let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
455            return Ok(None);
456        };
457        let Some(mut content) = inner.media.remove(index) else {
458            return Ok(None);
459        };
460
461        // Clone the data.
462        let data = content.data.clone();
463
464        // Update the last access time.
465        content.last_access = current_time;
466
467        // Put it back in the buffer.
468        inner.media.push(content);
469
470        Ok(Some(data))
471    }
472
473    async fn get_media_content_for_uri_inner(
474        &self,
475        expected_uri: &MxcUri,
476        current_time: SystemTime,
477    ) -> Result<Option<Vec<u8>>, Self::Error> {
478        let mut inner = self.inner.write().unwrap();
479
480        // First get the content out of the buffer, we are going to put it back at the
481        // end.
482        let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
483            return Ok(None);
484        };
485        let Some(mut content) = inner.media.remove(index) else {
486            return Ok(None);
487        };
488
489        // Clone the data.
490        let data = content.data.clone();
491
492        // Update the last access time.
493        content.last_access = current_time;
494
495        // Put it back in the buffer.
496        inner.media.push(content);
497
498        Ok(Some(data))
499    }
500
501    async fn clean_up_media_cache_inner(
502        &self,
503        policy: MediaRetentionPolicy,
504        current_time: SystemTime,
505    ) -> Result<(), Self::Error> {
506        if !policy.has_limitations() {
507            // We can safely skip all the checks.
508            return Ok(());
509        }
510
511        let mut inner = self.inner.write().unwrap();
512
513        // First, check media content that exceed the max filesize.
514        if policy.computed_max_file_size().is_some() {
515            inner.media.retain(|content| {
516                content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
517            });
518        }
519
520        // Then, clean up expired media content.
521        if policy.last_access_expiry.is_some() {
522            inner.media.retain(|content| {
523                content.ignore_policy
524                    || !policy.has_content_expired(current_time, content.last_access)
525            });
526        }
527
528        // Finally, if the cache size is too big, remove old items until it fits.
529        if let Some(max_cache_size) = policy.max_cache_size {
530            // Reverse the iterator because in case the cache size is overflowing, we want
531            // to count the number of old items to remove. Items are sorted by last access
532            // and old items are at the start.
533            let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
534                (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
535                |(mut cache_size, mut items_to_remove), (index, content)| {
536                    if content.ignore_policy {
537                        // Do not count it.
538                        return (cache_size, items_to_remove);
539                    }
540
541                    let remove_item = if items_to_remove.is_empty() {
542                        // We have not reached the max cache size yet.
543                        if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
544                            cache_size = sum;
545                            // Start removing items if we have exceeded the max cache size.
546                            cache_size > max_cache_size
547                        } else {
548                            // The cache size is overflowing, remove the remaining items, since the
549                            // max cache size cannot be bigger than
550                            // usize::MAX.
551                            true
552                        }
553                    } else {
554                        // We have reached the max cache size already, just remove it.
555                        true
556                    };
557
558                    if remove_item {
559                        items_to_remove.push(index);
560                    }
561
562                    (cache_size, items_to_remove)
563                },
564            );
565
566            // The indexes are already in reverse order so we can just iterate in that order
567            // to remove them starting by the end.
568            for index in items_to_remove {
569                inner.media.remove(index);
570            }
571        }
572
573        inner.last_media_cleanup_time = current_time;
574
575        Ok(())
576    }
577
578    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
579        Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
580    }
581}
582
#[cfg(test)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::event_cache_store_media_integration_tests;

    /// Builds the store under test.
    // NOTE(review): presumably called by the integration test macros below —
    // confirm against their definitions.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        let store = MemoryStore::default();

        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
}