matrix_sdk_base/event_cache/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 num::NonZeroUsize,
18 sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23 linked_chunk::{
24 relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator,
25 LinkedChunkId, Position, RawChunk, Update,
26 },
27 ring_buffer::RingBuffer,
28 store_locks::memory_store_helper::try_take_leased_lock,
29};
30use ruma::{
31 events::relation::RelationType,
32 time::{Instant, SystemTime},
33 EventId, MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
34};
35use tracing::error;
36
37use super::{
38 compute_filters_string, extract_event_relation,
39 media::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService},
40 EventCacheStore, EventCacheStoreError, Result,
41};
42use crate::{
43 event_cache::{Event, Gap},
44 media::{MediaRequestParameters, UniqueKey as _},
45};
46
47#[derive(Debug, Clone)]
51pub struct MemoryStore {
52 inner: Arc<StdRwLock<MemoryStoreInner>>,
53 media_service: MediaService,
54}
55
56#[derive(Debug)]
57struct MemoryStoreInner {
58 media: RingBuffer<MediaContent>,
59 leases: HashMap<String, (String, Instant)>,
60 events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
61 media_retention_policy: Option<MediaRetentionPolicy>,
62 last_media_cleanup_time: SystemTime,
63}
64
65#[derive(Debug)]
67struct MediaContent {
68 uri: OwnedMxcUri,
70
71 key: String,
73
74 data: Vec<u8>,
76
77 ignore_policy: bool,
79
80 last_access: SystemTime,
82}
83
/// Capacity of the media ring buffer, i.e. the maximum number of media
/// entries a `MemoryStore` keeps at once.
const NUMBER_OF_MEDIAS: NonZeroUsize = match NonZeroUsize::new(20) {
    Some(capacity) => capacity,
    // Evaluated at compile time: a zero literal would fail the build.
    None => panic!("NUMBER_OF_MEDIAS must be non-zero"),
};
85
86impl Default for MemoryStore {
87 fn default() -> Self {
88 let last_media_cleanup_time = SystemTime::now();
90 let media_service = MediaService::new();
91 media_service.restore(None, Some(last_media_cleanup_time));
92
93 Self {
94 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
95 media: RingBuffer::new(NUMBER_OF_MEDIAS),
96 leases: Default::default(),
97 events: RelationalLinkedChunk::new(),
98 media_retention_policy: None,
99 last_media_cleanup_time,
100 })),
101 media_service,
102 }
103 }
104}
105
106impl MemoryStore {
107 pub fn new() -> Self {
109 Self::default()
110 }
111}
112
113#[cfg_attr(target_family = "wasm", async_trait(?Send))]
114#[cfg_attr(not(target_family = "wasm"), async_trait)]
115impl EventCacheStore for MemoryStore {
116 type Error = EventCacheStoreError;
117
118 async fn try_take_leased_lock(
119 &self,
120 lease_duration_ms: u32,
121 key: &str,
122 holder: &str,
123 ) -> Result<bool, Self::Error> {
124 let mut inner = self.inner.write().unwrap();
125
126 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
127 }
128
129 async fn handle_linked_chunk_updates(
130 &self,
131 linked_chunk_id: LinkedChunkId<'_>,
132 updates: Vec<Update<Event, Gap>>,
133 ) -> Result<(), Self::Error> {
134 let mut inner = self.inner.write().unwrap();
135 inner.events.apply_updates(linked_chunk_id, updates);
136
137 Ok(())
138 }
139
140 async fn load_all_chunks(
141 &self,
142 linked_chunk_id: LinkedChunkId<'_>,
143 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
144 let inner = self.inner.read().unwrap();
145 inner
146 .events
147 .load_all_chunks(linked_chunk_id)
148 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
149 }
150
151 async fn load_last_chunk(
152 &self,
153 linked_chunk_id: LinkedChunkId<'_>,
154 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
155 let inner = self.inner.read().unwrap();
156 inner
157 .events
158 .load_last_chunk(linked_chunk_id)
159 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
160 }
161
162 async fn load_previous_chunk(
163 &self,
164 linked_chunk_id: LinkedChunkId<'_>,
165 before_chunk_identifier: ChunkIdentifier,
166 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
167 let inner = self.inner.read().unwrap();
168 inner
169 .events
170 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
171 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
172 }
173
174 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
175 self.inner.write().unwrap().events.clear();
176 Ok(())
177 }
178
179 async fn filter_duplicated_events(
180 &self,
181 linked_chunk_id: LinkedChunkId<'_>,
182 mut events: Vec<OwnedEventId>,
183 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
184 let inner = self.inner.read().unwrap();
186
187 let mut duplicated_events = Vec::new();
188
189 for (event, position) in inner.events.unordered_linked_chunk_items(linked_chunk_id) {
190 if events.is_empty() {
192 break;
193 }
194
195 if let Some(known_event_id) = event.event_id() {
196 if let Some(index) =
198 events.iter().position(|new_event_id| &known_event_id == new_event_id)
199 {
200 duplicated_events.push((events.remove(index), position));
201 }
202 }
203 }
204
205 Ok(duplicated_events)
206 }
207
208 async fn find_event(
209 &self,
210 room_id: &RoomId,
211 event_id: &EventId,
212 ) -> Result<Option<Event>, Self::Error> {
213 let inner = self.inner.read().unwrap();
214
215 let event = inner.events.items().find_map(|(event, this_linked_chunk_id)| {
216 (room_id == this_linked_chunk_id.room_id() && event.event_id()? == event_id)
217 .then_some(event.clone())
218 });
219
220 Ok(event)
221 }
222
223 async fn find_event_relations(
224 &self,
225 room_id: &RoomId,
226 event_id: &EventId,
227 filters: Option<&[RelationType]>,
228 ) -> Result<Vec<Event>, Self::Error> {
229 let inner = self.inner.read().unwrap();
230
231 let filters = compute_filters_string(filters);
232
233 let related_events = inner
234 .events
235 .items()
236 .filter_map(|(event, this_linked_chunk_id)| {
237 if room_id != this_linked_chunk_id.room_id() {
239 return None;
240 }
241
242 let (related_to, rel_type) = extract_event_relation(event.raw())?;
244
245 if related_to != event_id {
247 return None;
248 }
249
250 if let Some(filters) = &filters {
252 filters.contains(&rel_type).then_some(event.clone())
253 } else {
254 Some(event.clone())
255 }
256 })
257 .collect();
258
259 Ok(related_events)
260 }
261
262 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
263 if event.event_id().is_none() {
264 error!(%room_id, "Trying to save an event with no ID");
265 return Ok(());
266 }
267 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
268 Ok(())
269 }
270
271 async fn add_media_content(
272 &self,
273 request: &MediaRequestParameters,
274 data: Vec<u8>,
275 ignore_policy: IgnoreMediaRetentionPolicy,
276 ) -> Result<()> {
277 self.media_service.add_media_content(self, request, data, ignore_policy).await
278 }
279
280 async fn replace_media_key(
281 &self,
282 from: &MediaRequestParameters,
283 to: &MediaRequestParameters,
284 ) -> Result<(), Self::Error> {
285 let expected_key = from.unique_key();
286
287 let mut inner = self.inner.write().unwrap();
288
289 if let Some(media_content) =
290 inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
291 {
292 media_content.uri = to.uri().to_owned();
293 media_content.key = to.unique_key();
294 }
295
296 Ok(())
297 }
298
299 async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
300 self.media_service.get_media_content(self, request).await
301 }
302
303 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
304 let expected_key = request.unique_key();
305
306 let mut inner = self.inner.write().unwrap();
307
308 let Some(index) =
309 inner.media.iter().position(|media_content| media_content.key == expected_key)
310 else {
311 return Ok(());
312 };
313
314 inner.media.remove(index);
315
316 Ok(())
317 }
318
319 async fn get_media_content_for_uri(
320 &self,
321 uri: &MxcUri,
322 ) -> Result<Option<Vec<u8>>, Self::Error> {
323 self.media_service.get_media_content_for_uri(self, uri).await
324 }
325
326 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
327 let mut inner = self.inner.write().unwrap();
328
329 let positions = inner
330 .media
331 .iter()
332 .enumerate()
333 .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
334 .collect::<Vec<_>>();
335
336 for position in positions.into_iter().rev() {
338 inner.media.remove(position);
339 }
340
341 Ok(())
342 }
343
344 async fn set_media_retention_policy(
345 &self,
346 policy: MediaRetentionPolicy,
347 ) -> Result<(), Self::Error> {
348 self.media_service.set_media_retention_policy(self, policy).await
349 }
350
351 fn media_retention_policy(&self) -> MediaRetentionPolicy {
352 self.media_service.media_retention_policy()
353 }
354
355 async fn set_ignore_media_retention_policy(
356 &self,
357 request: &MediaRequestParameters,
358 ignore_policy: IgnoreMediaRetentionPolicy,
359 ) -> Result<(), Self::Error> {
360 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
361 }
362
363 async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
364 self.media_service.clean_up_media_cache(self).await
365 }
366}
367
368#[cfg_attr(target_family = "wasm", async_trait(?Send))]
369#[cfg_attr(not(target_family = "wasm"), async_trait)]
370impl EventCacheStoreMedia for MemoryStore {
371 type Error = EventCacheStoreError;
372
373 async fn media_retention_policy_inner(
374 &self,
375 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
376 Ok(self.inner.read().unwrap().media_retention_policy)
377 }
378
379 async fn set_media_retention_policy_inner(
380 &self,
381 policy: MediaRetentionPolicy,
382 ) -> Result<(), Self::Error> {
383 self.inner.write().unwrap().media_retention_policy = Some(policy);
384 Ok(())
385 }
386
387 async fn add_media_content_inner(
388 &self,
389 request: &MediaRequestParameters,
390 data: Vec<u8>,
391 last_access: SystemTime,
392 policy: MediaRetentionPolicy,
393 ignore_policy: IgnoreMediaRetentionPolicy,
394 ) -> Result<(), Self::Error> {
395 self.remove_media_content(request).await?;
397
398 let ignore_policy = ignore_policy.is_yes();
399
400 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
401 return Ok(());
403 };
404
405 let mut inner = self.inner.write().unwrap();
407 inner.media.push(MediaContent {
408 uri: request.uri().to_owned(),
409 key: request.unique_key(),
410 data,
411 ignore_policy,
412 last_access,
413 });
414
415 Ok(())
416 }
417
418 async fn set_ignore_media_retention_policy_inner(
419 &self,
420 request: &MediaRequestParameters,
421 ignore_policy: IgnoreMediaRetentionPolicy,
422 ) -> Result<(), Self::Error> {
423 let mut inner = self.inner.write().unwrap();
424 let expected_key = request.unique_key();
425
426 if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
427 {
428 media_content.ignore_policy = ignore_policy.is_yes();
429 }
430
431 Ok(())
432 }
433
434 async fn get_media_content_inner(
435 &self,
436 request: &MediaRequestParameters,
437 current_time: SystemTime,
438 ) -> Result<Option<Vec<u8>>, Self::Error> {
439 let mut inner = self.inner.write().unwrap();
440 let expected_key = request.unique_key();
441
442 let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
445 return Ok(None);
446 };
447 let Some(mut content) = inner.media.remove(index) else {
448 return Ok(None);
449 };
450
451 let data = content.data.clone();
453
454 content.last_access = current_time;
456
457 inner.media.push(content);
459
460 Ok(Some(data))
461 }
462
463 async fn get_media_content_for_uri_inner(
464 &self,
465 expected_uri: &MxcUri,
466 current_time: SystemTime,
467 ) -> Result<Option<Vec<u8>>, Self::Error> {
468 let mut inner = self.inner.write().unwrap();
469
470 let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
473 return Ok(None);
474 };
475 let Some(mut content) = inner.media.remove(index) else {
476 return Ok(None);
477 };
478
479 let data = content.data.clone();
481
482 content.last_access = current_time;
484
485 inner.media.push(content);
487
488 Ok(Some(data))
489 }
490
491 async fn clean_up_media_cache_inner(
492 &self,
493 policy: MediaRetentionPolicy,
494 current_time: SystemTime,
495 ) -> Result<(), Self::Error> {
496 if !policy.has_limitations() {
497 return Ok(());
499 }
500
501 let mut inner = self.inner.write().unwrap();
502
503 if policy.computed_max_file_size().is_some() {
505 inner.media.retain(|content| {
506 content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
507 });
508 }
509
510 if policy.last_access_expiry.is_some() {
512 inner.media.retain(|content| {
513 content.ignore_policy
514 || !policy.has_content_expired(current_time, content.last_access)
515 });
516 }
517
518 if let Some(max_cache_size) = policy.max_cache_size {
520 let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
524 (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
525 |(mut cache_size, mut items_to_remove), (index, content)| {
526 if content.ignore_policy {
527 return (cache_size, items_to_remove);
529 }
530
531 let remove_item = if items_to_remove.is_empty() {
532 if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
534 cache_size = sum;
535 cache_size > max_cache_size
537 } else {
538 true
542 }
543 } else {
544 true
546 };
547
548 if remove_item {
549 items_to_remove.push(index);
550 }
551
552 (cache_size, items_to_remove)
553 },
554 );
555
556 for index in items_to_remove {
559 inner.media.remove(index);
560 }
561 }
562
563 inner.last_media_cleanup_time = current_time;
564
565 Ok(())
566 }
567
568 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
569 Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
570 }
571}
572
#[cfg(test)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::event_cache_store_media_integration_tests;

    // Factory for the store under test.
    // NOTE(review): the integration-test macros below presumably look this
    // function up by name; keep the name and signature as they are.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        let store = MemoryStore::new();

        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
}