matrix_sdk_base/event_cache/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 num::NonZeroUsize,
18 sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23 linked_chunk::{
24 relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator,
25 ChunkMetadata, LinkedChunkId, OwnedLinkedChunkId, Position, RawChunk, Update,
26 },
27 ring_buffer::RingBuffer,
28 store_locks::memory_store_helper::try_take_leased_lock,
29};
30use ruma::{
31 events::relation::RelationType,
32 time::{Instant, SystemTime},
33 EventId, MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
34};
35use tracing::error;
36
37use super::{
38 compute_filters_string, extract_event_relation,
39 media::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService},
40 EventCacheStore, EventCacheStoreError, Result,
41};
42use crate::{
43 event_cache::{Event, Gap},
44 media::{MediaRequestParameters, UniqueKey as _},
45};
46
47#[derive(Debug, Clone)]
51pub struct MemoryStore {
52 inner: Arc<StdRwLock<MemoryStoreInner>>,
53 media_service: MediaService,
54}
55
56#[derive(Debug)]
57struct MemoryStoreInner {
58 media: RingBuffer<MediaContent>,
59 leases: HashMap<String, (String, Instant)>,
60 events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
61 media_retention_policy: Option<MediaRetentionPolicy>,
62 last_media_cleanup_time: SystemTime,
63}
64
65#[derive(Debug)]
67struct MediaContent {
68 uri: OwnedMxcUri,
70
71 key: String,
73
74 data: Vec<u8>,
76
77 ignore_policy: bool,
79
80 last_access: SystemTime,
82}
83
/// Capacity given to the media [`RingBuffer`] of every [`MemoryStore`]; also
/// used to pre-size the scratch vector of the cache-size cleanup pass.
const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
85
86impl Default for MemoryStore {
87 fn default() -> Self {
88 let last_media_cleanup_time = SystemTime::now();
90 let media_service = MediaService::new();
91 media_service.restore(None, Some(last_media_cleanup_time));
92
93 Self {
94 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
95 media: RingBuffer::new(NUMBER_OF_MEDIAS),
96 leases: Default::default(),
97 events: RelationalLinkedChunk::new(),
98 media_retention_policy: None,
99 last_media_cleanup_time,
100 })),
101 media_service,
102 }
103 }
104}
105
106impl MemoryStore {
107 pub fn new() -> Self {
109 Self::default()
110 }
111}
112
113#[cfg_attr(target_family = "wasm", async_trait(?Send))]
114#[cfg_attr(not(target_family = "wasm"), async_trait)]
115impl EventCacheStore for MemoryStore {
116 type Error = EventCacheStoreError;
117
118 async fn try_take_leased_lock(
119 &self,
120 lease_duration_ms: u32,
121 key: &str,
122 holder: &str,
123 ) -> Result<bool, Self::Error> {
124 let mut inner = self.inner.write().unwrap();
125
126 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
127 }
128
129 async fn handle_linked_chunk_updates(
130 &self,
131 linked_chunk_id: LinkedChunkId<'_>,
132 updates: Vec<Update<Event, Gap>>,
133 ) -> Result<(), Self::Error> {
134 let mut inner = self.inner.write().unwrap();
135 inner.events.apply_updates(linked_chunk_id, updates);
136
137 Ok(())
138 }
139
140 async fn load_all_chunks(
141 &self,
142 linked_chunk_id: LinkedChunkId<'_>,
143 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
144 let inner = self.inner.read().unwrap();
145 inner
146 .events
147 .load_all_chunks(linked_chunk_id)
148 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
149 }
150
151 async fn load_all_chunks_metadata(
152 &self,
153 linked_chunk_id: LinkedChunkId<'_>,
154 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
155 let inner = self.inner.read().unwrap();
156 inner
157 .events
158 .load_all_chunks_metadata(linked_chunk_id)
159 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
160 }
161
162 async fn load_last_chunk(
163 &self,
164 linked_chunk_id: LinkedChunkId<'_>,
165 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
166 let inner = self.inner.read().unwrap();
167 inner
168 .events
169 .load_last_chunk(linked_chunk_id)
170 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
171 }
172
173 async fn load_previous_chunk(
174 &self,
175 linked_chunk_id: LinkedChunkId<'_>,
176 before_chunk_identifier: ChunkIdentifier,
177 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
178 let inner = self.inner.read().unwrap();
179 inner
180 .events
181 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
182 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
183 }
184
185 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
186 self.inner.write().unwrap().events.clear();
187 Ok(())
188 }
189
190 async fn filter_duplicated_events(
191 &self,
192 linked_chunk_id: LinkedChunkId<'_>,
193 mut events: Vec<OwnedEventId>,
194 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
195 if events.is_empty() {
196 return Ok(Vec::new());
197 }
198
199 let inner = self.inner.read().unwrap();
200
201 let mut duplicated_events = Vec::new();
202
203 for (event, position) in
204 inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
205 {
206 if let Some(known_event_id) = event.event_id() {
207 if let Some(index) =
209 events.iter().position(|new_event_id| &known_event_id == new_event_id)
210 {
211 duplicated_events.push((events.remove(index), position));
212 }
213 }
214 }
215
216 Ok(duplicated_events)
217 }
218
219 async fn find_event(
220 &self,
221 room_id: &RoomId,
222 event_id: &EventId,
223 ) -> Result<Option<Event>, Self::Error> {
224 let inner = self.inner.read().unwrap();
225
226 let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned());
227
228 let event = inner
229 .events
230 .items(&target_linked_chunk_id)
231 .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
232
233 Ok(event)
234 }
235
236 async fn find_event_relations(
237 &self,
238 room_id: &RoomId,
239 event_id: &EventId,
240 filters: Option<&[RelationType]>,
241 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
242 let inner = self.inner.read().unwrap();
243
244 let target_linked_chunk_id = OwnedLinkedChunkId::Room(room_id.to_owned());
245
246 let filters = compute_filters_string(filters);
247
248 let related_events = inner
249 .events
250 .items(&target_linked_chunk_id)
251 .filter_map(|(event, pos)| {
252 let (related_to, rel_type) = extract_event_relation(event.raw())?;
254
255 if related_to != event_id {
257 return None;
258 }
259
260 if let Some(filters) = &filters {
262 filters.contains(&rel_type).then_some((event.clone(), pos))
263 } else {
264 Some((event.clone(), pos))
265 }
266 })
267 .collect();
268
269 Ok(related_events)
270 }
271
272 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
273 if event.event_id().is_none() {
274 error!(%room_id, "Trying to save an event with no ID");
275 return Ok(());
276 }
277 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
278 Ok(())
279 }
280
281 async fn add_media_content(
282 &self,
283 request: &MediaRequestParameters,
284 data: Vec<u8>,
285 ignore_policy: IgnoreMediaRetentionPolicy,
286 ) -> Result<()> {
287 self.media_service.add_media_content(self, request, data, ignore_policy).await
288 }
289
290 async fn replace_media_key(
291 &self,
292 from: &MediaRequestParameters,
293 to: &MediaRequestParameters,
294 ) -> Result<(), Self::Error> {
295 let expected_key = from.unique_key();
296
297 let mut inner = self.inner.write().unwrap();
298
299 if let Some(media_content) =
300 inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
301 {
302 media_content.uri = to.uri().to_owned();
303 media_content.key = to.unique_key();
304 }
305
306 Ok(())
307 }
308
309 async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
310 self.media_service.get_media_content(self, request).await
311 }
312
313 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
314 let expected_key = request.unique_key();
315
316 let mut inner = self.inner.write().unwrap();
317
318 let Some(index) =
319 inner.media.iter().position(|media_content| media_content.key == expected_key)
320 else {
321 return Ok(());
322 };
323
324 inner.media.remove(index);
325
326 Ok(())
327 }
328
329 async fn get_media_content_for_uri(
330 &self,
331 uri: &MxcUri,
332 ) -> Result<Option<Vec<u8>>, Self::Error> {
333 self.media_service.get_media_content_for_uri(self, uri).await
334 }
335
336 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
337 let mut inner = self.inner.write().unwrap();
338
339 let positions = inner
340 .media
341 .iter()
342 .enumerate()
343 .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
344 .collect::<Vec<_>>();
345
346 for position in positions.into_iter().rev() {
348 inner.media.remove(position);
349 }
350
351 Ok(())
352 }
353
354 async fn set_media_retention_policy(
355 &self,
356 policy: MediaRetentionPolicy,
357 ) -> Result<(), Self::Error> {
358 self.media_service.set_media_retention_policy(self, policy).await
359 }
360
361 fn media_retention_policy(&self) -> MediaRetentionPolicy {
362 self.media_service.media_retention_policy()
363 }
364
365 async fn set_ignore_media_retention_policy(
366 &self,
367 request: &MediaRequestParameters,
368 ignore_policy: IgnoreMediaRetentionPolicy,
369 ) -> Result<(), Self::Error> {
370 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
371 }
372
373 async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
374 self.media_service.clean_up_media_cache(self).await
375 }
376}
377
378#[cfg_attr(target_family = "wasm", async_trait(?Send))]
379#[cfg_attr(not(target_family = "wasm"), async_trait)]
380impl EventCacheStoreMedia for MemoryStore {
381 type Error = EventCacheStoreError;
382
383 async fn media_retention_policy_inner(
384 &self,
385 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
386 Ok(self.inner.read().unwrap().media_retention_policy)
387 }
388
389 async fn set_media_retention_policy_inner(
390 &self,
391 policy: MediaRetentionPolicy,
392 ) -> Result<(), Self::Error> {
393 self.inner.write().unwrap().media_retention_policy = Some(policy);
394 Ok(())
395 }
396
397 async fn add_media_content_inner(
398 &self,
399 request: &MediaRequestParameters,
400 data: Vec<u8>,
401 last_access: SystemTime,
402 policy: MediaRetentionPolicy,
403 ignore_policy: IgnoreMediaRetentionPolicy,
404 ) -> Result<(), Self::Error> {
405 self.remove_media_content(request).await?;
407
408 let ignore_policy = ignore_policy.is_yes();
409
410 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
411 return Ok(());
413 }
414
415 let mut inner = self.inner.write().unwrap();
417 inner.media.push(MediaContent {
418 uri: request.uri().to_owned(),
419 key: request.unique_key(),
420 data,
421 ignore_policy,
422 last_access,
423 });
424
425 Ok(())
426 }
427
428 async fn set_ignore_media_retention_policy_inner(
429 &self,
430 request: &MediaRequestParameters,
431 ignore_policy: IgnoreMediaRetentionPolicy,
432 ) -> Result<(), Self::Error> {
433 let mut inner = self.inner.write().unwrap();
434 let expected_key = request.unique_key();
435
436 if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
437 {
438 media_content.ignore_policy = ignore_policy.is_yes();
439 }
440
441 Ok(())
442 }
443
444 async fn get_media_content_inner(
445 &self,
446 request: &MediaRequestParameters,
447 current_time: SystemTime,
448 ) -> Result<Option<Vec<u8>>, Self::Error> {
449 let mut inner = self.inner.write().unwrap();
450 let expected_key = request.unique_key();
451
452 let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
455 return Ok(None);
456 };
457 let Some(mut content) = inner.media.remove(index) else {
458 return Ok(None);
459 };
460
461 let data = content.data.clone();
463
464 content.last_access = current_time;
466
467 inner.media.push(content);
469
470 Ok(Some(data))
471 }
472
473 async fn get_media_content_for_uri_inner(
474 &self,
475 expected_uri: &MxcUri,
476 current_time: SystemTime,
477 ) -> Result<Option<Vec<u8>>, Self::Error> {
478 let mut inner = self.inner.write().unwrap();
479
480 let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
483 return Ok(None);
484 };
485 let Some(mut content) = inner.media.remove(index) else {
486 return Ok(None);
487 };
488
489 let data = content.data.clone();
491
492 content.last_access = current_time;
494
495 inner.media.push(content);
497
498 Ok(Some(data))
499 }
500
501 async fn clean_up_media_cache_inner(
502 &self,
503 policy: MediaRetentionPolicy,
504 current_time: SystemTime,
505 ) -> Result<(), Self::Error> {
506 if !policy.has_limitations() {
507 return Ok(());
509 }
510
511 let mut inner = self.inner.write().unwrap();
512
513 if policy.computed_max_file_size().is_some() {
515 inner.media.retain(|content| {
516 content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
517 });
518 }
519
520 if policy.last_access_expiry.is_some() {
522 inner.media.retain(|content| {
523 content.ignore_policy
524 || !policy.has_content_expired(current_time, content.last_access)
525 });
526 }
527
528 if let Some(max_cache_size) = policy.max_cache_size {
530 let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
534 (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
535 |(mut cache_size, mut items_to_remove), (index, content)| {
536 if content.ignore_policy {
537 return (cache_size, items_to_remove);
539 }
540
541 let remove_item = if items_to_remove.is_empty() {
542 if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
544 cache_size = sum;
545 cache_size > max_cache_size
547 } else {
548 true
552 }
553 } else {
554 true
556 };
557
558 if remove_item {
559 items_to_remove.push(index);
560 }
561
562 (cache_size, items_to_remove)
563 },
564 );
565
566 for index in items_to_remove {
569 inner.media.remove(index);
570 }
571 }
572
573 inner.last_media_cleanup_time = current_time;
574
575 Ok(())
576 }
577
578 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
579 Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
580 }
581}
582
#[cfg(test)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::event_cache_store_media_integration_tests;

    /// Builds a fresh, empty store.
    ///
    /// NOTE(review): presumably this is the factory the integration-test
    /// macros below call by name; confirm against the macro definitions.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        let store = MemoryStore::new();

        Ok(store)
    }

    // Shared integration test suites, instantiated for `MemoryStore`.
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
}