matrix_sdk_base/media/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    num::NonZeroUsize,
18    sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23    cross_process_lock::{
24        CrossProcessLockGeneration,
25        memory_store_helper::{Lease, try_take_leased_lock},
26    },
27    ring_buffer::RingBuffer,
28};
29use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};
30
31use super::Result;
32use crate::media::{
33    MediaRequestParameters, UniqueKey as _,
34    store::{
35        IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
36        MediaStoreError, MediaStoreInner,
37    },
38};
39
40/// In-memory, non-persistent implementation of the `MediaStore`.
41///
42/// Default if no other is configured at startup.
43#[derive(Debug, Clone)]
44pub struct MemoryMediaStore {
45    inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
46    media_service: MediaService,
47}
48
49#[derive(Debug)]
50struct MemoryMediaStoreInner {
51    media: RingBuffer<MediaContent>,
52    leases: HashMap<String, Lease>,
53    media_retention_policy: Option<MediaRetentionPolicy>,
54    last_media_cleanup_time: SystemTime,
55}
56
57/// A media content in the `MemoryStore`.
58#[derive(Debug)]
59struct MediaContent {
60    /// The URI of the content.
61    uri: OwnedMxcUri,
62
63    /// The unique key of the content.
64    key: String,
65
66    /// The bytes of the content.
67    data: Vec<u8>,
68
69    /// Whether we should ignore the [`MediaRetentionPolicy`] for this content.
70    ignore_policy: bool,
71
72    /// The time of the last access of the content.
73    last_access: SystemTime,
74}
75
/// The capacity the in-memory media buffer is created with.
const NUMBER_OF_MEDIAS: NonZeroUsize = match NonZeroUsize::new(20) {
    Some(capacity) => capacity,
    // `20` is not zero, so this arm can never be taken.
    None => unreachable!(),
};
77
78impl Default for MemoryMediaStore {
79    fn default() -> Self {
80        // Given that the store is empty, we won't need to clean it up right away.
81        let last_media_cleanup_time = SystemTime::now();
82        let media_service = MediaService::new();
83        media_service.restore(None, Some(last_media_cleanup_time));
84
85        Self {
86            inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
87                media: RingBuffer::new(NUMBER_OF_MEDIAS),
88                leases: Default::default(),
89                media_retention_policy: None,
90                last_media_cleanup_time,
91            })),
92            media_service,
93        }
94    }
95}
96
97impl MemoryMediaStore {
98    /// Create a new empty MemoryMediaStore
99    pub fn new() -> Self {
100        Self::default()
101    }
102}
103
104#[cfg_attr(target_family = "wasm", async_trait(?Send))]
105#[cfg_attr(not(target_family = "wasm"), async_trait)]
106impl MediaStore for MemoryMediaStore {
107    type Error = MediaStoreError;
108
109    async fn try_take_leased_lock(
110        &self,
111        lease_duration_ms: u32,
112        key: &str,
113        holder: &str,
114    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
115        let mut inner = self.inner.write().unwrap();
116
117        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
118    }
119
120    async fn add_media_content(
121        &self,
122        request: &MediaRequestParameters,
123        data: Vec<u8>,
124        ignore_policy: IgnoreMediaRetentionPolicy,
125    ) -> Result<(), Self::Error> {
126        self.media_service.add_media_content(self, request, data, ignore_policy).await
127    }
128
129    async fn replace_media_key(
130        &self,
131        from: &MediaRequestParameters,
132        to: &MediaRequestParameters,
133    ) -> Result<(), Self::Error> {
134        let expected_key = from.unique_key();
135
136        let mut inner = self.inner.write().unwrap();
137
138        if let Some(media_content) =
139            inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
140        {
141            media_content.uri = to.uri().to_owned();
142            media_content.key = to.unique_key();
143        }
144
145        Ok(())
146    }
147
148    async fn get_media_content(
149        &self,
150        request: &MediaRequestParameters,
151    ) -> Result<Option<Vec<u8>>, Self::Error> {
152        self.media_service.get_media_content(self, request).await
153    }
154
155    async fn remove_media_content(
156        &self,
157        request: &MediaRequestParameters,
158    ) -> Result<(), Self::Error> {
159        let expected_key = request.unique_key();
160
161        let mut inner = self.inner.write().unwrap();
162
163        let Some(index) =
164            inner.media.iter().position(|media_content| media_content.key == expected_key)
165        else {
166            return Ok(());
167        };
168
169        inner.media.remove(index);
170
171        Ok(())
172    }
173
174    async fn get_media_content_for_uri(
175        &self,
176        uri: &MxcUri,
177    ) -> Result<Option<Vec<u8>>, Self::Error> {
178        self.media_service.get_media_content_for_uri(self, uri).await
179    }
180
181    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
182        let mut inner = self.inner.write().unwrap();
183
184        let positions = inner
185            .media
186            .iter()
187            .enumerate()
188            .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
189            .collect::<Vec<_>>();
190
191        // Iterate in reverse-order so that positions stay valid after first removals.
192        for position in positions.into_iter().rev() {
193            inner.media.remove(position);
194        }
195
196        Ok(())
197    }
198
199    async fn set_media_retention_policy(
200        &self,
201        policy: MediaRetentionPolicy,
202    ) -> Result<(), Self::Error> {
203        self.media_service.set_media_retention_policy(self, policy).await
204    }
205
206    fn media_retention_policy(&self) -> MediaRetentionPolicy {
207        self.media_service.media_retention_policy()
208    }
209
210    async fn set_ignore_media_retention_policy(
211        &self,
212        request: &MediaRequestParameters,
213        ignore_policy: IgnoreMediaRetentionPolicy,
214    ) -> Result<(), Self::Error> {
215        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
216    }
217
218    async fn clean(&self) -> Result<(), Self::Error> {
219        self.media_service.clean(self).await
220    }
221
222    async fn optimize(&self) -> Result<(), Self::Error> {
223        Ok(())
224    }
225
226    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
227        Ok(None)
228    }
229}
230
231#[cfg_attr(target_family = "wasm", async_trait(?Send))]
232#[cfg_attr(not(target_family = "wasm"), async_trait)]
233impl MediaStoreInner for MemoryMediaStore {
234    type Error = MediaStoreError;
235
236    async fn media_retention_policy_inner(
237        &self,
238    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
239        Ok(self.inner.read().unwrap().media_retention_policy)
240    }
241
242    async fn set_media_retention_policy_inner(
243        &self,
244        policy: MediaRetentionPolicy,
245    ) -> Result<(), Self::Error> {
246        self.inner.write().unwrap().media_retention_policy = Some(policy);
247        Ok(())
248    }
249
250    async fn add_media_content_inner(
251        &self,
252        request: &MediaRequestParameters,
253        data: Vec<u8>,
254        last_access: SystemTime,
255        policy: MediaRetentionPolicy,
256        ignore_policy: IgnoreMediaRetentionPolicy,
257    ) -> Result<(), Self::Error> {
258        // Avoid duplication. Let's try to remove it first.
259        self.remove_media_content(request).await?;
260
261        let ignore_policy = ignore_policy.is_yes();
262
263        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
264            // Do not store it.
265            return Ok(());
266        }
267
268        // Now, let's add it.
269        let mut inner = self.inner.write().unwrap();
270        inner.media.push(MediaContent {
271            uri: request.uri().to_owned(),
272            key: request.unique_key(),
273            data,
274            ignore_policy,
275            last_access,
276        });
277
278        Ok(())
279    }
280
281    async fn set_ignore_media_retention_policy_inner(
282        &self,
283        request: &MediaRequestParameters,
284        ignore_policy: IgnoreMediaRetentionPolicy,
285    ) -> Result<(), Self::Error> {
286        let mut inner = self.inner.write().unwrap();
287        let expected_key = request.unique_key();
288
289        if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
290        {
291            media_content.ignore_policy = ignore_policy.is_yes();
292        }
293
294        Ok(())
295    }
296
297    async fn get_media_content_inner(
298        &self,
299        request: &MediaRequestParameters,
300        current_time: SystemTime,
301    ) -> Result<Option<Vec<u8>>, Self::Error> {
302        let mut inner = self.inner.write().unwrap();
303        let expected_key = request.unique_key();
304
305        // First get the content out of the buffer, we are going to put it back at the
306        // end.
307        let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
308            return Ok(None);
309        };
310        let Some(mut content) = inner.media.remove(index) else {
311            return Ok(None);
312        };
313
314        // Clone the data.
315        let data = content.data.clone();
316
317        // Update the last access time.
318        content.last_access = current_time;
319
320        // Put it back in the buffer.
321        inner.media.push(content);
322
323        Ok(Some(data))
324    }
325
326    async fn get_media_content_for_uri_inner(
327        &self,
328        expected_uri: &MxcUri,
329        current_time: SystemTime,
330    ) -> Result<Option<Vec<u8>>, Self::Error> {
331        let mut inner = self.inner.write().unwrap();
332
333        // First get the content out of the buffer, we are going to put it back at the
334        // end.
335        let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
336            return Ok(None);
337        };
338        let Some(mut content) = inner.media.remove(index) else {
339            return Ok(None);
340        };
341
342        // Clone the data.
343        let data = content.data.clone();
344
345        // Update the last access time.
346        content.last_access = current_time;
347
348        // Put it back in the buffer.
349        inner.media.push(content);
350
351        Ok(Some(data))
352    }
353
354    async fn clean_inner(
355        &self,
356        policy: MediaRetentionPolicy,
357        current_time: SystemTime,
358    ) -> Result<(), Self::Error> {
359        if !policy.has_limitations() {
360            // We can safely skip all the checks.
361            return Ok(());
362        }
363
364        let mut inner = self.inner.write().unwrap();
365
366        // First, check media content that exceed the max filesize.
367        if policy.computed_max_file_size().is_some() {
368            inner.media.retain(|content| {
369                content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
370            });
371        }
372
373        // Then, clean up expired media content.
374        if policy.last_access_expiry.is_some() {
375            inner.media.retain(|content| {
376                content.ignore_policy
377                    || !policy.has_content_expired(current_time, content.last_access)
378            });
379        }
380
381        // Finally, if the cache size is too big, remove old items until it fits.
382        if let Some(max_cache_size) = policy.max_cache_size {
383            // Reverse the iterator because in case the cache size is overflowing, we want
384            // to count the number of old items to remove. Items are sorted by last access
385            // and old items are at the start.
386            let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
387                (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
388                |(mut cache_size, mut items_to_remove), (index, content)| {
389                    if content.ignore_policy {
390                        // Do not count it.
391                        return (cache_size, items_to_remove);
392                    }
393
394                    let remove_item = if items_to_remove.is_empty() {
395                        // We have not reached the max cache size yet.
396                        if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
397                            cache_size = sum;
398                            // Start removing items if we have exceeded the max cache size.
399                            cache_size > max_cache_size
400                        } else {
401                            // The cache size is overflowing, remove the remaining items, since the
402                            // max cache size cannot be bigger than
403                            // usize::MAX.
404                            true
405                        }
406                    } else {
407                        // We have reached the max cache size already, just remove it.
408                        true
409                    };
410
411                    if remove_item {
412                        items_to_remove.push(index);
413                    }
414
415                    (cache_size, items_to_remove)
416                },
417            );
418
419            // The indexes are already in reverse order so we can just iterate in that order
420            // to remove them starting by the end.
421            for index in items_to_remove {
422                inner.media.remove(index);
423            }
424        }
425
426        inner.last_media_cleanup_time = current_time;
427
428        Ok(())
429    }
430
431    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
432        Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
433    }
434}
435
#[cfg(test)]
mod tests {
    use super::{MemoryMediaStore, Result};
    use crate::{
        media_store_inner_integration_tests, media_store_integration_tests,
        media_store_integration_tests_time,
    };

    /// Builds a fresh, empty store for a test.
    ///
    /// Presumably looked up by name by the integration test macros invoked
    /// below; keep the name and signature in sync with them.
    async fn get_media_store() -> Result<MemoryMediaStore> {
        let store = MemoryMediaStore::new();
        Ok(store)
    }

    // The shared media store test suites, instantiated for the in-memory store.
    media_store_inner_integration_tests!();
    media_store_integration_tests!();
    media_store_integration_tests_time!();
}