matrix_sdk_base/media/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 num::NonZeroUsize,
18 sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23 cross_process_lock::{
24 CrossProcessLockGeneration,
25 memory_store_helper::{Lease, try_take_leased_lock},
26 },
27 ring_buffer::RingBuffer,
28};
29use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};
30
31use super::Result;
32use crate::media::{
33 MediaRequestParameters, UniqueKey as _,
34 store::{
35 IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
36 MediaStoreError, MediaStoreInner,
37 },
38};
39
40#[derive(Debug, Clone)]
44pub struct MemoryMediaStore {
45 inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
46 media_service: MediaService,
47}
48
49#[derive(Debug)]
50struct MemoryMediaStoreInner {
51 media: RingBuffer<MediaContent>,
52 leases: HashMap<String, Lease>,
53 media_retention_policy: Option<MediaRetentionPolicy>,
54 last_media_cleanup_time: SystemTime,
55}
56
57#[derive(Debug)]
59struct MediaContent {
60 uri: OwnedMxcUri,
62
63 key: String,
65
66 data: Vec<u8>,
68
69 ignore_policy: bool,
71
72 last_access: SystemTime,
74}
75
76const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
77
78impl Default for MemoryMediaStore {
79 fn default() -> Self {
80 let last_media_cleanup_time = SystemTime::now();
82 let media_service = MediaService::new();
83 media_service.restore(None, Some(last_media_cleanup_time));
84
85 Self {
86 inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
87 media: RingBuffer::new(NUMBER_OF_MEDIAS),
88 leases: Default::default(),
89 media_retention_policy: None,
90 last_media_cleanup_time,
91 })),
92 media_service,
93 }
94 }
95}
96
97impl MemoryMediaStore {
98 pub fn new() -> Self {
100 Self::default()
101 }
102}
103
104#[cfg_attr(target_family = "wasm", async_trait(?Send))]
105#[cfg_attr(not(target_family = "wasm"), async_trait)]
106impl MediaStore for MemoryMediaStore {
107 type Error = MediaStoreError;
108
109 async fn try_take_leased_lock(
110 &self,
111 lease_duration_ms: u32,
112 key: &str,
113 holder: &str,
114 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
115 let mut inner = self.inner.write().unwrap();
116
117 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
118 }
119
120 async fn add_media_content(
121 &self,
122 request: &MediaRequestParameters,
123 data: Vec<u8>,
124 ignore_policy: IgnoreMediaRetentionPolicy,
125 ) -> Result<(), Self::Error> {
126 self.media_service.add_media_content(self, request, data, ignore_policy).await
127 }
128
129 async fn replace_media_key(
130 &self,
131 from: &MediaRequestParameters,
132 to: &MediaRequestParameters,
133 ) -> Result<(), Self::Error> {
134 let expected_key = from.unique_key();
135
136 let mut inner = self.inner.write().unwrap();
137
138 if let Some(media_content) =
139 inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
140 {
141 media_content.uri = to.uri().to_owned();
142 media_content.key = to.unique_key();
143 }
144
145 Ok(())
146 }
147
148 async fn get_media_content(
149 &self,
150 request: &MediaRequestParameters,
151 ) -> Result<Option<Vec<u8>>, Self::Error> {
152 self.media_service.get_media_content(self, request).await
153 }
154
155 async fn remove_media_content(
156 &self,
157 request: &MediaRequestParameters,
158 ) -> Result<(), Self::Error> {
159 let expected_key = request.unique_key();
160
161 let mut inner = self.inner.write().unwrap();
162
163 let Some(index) =
164 inner.media.iter().position(|media_content| media_content.key == expected_key)
165 else {
166 return Ok(());
167 };
168
169 inner.media.remove(index);
170
171 Ok(())
172 }
173
174 async fn get_media_content_for_uri(
175 &self,
176 uri: &MxcUri,
177 ) -> Result<Option<Vec<u8>>, Self::Error> {
178 self.media_service.get_media_content_for_uri(self, uri).await
179 }
180
181 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
182 let mut inner = self.inner.write().unwrap();
183
184 let positions = inner
185 .media
186 .iter()
187 .enumerate()
188 .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
189 .collect::<Vec<_>>();
190
191 for position in positions.into_iter().rev() {
193 inner.media.remove(position);
194 }
195
196 Ok(())
197 }
198
199 async fn set_media_retention_policy(
200 &self,
201 policy: MediaRetentionPolicy,
202 ) -> Result<(), Self::Error> {
203 self.media_service.set_media_retention_policy(self, policy).await
204 }
205
206 fn media_retention_policy(&self) -> MediaRetentionPolicy {
207 self.media_service.media_retention_policy()
208 }
209
210 async fn set_ignore_media_retention_policy(
211 &self,
212 request: &MediaRequestParameters,
213 ignore_policy: IgnoreMediaRetentionPolicy,
214 ) -> Result<(), Self::Error> {
215 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
216 }
217
218 async fn clean(&self) -> Result<(), Self::Error> {
219 self.media_service.clean(self).await
220 }
221
222 async fn optimize(&self) -> Result<(), Self::Error> {
223 Ok(())
224 }
225
226 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
227 Ok(None)
228 }
229}
230
231#[cfg_attr(target_family = "wasm", async_trait(?Send))]
232#[cfg_attr(not(target_family = "wasm"), async_trait)]
233impl MediaStoreInner for MemoryMediaStore {
234 type Error = MediaStoreError;
235
236 async fn media_retention_policy_inner(
237 &self,
238 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
239 Ok(self.inner.read().unwrap().media_retention_policy)
240 }
241
242 async fn set_media_retention_policy_inner(
243 &self,
244 policy: MediaRetentionPolicy,
245 ) -> Result<(), Self::Error> {
246 self.inner.write().unwrap().media_retention_policy = Some(policy);
247 Ok(())
248 }
249
250 async fn add_media_content_inner(
251 &self,
252 request: &MediaRequestParameters,
253 data: Vec<u8>,
254 last_access: SystemTime,
255 policy: MediaRetentionPolicy,
256 ignore_policy: IgnoreMediaRetentionPolicy,
257 ) -> Result<(), Self::Error> {
258 self.remove_media_content(request).await?;
260
261 let ignore_policy = ignore_policy.is_yes();
262
263 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
264 return Ok(());
266 }
267
268 let mut inner = self.inner.write().unwrap();
270 inner.media.push(MediaContent {
271 uri: request.uri().to_owned(),
272 key: request.unique_key(),
273 data,
274 ignore_policy,
275 last_access,
276 });
277
278 Ok(())
279 }
280
281 async fn set_ignore_media_retention_policy_inner(
282 &self,
283 request: &MediaRequestParameters,
284 ignore_policy: IgnoreMediaRetentionPolicy,
285 ) -> Result<(), Self::Error> {
286 let mut inner = self.inner.write().unwrap();
287 let expected_key = request.unique_key();
288
289 if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
290 {
291 media_content.ignore_policy = ignore_policy.is_yes();
292 }
293
294 Ok(())
295 }
296
297 async fn get_media_content_inner(
298 &self,
299 request: &MediaRequestParameters,
300 current_time: SystemTime,
301 ) -> Result<Option<Vec<u8>>, Self::Error> {
302 let mut inner = self.inner.write().unwrap();
303 let expected_key = request.unique_key();
304
305 let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
308 return Ok(None);
309 };
310 let Some(mut content) = inner.media.remove(index) else {
311 return Ok(None);
312 };
313
314 let data = content.data.clone();
316
317 content.last_access = current_time;
319
320 inner.media.push(content);
322
323 Ok(Some(data))
324 }
325
326 async fn get_media_content_for_uri_inner(
327 &self,
328 expected_uri: &MxcUri,
329 current_time: SystemTime,
330 ) -> Result<Option<Vec<u8>>, Self::Error> {
331 let mut inner = self.inner.write().unwrap();
332
333 let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
336 return Ok(None);
337 };
338 let Some(mut content) = inner.media.remove(index) else {
339 return Ok(None);
340 };
341
342 let data = content.data.clone();
344
345 content.last_access = current_time;
347
348 inner.media.push(content);
350
351 Ok(Some(data))
352 }
353
354 async fn clean_inner(
355 &self,
356 policy: MediaRetentionPolicy,
357 current_time: SystemTime,
358 ) -> Result<(), Self::Error> {
359 if !policy.has_limitations() {
360 return Ok(());
362 }
363
364 let mut inner = self.inner.write().unwrap();
365
366 if policy.computed_max_file_size().is_some() {
368 inner.media.retain(|content| {
369 content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
370 });
371 }
372
373 if policy.last_access_expiry.is_some() {
375 inner.media.retain(|content| {
376 content.ignore_policy
377 || !policy.has_content_expired(current_time, content.last_access)
378 });
379 }
380
381 if let Some(max_cache_size) = policy.max_cache_size {
383 let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
387 (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
388 |(mut cache_size, mut items_to_remove), (index, content)| {
389 if content.ignore_policy {
390 return (cache_size, items_to_remove);
392 }
393
394 let remove_item = if items_to_remove.is_empty() {
395 if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
397 cache_size = sum;
398 cache_size > max_cache_size
400 } else {
401 true
405 }
406 } else {
407 true
409 };
410
411 if remove_item {
412 items_to_remove.push(index);
413 }
414
415 (cache_size, items_to_remove)
416 },
417 );
418
419 for index in items_to_remove {
422 inner.media.remove(index);
423 }
424 }
425
426 inner.last_media_cleanup_time = current_time;
427
428 Ok(())
429 }
430
431 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
432 Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use super::{MemoryMediaStore, Result};
439 use crate::{
440 media_store_inner_integration_tests, media_store_integration_tests,
441 media_store_integration_tests_time,
442 };
443
444 async fn get_media_store() -> Result<MemoryMediaStore> {
445 Ok(MemoryMediaStore::new())
446 }
447
448 media_store_inner_integration_tests!();
449 media_store_integration_tests!();
450 media_store_integration_tests_time!();
451}