1use std::{borrow::Cow, fmt, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22 event_cache::{
23 store::{
24 media::{
25 EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
26 MediaService,
27 },
28 EventCacheStore,
29 },
30 Event, Gap,
31 },
32 linked_chunk::{ChunkContent, ChunkIdentifier, RawChunk, Update},
33 media::{MediaRequestParameters, UniqueKey},
34};
35use matrix_sdk_store_encryption::StoreCipher;
36use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri, RoomId};
37use rusqlite::{params_from_iter, OptionalExtension, Transaction, TransactionBehavior};
38use tokio::fs;
39#[cfg(not(test))]
40use tracing::warn;
41use tracing::{debug, trace};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt, SqliteTransactionExt,
48 },
49 OpenStoreError,
50};
51
/// Names used to address data inside this store's SQLite database.
mod keys {
    // Key of an entry in the key-value table (read with `get_kv`, written
    // with `set_kv`).
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";

    // Table names, passed as the `table_name` argument of `encode_key`.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}
60
/// Schema version that `run_migrations` brings the database up to. A database
/// whose stored version is already at (or above) this value is left untouched.
const DATABASE_VERSION: u8 = 4;
67
/// Value stored in the `type` column of `linked_chunks` for a chunk that
/// holds events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Value stored in the `type` column of `linked_chunks` for a chunk that is a
/// gap.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
74
/// An SQLite-backed implementation of [`EventCacheStore`] and
/// [`EventCacheStoreMedia`].
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// Cipher used to encrypt values and hash keys. `None` when the store was
    /// opened without a passphrase, in which case data is stored as-is.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool from which database connections are obtained.
    pool: SqlitePool,
    /// Service that the media-related [`EventCacheStore`] methods delegate to.
    media_service: Arc<MediaService>,
}
82
83#[cfg(not(tarpaulin_include))]
84impl fmt::Debug for SqliteEventCacheStore {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
87 }
88}
89
90impl SqliteEventCacheStore {
91 pub async fn open(
94 path: impl AsRef<Path>,
95 passphrase: Option<&str>,
96 ) -> Result<Self, OpenStoreError> {
97 let pool = create_pool(path.as_ref()).await?;
98
99 Self::open_with_pool(pool, passphrase).await
100 }
101
102 pub async fn open_with_pool(
105 pool: SqlitePool,
106 passphrase: Option<&str>,
107 ) -> Result<Self, OpenStoreError> {
108 let conn = pool.get().await?;
109 conn.set_journal_size_limit().await?;
110
111 let version = conn.db_version().await?;
112 run_migrations(&conn, version).await?;
113 conn.optimize().await?;
114
115 let store_cipher = match passphrase {
116 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
117 None => None,
118 };
119
120 let media_service = MediaService::new();
121 let media_retention_policy = media_retention_policy(&conn).await?;
122 media_service.restore(media_retention_policy);
123
124 Ok(Self { store_cipher, pool, media_service: Arc::new(media_service) })
125 }
126
127 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
128 if let Some(key) = &self.store_cipher {
129 let encrypted = key.encrypt_value_data(value)?;
130 Ok(rmp_serde::to_vec_named(&encrypted)?)
131 } else {
132 Ok(value)
133 }
134 }
135
136 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
137 if let Some(key) = &self.store_cipher {
138 let encrypted = rmp_serde::from_slice(value)?;
139 let decrypted = key.decrypt_value_data(encrypted)?;
140 Ok(Cow::Owned(decrypted))
141 } else {
142 Ok(Cow::Borrowed(value))
143 }
144 }
145
146 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
147 let bytes = key.as_ref();
148 if let Some(store_cipher) = &self.store_cipher {
149 Key::Hashed(store_cipher.hash_key(table_name, bytes))
150 } else {
151 Key::Plain(bytes.to_owned())
152 }
153 }
154
155 async fn acquire(&self) -> Result<SqliteAsyncConn> {
156 Ok(self.pool.get().await?)
157 }
158
159 fn map_row_to_chunk(
160 row: &rusqlite::Row<'_>,
161 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
162 Ok((
163 row.get::<_, u64>(0)?,
164 row.get::<_, Option<u64>>(1)?,
165 row.get::<_, Option<u64>>(2)?,
166 row.get::<_, String>(3)?,
167 ))
168 }
169}
170
/// Helpers, implemented for [`Transaction`], that read linked chunk data back
/// from the database.
trait TransactionExtForLinkedChunks {
    /// Rebuild one [`RawChunk`] from the columns of its `linked_chunks` row
    /// (`previous`, `index`, `next` and `chunk_type`), loading the chunk's
    /// content (events or gap) for the room `room_id`.
    ///
    /// `store` is used to decode the stored values.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Load the [`Gap`] stored for the gap chunk `chunk_id` of room `room_id`.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Load all the events stored for the events chunk `chunk_id` of room
    /// `room_id`.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
196
197impl TransactionExtForLinkedChunks for Transaction<'_> {
198 fn rebuild_chunk(
199 &self,
200 store: &SqliteEventCacheStore,
201 room_id: &Key,
202 previous: Option<u64>,
203 id: u64,
204 next: Option<u64>,
205 chunk_type: &str,
206 ) -> Result<RawChunk<Event, Gap>> {
207 let previous = previous.map(ChunkIdentifier::new);
208 let next = next.map(ChunkIdentifier::new);
209 let id = ChunkIdentifier::new(id);
210
211 match chunk_type {
212 CHUNK_TYPE_GAP_TYPE_STRING => {
213 let gap = self.load_gap_content(store, room_id, id)?;
216 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
217 }
218
219 CHUNK_TYPE_EVENT_TYPE_STRING => {
220 let events = self.load_events_content(store, room_id, id)?;
222 Ok(RawChunk {
223 content: ChunkContent::Items(events),
224 previous,
225 identifier: id,
226 next,
227 })
228 }
229
230 other => {
231 Err(Error::InvalidData {
233 details: format!("a linked chunk has an unknown type {other}"),
234 })
235 }
236 }
237 }
238
239 fn load_gap_content(
240 &self,
241 store: &SqliteEventCacheStore,
242 room_id: &Key,
243 chunk_id: ChunkIdentifier,
244 ) -> Result<Gap> {
245 let encoded_prev_token: Vec<u8> = self.query_row(
246 "SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
247 (chunk_id.index(), &room_id),
248 |row| row.get(0),
249 )?;
250 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
251 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
252 Ok(Gap { prev_token })
253 }
254
255 fn load_events_content(
256 &self,
257 store: &SqliteEventCacheStore,
258 room_id: &Key,
259 chunk_id: ChunkIdentifier,
260 ) -> Result<Vec<Event>> {
261 let mut events = Vec::new();
263
264 for event_data in self
265 .prepare(
266 r#"
267 SELECT content FROM events
268 WHERE chunk_id = ? AND room_id = ?
269 ORDER BY position ASC
270 "#,
271 )?
272 .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
273 {
274 let encoded_content = event_data?;
275 let serialized_content = store.decode_value(&encoded_content)?;
276 let sync_timeline_event = serde_json::from_slice(&serialized_content)?;
277
278 events.push(sync_timeline_event);
279 }
280
281 Ok(events)
282 }
283}
284
285async fn create_pool(path: &Path) -> Result<SqlitePool, OpenStoreError> {
286 fs::create_dir_all(path).await.map_err(OpenStoreError::CreateDir)?;
287 let cfg = deadpool_sqlite::Config::new(path.join("matrix-sdk-event-cache.sqlite3"));
288 Ok(cfg.create_pool(Runtime::Tokio1)?)
289}
290
/// Run the migrations needed to bring the database from schema `version` up
/// to [`DATABASE_VERSION`].
///
/// `version` is the version currently stored in the database; `0` is treated
/// as a database that has not been created yet. Each migration runs in its
/// own transaction, which also records the new version.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        // Already up to date (or newer): nothing to do.
        return Ok(());
    }

    if version < 1 {
        // Turn on WAL mode before opening the migration transaction: per the
        // SQLite documentation the journal mode cannot be changed while a
        // transaction is active.
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        // Enable foreign keys on this connection, outside of the transaction:
        // per the SQLite documentation this pragma is a no-op inside a
        // transaction, and it only applies to the connection it is run on.
        conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    Ok(())
}
343
344#[async_trait]
345impl EventCacheStore for SqliteEventCacheStore {
346 type Error = Error;
347
348 async fn try_take_leased_lock(
349 &self,
350 lease_duration_ms: u32,
351 key: &str,
352 holder: &str,
353 ) -> Result<bool> {
354 let key = key.to_owned();
355 let holder = holder.to_owned();
356
357 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
358 let expiration = now + lease_duration_ms as u64;
359
360 let num_touched = self
361 .acquire()
362 .await?
363 .with_transaction(move |txn| {
364 txn.execute(
365 "INSERT INTO lease_locks (key, holder, expiration)
366 VALUES (?1, ?2, ?3)
367 ON CONFLICT (key)
368 DO
369 UPDATE SET holder = ?2, expiration = ?3
370 WHERE holder = ?2
371 OR expiration < ?4
372 ",
373 (key, holder, expiration, now),
374 )
375 })
376 .await?;
377
378 Ok(num_touched == 1)
379 }
380
/// Persist a batch of linked chunk `updates` for the room `room_id`.
///
/// The updates are applied in order inside a single immediate transaction
/// (see `with_immediate_transaction`); if any of them fails, the error is
/// returned and the transaction is not committed.
async fn handle_linked_chunk_updates(
    &self,
    room_id: &RoomId,
    updates: Vec<Update<Event, Gap>>,
) -> Result<(), Self::Error> {
    // Encoded form of the room id (hashed when a store cipher is set), used
    // as the `room_id` column value in every query below.
    let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
    // Owned values that can be moved into the `'static` transaction closure;
    // the plain room id is only used for logging.
    let room_id = room_id.to_owned();
    let this = self.clone();

    with_immediate_transaction(self.acquire().await?, move |txn| {
        for up in updates {
            match up {
                Update::NewItemsChunk { previous, new, next } => {
                    let previous = previous.as_ref().map(ChunkIdentifier::index);
                    let new = new.index();
                    let next = next.as_ref().map(ChunkIdentifier::index);

                    trace!(
                        %room_id,
                        "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                    );

                    // An events chunk starts empty: only the chunk row is
                    // created, items arrive through `PushItems`.
                    insert_chunk(
                        txn,
                        &hashed_room_id,
                        previous,
                        new,
                        next,
                        CHUNK_TYPE_EVENT_TYPE_STRING,
                    )?;
                }

                Update::NewGapChunk { previous, new, next, gap } => {
                    let serialized = serde_json::to_vec(&gap.prev_token)?;
                    let prev_token = this.encode_value(serialized)?;

                    let previous = previous.as_ref().map(ChunkIdentifier::index);
                    let new = new.index();
                    let next = next.as_ref().map(ChunkIdentifier::index);

                    trace!(
                        %room_id,
                        "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                    );

                    // Insert the chunk row itself, as a gap.
                    insert_chunk(
                        txn,
                        &hashed_room_id,
                        previous,
                        new,
                        next,
                        CHUNK_TYPE_GAP_TYPE_STRING,
                    )?;

                    // Then insert the gap's (encoded) content.
                    txn.execute(
                        r#"
                        INSERT INTO gaps(chunk_id, room_id, prev_token)
                        VALUES (?, ?, ?)
                        "#,
                        (new, &hashed_room_id, prev_token),
                    )?;
                }

                Update::RemoveChunk(chunk_identifier) => {
                    let chunk_id = chunk_identifier.index();

                    trace!(%room_id, "removing chunk @ {chunk_id}");

                    // Find the neighbours of the chunk to delete.
                    let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                        "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                        (chunk_id, &hashed_room_id),
                        |row| Ok((row.get(0)?, row.get(1)?))
                    )?;

                    // Make the previous chunk point to the removed chunk's next.
                    if let Some(previous) = previous {
                        txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                    }

                    // Make the next chunk point back to the removed chunk's previous.
                    if let Some(next) = next {
                        txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                    }

                    // Delete the chunk row. NOTE(review): the matching rows of
                    // `events`/`gaps` are not deleted here; presumably the
                    // schema removes them with a cascading delete — confirm.
                    txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                }

                Update::PushItems { at, items } => {
                    let chunk_id = at.chunk_identifier().index();

                    trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                    for (i, event) in items.into_iter().enumerate() {
                        let serialized = serde_json::to_vec(&event)?;
                        let content = this.encode_value(serialized)?;

                        let event_id = event.event_id().map(|event_id| event_id.to_string());
                        // Items are stored at consecutive positions starting at `at`.
                        let index = at.index() + i;

                        txn.execute(
                            r#"
                            INSERT INTO events(chunk_id, room_id, event_id, content, position)
                            VALUES (?, ?, ?, ?, ?)
                            "#,
                            (chunk_id, &hashed_room_id, event_id, content, index),
                        )?;
                    }
                }

                Update::ReplaceItem { at, item: event } => {
                    let chunk_id = at.chunk_identifier().index();
                    let index = at.index();

                    trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                    let serialized = serde_json::to_vec(&event)?;
                    let content = this.encode_value(serialized)?;

                    // The event id may differ from the replaced item's, so it is
                    // overwritten along with the content.
                    let event_id = event.event_id().map(|event_id| event_id.to_string());

                    txn.execute(
                        r#"
                        UPDATE events
                        SET content = ?, event_id = ?
                        WHERE room_id = ? AND chunk_id = ? AND position = ?
                        "#,
                        (content, event_id, &hashed_room_id, chunk_id, index,)
                    )?;
                }

                Update::RemoveItem { at } => {
                    let chunk_id = at.chunk_identifier().index();
                    let index = at.index();

                    trace!(%room_id, "removing item @ {chunk_id}:{index}");

                    // Remove the item itself.
                    txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                    // Shift the following items of the chunk down by one, so
                    // positions stay contiguous.
                    txn.execute(
                        r#"
                        UPDATE events
                        SET position = position - 1
                        WHERE room_id = ? AND chunk_id = ? AND position > ?
                        "#,
                        (&hashed_room_id, chunk_id, index)
                    )?;

                }

                Update::DetachLastItems { at } => {
                    let chunk_id = at.chunk_identifier().index();
                    let index = at.index();

                    trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                    // Remove every item of the chunk from position `index` on.
                    txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                }

                Update::Clear => {
                    trace!(%room_id, "clearing items");

                    // Remove all the chunks of the room. NOTE(review): as for
                    // `RemoveChunk`, the rows of `events`/`gaps` are presumably
                    // removed by a cascading delete — confirm.
                    txn.execute(
                        "DELETE FROM linked_chunks WHERE room_id = ?",
                        (&hashed_room_id,),
                    )?;
                }

                Update::StartReattachItems | Update::EndReattachItems => {
                    // Nothing is written to the database for these updates.
                }
            }
        }

        Ok(())
    })
    .await?;

    Ok(())
}
572
573 async fn reload_linked_chunk(
574 &self,
575 room_id: &RoomId,
576 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
577 let room_id = room_id.to_owned();
578 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
579
580 let this = self.clone();
581
582 let result = self
583 .acquire()
584 .await?
585 .with_transaction(move |txn| -> Result<_> {
586 let mut items = Vec::new();
587
588 for data in txn
590 .prepare(
591 "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
592 )?
593 .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
594 {
595 let (id, previous, next, chunk_type) = data?;
596 let new = txn.rebuild_chunk(
597 &this,
598 &hashed_room_id,
599 previous,
600 id,
601 next,
602 chunk_type.as_str(),
603 )?;
604 items.push(new);
605 }
606
607 Ok(items)
608 })
609 .await?;
610
611 Ok(result)
612 }
613
614 async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
615 self.acquire()
616 .await?
617 .with_transaction(move |txn| {
618 txn.execute("DELETE FROM linked_chunks", ())
620 })
621 .await?;
622 Ok(())
623 }
624
/// Add `content` for the media `request` to the cache.
///
/// Delegates to the media service, with this store as the backing
/// [`EventCacheStoreMedia`] implementation.
async fn add_media_content(
    &self,
    request: &MediaRequestParameters,
    content: Vec<u8>,
    ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<()> {
    self.media_service.add_media_content(self, request, content, ignore_policy).await
}
633
634 async fn replace_media_key(
635 &self,
636 from: &MediaRequestParameters,
637 to: &MediaRequestParameters,
638 ) -> Result<(), Self::Error> {
639 let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
640 let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
641
642 let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
643 let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
644
645 let conn = self.acquire().await?;
646 conn.execute(
647 r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
648 (new_uri, new_format, prev_uri, prev_format),
649 )
650 .await?;
651
652 Ok(())
653 }
654
/// Get the content cached for the media `request`, if any.
///
/// Delegates to the media service, with this store as the backing
/// [`EventCacheStoreMedia`] implementation.
async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
    self.media_service.get_media_content(self, request).await
}
658
659 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
660 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
661 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
662
663 let conn = self.acquire().await?;
664 conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
665
666 Ok(())
667 }
668
/// Get content cached for the media at `uri`, whatever its format.
///
/// Delegates to the media service, with this store as the backing
/// [`EventCacheStoreMedia`] implementation.
async fn get_media_content_for_uri(
    &self,
    uri: &MxcUri,
) -> Result<Option<Vec<u8>>, Self::Error> {
    self.media_service.get_media_content_for_uri(self, uri).await
}
675
676 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
677 let uri = self.encode_key(keys::MEDIA, uri);
678
679 let conn = self.acquire().await?;
680 conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
681
682 Ok(())
683 }
684
/// Set the media retention `policy`.
///
/// Delegates to the media service, with this store as the backing
/// [`EventCacheStoreMedia`] implementation.
async fn set_media_retention_policy(
    &self,
    policy: MediaRetentionPolicy,
) -> Result<(), Self::Error> {
    self.media_service.set_media_retention_policy(self, policy).await
}
691
/// Get the media retention policy currently held by the media service.
fn media_retention_policy(&self) -> MediaRetentionPolicy {
    self.media_service.media_retention_policy()
}
695
/// Set whether the media entry matching `request` ignores the media
/// retention policy.
///
/// Delegates to the media service, with this store as the backing
/// [`EventCacheStoreMedia`] implementation.
async fn set_ignore_media_retention_policy(
    &self,
    request: &MediaRequestParameters,
    ignore_policy: IgnoreMediaRetentionPolicy,
) -> Result<(), Self::Error> {
    self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
}
703
/// Clean up the media cache.
///
/// Delegates to the media service, with this store as the backing
/// [`EventCacheStoreMedia`] implementation.
async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
    self.media_service.clean_up_media_cache(self).await
}
707}
708
709#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
710#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
711impl EventCacheStoreMedia for SqliteEventCacheStore {
712 type Error = Error;
713
/// Read the media retention policy persisted in the database, or `None`
/// when no policy has been stored.
async fn media_retention_policy_inner(
    &self,
) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
    let conn = self.acquire().await?;
    media_retention_policy(&conn).await
}
720
721 async fn set_media_retention_policy_inner(
722 &self,
723 policy: MediaRetentionPolicy,
724 ) -> Result<(), Self::Error> {
725 let conn = self.acquire().await?;
726
727 let serialized_policy = rmp_serde::to_vec_named(&policy)?;
728 conn.set_kv(keys::MEDIA_RETENTION_POLICY, serialized_policy).await?;
729
730 Ok(())
731 }
732
733 async fn add_media_content_inner(
734 &self,
735 request: &MediaRequestParameters,
736 data: Vec<u8>,
737 last_access: SystemTime,
738 policy: MediaRetentionPolicy,
739 ignore_policy: IgnoreMediaRetentionPolicy,
740 ) -> Result<(), Self::Error> {
741 let ignore_policy = ignore_policy.is_yes();
742 let data = self.encode_value(data)?;
743
744 if !ignore_policy && policy.exceeds_max_file_size(data.len()) {
745 return Ok(());
746 }
747
748 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
749 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
750 let timestamp = time_to_timestamp(last_access);
751
752 let conn = self.acquire().await?;
753 conn.execute(
754 "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
755 (uri, format, data, timestamp, ignore_policy),
756 )
757 .await?;
758
759 Ok(())
760 }
761
762 async fn set_ignore_media_retention_policy_inner(
763 &self,
764 request: &MediaRequestParameters,
765 ignore_policy: IgnoreMediaRetentionPolicy,
766 ) -> Result<(), Self::Error> {
767 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
768 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
769 let ignore_policy = ignore_policy.is_yes();
770
771 let conn = self.acquire().await?;
772 conn.execute(
773 r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
774 (ignore_policy, uri, format),
775 )
776 .await?;
777
778 Ok(())
779 }
780
781 async fn get_media_content_inner(
782 &self,
783 request: &MediaRequestParameters,
784 current_time: SystemTime,
785 ) -> Result<Option<Vec<u8>>, Self::Error> {
786 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
787 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
788 let timestamp = time_to_timestamp(current_time);
789
790 let conn = self.acquire().await?;
791 let data = conn
792 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
793 txn.execute(
797 "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
798 (timestamp, &uri, &format),
799 )?;
800
801 txn.query_row::<Vec<u8>, _, _>(
802 "SELECT data FROM media WHERE uri = ? AND format = ?",
803 (&uri, &format),
804 |row| row.get(0),
805 )
806 .optional()
807 })
808 .await?;
809
810 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
811 }
812
813 async fn get_media_content_for_uri_inner(
814 &self,
815 uri: &MxcUri,
816 current_time: SystemTime,
817 ) -> Result<Option<Vec<u8>>, Self::Error> {
818 let uri = self.encode_key(keys::MEDIA, uri);
819 let timestamp = time_to_timestamp(current_time);
820
821 let conn = self.acquire().await?;
822 let data = conn
823 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
824 txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
828
829 txn.query_row::<Vec<u8>, _, _>(
830 "SELECT data FROM media WHERE uri = ?",
831 (&uri,),
832 |row| row.get(0),
833 )
834 .optional()
835 })
836 .await?;
837
838 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
839 }
840
841 async fn clean_up_media_cache_inner(
842 &self,
843 policy: MediaRetentionPolicy,
844 current_time: SystemTime,
845 ) -> Result<(), Self::Error> {
846 if !policy.has_limitations() {
847 return Ok(());
849 }
850
851 let conn = self.acquire().await?;
852 let removed = conn
853 .with_transaction::<_, Error, _>(move |txn| {
854 let mut removed = false;
855
856 if let Some(max_file_size) = policy.computed_max_file_size() {
858 let count = txn.execute(
859 "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
860 (max_file_size,),
861 )?;
862
863 if count > 0 {
864 removed = true;
865 }
866 }
867
868 if let Some(last_access_expiry) = policy.last_access_expiry {
870 let current_timestamp = time_to_timestamp(current_time);
871 let expiry_secs = last_access_expiry.as_secs();
872 let count = txn.execute(
873 "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
874 (current_timestamp, expiry_secs),
875 )?;
876
877 if count > 0 {
878 removed = true;
879 }
880 }
881
882 if let Some(max_cache_size) = policy.max_cache_size {
884 let cache_size_int = txn
887 .query_row(
888 "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
889 (),
890 |row| {
891 row.get::<_, Option<i64>>(0)
893 },
894 )?
895 .unwrap_or_default();
896 let cache_size_usize = usize::try_from(cache_size_int);
897
898 if cache_size_usize.is_err()
900 || cache_size_usize.is_ok_and(|cache_size| cache_size > max_cache_size)
901 {
902 let mut cached_stmt = txn.prepare_cached(
904 "SELECT rowid, length(data) FROM media \
905 WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
906 )?;
907 let content_sizes = cached_stmt
908 .query(())?
909 .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, usize>(1)?)));
910
911 let mut accumulated_items_size = 0usize;
912 let mut limit_reached = false;
913 let mut rows_to_remove = Vec::new();
914
915 for result in content_sizes {
916 let (row_id, size) = match result {
917 Ok(content_size) => content_size,
918 Err(error) => {
919 return Err(error.into());
920 }
921 };
922
923 if limit_reached {
924 rows_to_remove.push(row_id);
925 continue;
926 }
927
928 match accumulated_items_size.checked_add(size) {
929 Some(acc) if acc > max_cache_size => {
930 limit_reached = true;
932 rows_to_remove.push(row_id);
933 }
934 Some(acc) => accumulated_items_size = acc,
935 None => {
936 limit_reached = true;
939 rows_to_remove.push(row_id);
940 }
941 };
942 }
943
944 if !rows_to_remove.is_empty() {
945 removed = true;
946 }
947
948 txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
949 let sql_params = repeat_vars(row_ids.len());
950 let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
951 txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
952 Ok(Vec::<()>::new())
953 })?;
954 }
955 }
956
957 Ok(removed)
958 })
959 .await?;
960
961 if removed {
964 if let Err(error) = conn.execute("VACUUM", ()).await {
965 #[cfg(not(test))]
968 warn!("Failed to vacuum database: {error}");
969
970 #[cfg(test)]
972 return Err(error.into());
973 }
974 }
975
976 Ok(())
977 }
978}
979
980async fn with_immediate_transaction<
985 T: Send + 'static,
986 F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
987>(
988 conn: SqliteAsyncConn,
989 f: F,
990) -> Result<T, Error> {
991 conn.interact(move |conn| -> Result<T, Error> {
992 conn.set_transaction_behavior(TransactionBehavior::Immediate);
996
997 let code = || -> Result<T, Error> {
998 let txn = conn.transaction()?;
999 let res = f(&txn)?;
1000 txn.commit()?;
1001 Ok(res)
1002 };
1003
1004 let res = code();
1005
1006 conn.set_transaction_behavior(TransactionBehavior::Deferred);
1009
1010 res
1011 })
1012 .await
1013 .unwrap()
1015}
1016
1017fn insert_chunk(
1018 txn: &Transaction<'_>,
1019 room_id: &Key,
1020 previous: Option<u64>,
1021 new: u64,
1022 next: Option<u64>,
1023 type_str: &str,
1024) -> rusqlite::Result<()> {
1025 txn.execute(
1027 r#"
1028 INSERT INTO linked_chunks(id, room_id, previous, next, type)
1029 VALUES (?, ?, ?, ?, ?)
1030 "#,
1031 (new, room_id, previous, next, type_str),
1032 )?;
1033
1034 if let Some(previous) = previous {
1036 txn.execute(
1037 r#"
1038 UPDATE linked_chunks
1039 SET next = ?
1040 WHERE id = ? AND room_id = ?
1041 "#,
1042 (new, previous, room_id),
1043 )?;
1044 }
1045
1046 if let Some(next) = next {
1048 txn.execute(
1049 r#"
1050 UPDATE linked_chunks
1051 SET previous = ?
1052 WHERE id = ? AND room_id = ?
1053 "#,
1054 (new, next, room_id),
1055 )?;
1056 }
1057
1058 Ok(())
1059}
1060
1061async fn media_retention_policy(
1063 conn: &SqliteAsyncConn,
1064) -> Result<Option<MediaRetentionPolicy>, Error> {
1065 let Some(bytes) = conn.get_kv(keys::MEDIA_RETENTION_POLICY).await? else {
1066 return Ok(None);
1067 };
1068
1069 Ok(Some(rmp_serde::from_slice(&bytes)?))
1070}
1071
1072#[cfg(test)]
1073mod tests {
1074 use std::{
1075 sync::atomic::{AtomicU32, Ordering::SeqCst},
1076 time::Duration,
1077 };
1078
1079 use assert_matches::assert_matches;
1080 use matrix_sdk_base::{
1081 event_cache::{
1082 store::{
1083 integration_tests::{check_test_event, make_test_event},
1084 media::IgnoreMediaRetentionPolicy,
1085 EventCacheStore, EventCacheStoreError,
1086 },
1087 Gap,
1088 },
1089 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1090 event_cache_store_media_integration_tests,
1091 linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1092 media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1093 };
1094 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1095 use once_cell::sync::Lazy;
1096 use ruma::{events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1097 use tempfile::{tempdir, TempDir};
1098
1099 use super::SqliteEventCacheStore;
1100 use crate::utils::SqliteAsyncConnExt;
1101
// Temporary directory, created on first use, under which each test store gets
// its own sub-directory.
static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
// Counter used to give each test store a distinct sub-directory name.
static NUM: AtomicU32 = AtomicU32::new(0);
1104
1105 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1106 let name = NUM.fetch_add(1, SeqCst).to_string();
1107 let tmpdir_path = TMP_DIR.path().join(name);
1108
1109 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1110
1111 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1112 }
1113
// Instantiate the shared event cache store test suites exported by
// `matrix_sdk_base`.
// NOTE(review): these macros presumably obtain the store under test through
// the `get_event_cache_store()` helper of this module — confirm.
event_cache_store_integration_tests!();
event_cache_store_integration_tests_time!();
event_cache_store_media_integration_tests!(with_media_size_tests);
1117
1118 async fn get_event_cache_store_content_sorted_by_last_access(
1119 event_cache_store: &SqliteEventCacheStore,
1120 ) -> Vec<Vec<u8>> {
1121 let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1122 sqlite_db
1123 .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1124 stmt.query(())?.mapped(|row| row.get(0)).collect()
1125 })
1126 .await
1127 .expect("querying media cache content by last access failed")
1128 }
1129
/// Checks that adding and reading media content updates its last access
/// time, by looking at the order of the rows sorted by `last_access`.
#[async_test]
async fn test_last_access() {
    let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
    let uri = mxc_uri!("mxc://localhost/media");
    let file_request = MediaRequestParameters {
        source: MediaSource::Plain(uri.to_owned()),
        format: MediaFormat::File,
    };
    let thumbnail_request = MediaRequestParameters {
        source: MediaSource::Plain(uri.to_owned()),
        format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
            Method::Crop,
            uint!(100),
            uint!(100),
        )),
    };

    let content: Vec<u8> = "hello world".into();
    let thumbnail_content: Vec<u8> = "hello…".into();

    // Add the file first.
    event_cache_store
        .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
        .await
        .expect("adding file failed");

    // Wait so the two entries get different last access times.
    // NOTE(review): presumably needed because stored timestamps have a
    // precision of one second — confirm.
    tokio::time::sleep(Duration::from_secs(3)).await;

    // Then add the thumbnail.
    event_cache_store
        .add_media_content(
            &thumbnail_request,
            thumbnail_content.clone(),
            IgnoreMediaRetentionPolicy::No,
        )
        .await
        .expect("adding thumbnail failed");

    // The thumbnail was added last, so it comes first.
    let contents =
        get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

    assert_eq!(contents.len(), 2, "media cache contents length is wrong");
    assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
    assert_eq!(contents[1], content, "file is not second-to-last access");

    // Wait again, so the read below gets a later access time.
    tokio::time::sleep(Duration::from_secs(3)).await;

    // Reading the file must update its last access time.
    let _ = event_cache_store
        .get_media_content(&file_request)
        .await
        .expect("getting file failed")
        .expect("file is missing");

    // The file is now the most recently accessed entry.
    let contents =
        get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

    assert_eq!(contents.len(), 2, "media cache contents length is wrong");
    assert_eq!(contents[0], content, "file is not last access");
    assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
}
1196
/// Checks that `NewItemsChunk` updates create empty events chunks that are
/// correctly linked together once reloaded.
#[async_test]
async fn test_linked_chunk_new_items_chunk() {
    let store = get_event_cache_store().await.expect("creating cache store failed");

    let room_id = &DEFAULT_TEST_ROOM_ID;

    store
        .handle_linked_chunk_updates(
            room_id,
            vec![
                // First chunk, with no neighbour.
                Update::NewItemsChunk {
                    previous: None,
                    new: ChunkIdentifier::new(42),
                    next: None, },
                // Second chunk, after 42; its `next` (37) is only inserted by
                // the following update.
                Update::NewItemsChunk {
                    previous: Some(ChunkIdentifier::new(42)),
                    new: ChunkIdentifier::new(13),
                    next: Some(ChunkIdentifier::new(37)), },
                // Last chunk, after 13.
                Update::NewItemsChunk {
                    previous: Some(ChunkIdentifier::new(13)),
                    new: ChunkIdentifier::new(37),
                    next: None,
                },
            ],
        )
        .await
        .unwrap();

    let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

    assert_eq!(chunks.len(), 3);

    {
        // Chunks are reloaded ordered by their identifier: 13, 37, then 42.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(13));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert!(events.is_empty());
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(37));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert!(events.is_empty());
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert!(events.is_empty());
        });
    }
}
1259
1260 #[async_test]
1261 async fn test_linked_chunk_new_gap_chunk() {
1262 let store = get_event_cache_store().await.expect("creating cache store failed");
1263
1264 let room_id = &DEFAULT_TEST_ROOM_ID;
1265
1266 store
1267 .handle_linked_chunk_updates(
1268 room_id,
1269 vec![Update::NewGapChunk {
1270 previous: None,
1271 new: ChunkIdentifier::new(42),
1272 next: None,
1273 gap: Gap { prev_token: "raclette".to_owned() },
1274 }],
1275 )
1276 .await
1277 .unwrap();
1278
1279 let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1280
1281 assert_eq!(chunks.len(), 1);
1282
1283 let c = chunks.remove(0);
1285 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1286 assert_eq!(c.previous, None);
1287 assert_eq!(c.next, None);
1288 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1289 assert_eq!(gap.prev_token, "raclette");
1290 });
1291 }
1292
1293 #[async_test]
1294 async fn test_linked_chunk_replace_item() {
1295 let store = get_event_cache_store().await.expect("creating cache store failed");
1296
1297 let room_id = &DEFAULT_TEST_ROOM_ID;
1298
1299 store
1300 .handle_linked_chunk_updates(
1301 room_id,
1302 vec![
1303 Update::NewItemsChunk {
1304 previous: None,
1305 new: ChunkIdentifier::new(42),
1306 next: None,
1307 },
1308 Update::PushItems {
1309 at: Position::new(ChunkIdentifier::new(42), 0),
1310 items: vec![
1311 make_test_event(room_id, "hello"),
1312 make_test_event(room_id, "world"),
1313 ],
1314 },
1315 Update::ReplaceItem {
1316 at: Position::new(ChunkIdentifier::new(42), 1),
1317 item: make_test_event(room_id, "yolo"),
1318 },
1319 ],
1320 )
1321 .await
1322 .unwrap();
1323
1324 let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1325
1326 assert_eq!(chunks.len(), 1);
1327
1328 let c = chunks.remove(0);
1329 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1330 assert_eq!(c.previous, None);
1331 assert_eq!(c.next, None);
1332 assert_matches!(c.content, ChunkContent::Items(events) => {
1333 assert_eq!(events.len(), 2);
1334 check_test_event(&events[0], "hello");
1335 check_test_event(&events[1], "yolo");
1336 });
1337 }
1338
1339 #[async_test]
1340 async fn test_linked_chunk_remove_chunk() {
1341 let store = get_event_cache_store().await.expect("creating cache store failed");
1342
1343 let room_id = &DEFAULT_TEST_ROOM_ID;
1344
1345 store
1346 .handle_linked_chunk_updates(
1347 room_id,
1348 vec![
1349 Update::NewGapChunk {
1350 previous: None,
1351 new: ChunkIdentifier::new(42),
1352 next: None,
1353 gap: Gap { prev_token: "raclette".to_owned() },
1354 },
1355 Update::NewGapChunk {
1356 previous: Some(ChunkIdentifier::new(42)),
1357 new: ChunkIdentifier::new(43),
1358 next: None,
1359 gap: Gap { prev_token: "fondue".to_owned() },
1360 },
1361 Update::NewGapChunk {
1362 previous: Some(ChunkIdentifier::new(43)),
1363 new: ChunkIdentifier::new(44),
1364 next: None,
1365 gap: Gap { prev_token: "tartiflette".to_owned() },
1366 },
1367 Update::RemoveChunk(ChunkIdentifier::new(43)),
1368 ],
1369 )
1370 .await
1371 .unwrap();
1372
1373 let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1374
1375 assert_eq!(chunks.len(), 2);
1376
1377 let c = chunks.remove(0);
1379 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1380 assert_eq!(c.previous, None);
1381 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1382 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1383 assert_eq!(gap.prev_token, "raclette");
1384 });
1385
1386 let c = chunks.remove(0);
1387 assert_eq!(c.identifier, ChunkIdentifier::new(44));
1388 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1389 assert_eq!(c.next, None);
1390 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1391 assert_eq!(gap.prev_token, "tartiflette");
1392 });
1393
1394 let gaps = store
1396 .acquire()
1397 .await
1398 .unwrap()
1399 .with_transaction(|txn| -> rusqlite::Result<_> {
1400 let mut gaps = Vec::new();
1401 for data in txn
1402 .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")?
1403 .query_map((), |row| row.get::<_, u64>(0))?
1404 {
1405 gaps.push(data?);
1406 }
1407 Ok(gaps)
1408 })
1409 .await
1410 .unwrap();
1411
1412 assert_eq!(gaps, vec![42, 44]);
1413 }
1414
1415 #[async_test]
1416 async fn test_linked_chunk_push_items() {
1417 let store = get_event_cache_store().await.expect("creating cache store failed");
1418
1419 let room_id = &DEFAULT_TEST_ROOM_ID;
1420
1421 store
1422 .handle_linked_chunk_updates(
1423 room_id,
1424 vec![
1425 Update::NewItemsChunk {
1426 previous: None,
1427 new: ChunkIdentifier::new(42),
1428 next: None,
1429 },
1430 Update::PushItems {
1431 at: Position::new(ChunkIdentifier::new(42), 0),
1432 items: vec![
1433 make_test_event(room_id, "hello"),
1434 make_test_event(room_id, "world"),
1435 ],
1436 },
1437 Update::PushItems {
1438 at: Position::new(ChunkIdentifier::new(42), 2),
1439 items: vec![make_test_event(room_id, "who?")],
1440 },
1441 ],
1442 )
1443 .await
1444 .unwrap();
1445
1446 let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1447
1448 assert_eq!(chunks.len(), 1);
1449
1450 let c = chunks.remove(0);
1451 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1452 assert_eq!(c.previous, None);
1453 assert_eq!(c.next, None);
1454 assert_matches!(c.content, ChunkContent::Items(events) => {
1455 assert_eq!(events.len(), 3);
1456
1457 check_test_event(&events[0], "hello");
1458 check_test_event(&events[1], "world");
1459 check_test_event(&events[2], "who?");
1460 });
1461 }
1462
1463 #[async_test]
1464 async fn test_linked_chunk_remove_item() {
1465 let store = get_event_cache_store().await.expect("creating cache store failed");
1466
1467 let room_id = *DEFAULT_TEST_ROOM_ID;
1468
1469 store
1470 .handle_linked_chunk_updates(
1471 room_id,
1472 vec![
1473 Update::NewItemsChunk {
1474 previous: None,
1475 new: ChunkIdentifier::new(42),
1476 next: None,
1477 },
1478 Update::PushItems {
1479 at: Position::new(ChunkIdentifier::new(42), 0),
1480 items: vec![
1481 make_test_event(room_id, "hello"),
1482 make_test_event(room_id, "world"),
1483 ],
1484 },
1485 Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
1486 ],
1487 )
1488 .await
1489 .unwrap();
1490
1491 let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1492
1493 assert_eq!(chunks.len(), 1);
1494
1495 let c = chunks.remove(0);
1496 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1497 assert_eq!(c.previous, None);
1498 assert_eq!(c.next, None);
1499 assert_matches!(c.content, ChunkContent::Items(events) => {
1500 assert_eq!(events.len(), 1);
1501 check_test_event(&events[0], "world");
1502 });
1503
1504 let num_rows: u64 = store
1506 .acquire()
1507 .await
1508 .unwrap()
1509 .with_transaction(move |txn| {
1510 txn.query_row(
1511 "SELECT COUNT(*) FROM events WHERE chunk_id = 42 AND room_id = ? AND position = 0",
1512 (room_id.as_bytes(),),
1513 |row| row.get(0),
1514 )
1515 })
1516 .await
1517 .unwrap();
1518 assert_eq!(num_rows, 1);
1519 }
1520
1521 #[async_test]
1522 async fn test_linked_chunk_detach_last_items() {
1523 let store = get_event_cache_store().await.expect("creating cache store failed");
1524
1525 let room_id = *DEFAULT_TEST_ROOM_ID;
1526
1527 store
1528 .handle_linked_chunk_updates(
1529 room_id,
1530 vec![
1531 Update::NewItemsChunk {
1532 previous: None,
1533 new: ChunkIdentifier::new(42),
1534 next: None,
1535 },
1536 Update::PushItems {
1537 at: Position::new(ChunkIdentifier::new(42), 0),
1538 items: vec![
1539 make_test_event(room_id, "hello"),
1540 make_test_event(room_id, "world"),
1541 make_test_event(room_id, "howdy"),
1542 ],
1543 },
1544 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1545 ],
1546 )
1547 .await
1548 .unwrap();
1549
1550 let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1551
1552 assert_eq!(chunks.len(), 1);
1553
1554 let c = chunks.remove(0);
1555 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1556 assert_eq!(c.previous, None);
1557 assert_eq!(c.next, None);
1558 assert_matches!(c.content, ChunkContent::Items(events) => {
1559 assert_eq!(events.len(), 1);
1560 check_test_event(&events[0], "hello");
1561 });
1562 }
1563
1564 #[async_test]
1565 async fn test_linked_chunk_start_end_reattach_items() {
1566 let store = get_event_cache_store().await.expect("creating cache store failed");
1567
1568 let room_id = *DEFAULT_TEST_ROOM_ID;
1569
1570 store
1574 .handle_linked_chunk_updates(
1575 room_id,
1576 vec![
1577 Update::NewItemsChunk {
1578 previous: None,
1579 new: ChunkIdentifier::new(42),
1580 next: None,
1581 },
1582 Update::PushItems {
1583 at: Position::new(ChunkIdentifier::new(42), 0),
1584 items: vec![
1585 make_test_event(room_id, "hello"),
1586 make_test_event(room_id, "world"),
1587 make_test_event(room_id, "howdy"),
1588 ],
1589 },
1590 Update::StartReattachItems,
1591 Update::EndReattachItems,
1592 ],
1593 )
1594 .await
1595 .unwrap();
1596
1597 let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1598
1599 assert_eq!(chunks.len(), 1);
1600
1601 let c = chunks.remove(0);
1602 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1603 assert_eq!(c.previous, None);
1604 assert_eq!(c.next, None);
1605 assert_matches!(c.content, ChunkContent::Items(events) => {
1606 assert_eq!(events.len(), 3);
1607 check_test_event(&events[0], "hello");
1608 check_test_event(&events[1], "world");
1609 check_test_event(&events[2], "howdy");
1610 });
1611 }
1612
1613 #[async_test]
1614 async fn test_linked_chunk_clear() {
1615 let store = get_event_cache_store().await.expect("creating cache store failed");
1616
1617 let room_id = *DEFAULT_TEST_ROOM_ID;
1618
1619 store
1620 .handle_linked_chunk_updates(
1621 room_id,
1622 vec![
1623 Update::NewItemsChunk {
1624 previous: None,
1625 new: ChunkIdentifier::new(42),
1626 next: None,
1627 },
1628 Update::NewGapChunk {
1629 previous: Some(ChunkIdentifier::new(42)),
1630 new: ChunkIdentifier::new(54),
1631 next: None,
1632 gap: Gap { prev_token: "fondue".to_owned() },
1633 },
1634 Update::PushItems {
1635 at: Position::new(ChunkIdentifier::new(42), 0),
1636 items: vec![
1637 make_test_event(room_id, "hello"),
1638 make_test_event(room_id, "world"),
1639 make_test_event(room_id, "howdy"),
1640 ],
1641 },
1642 Update::Clear,
1643 ],
1644 )
1645 .await
1646 .unwrap();
1647
1648 let chunks = store.reload_linked_chunk(room_id).await.unwrap();
1649 assert!(chunks.is_empty());
1650 }
1651
1652 #[async_test]
1653 async fn test_linked_chunk_multiple_rooms() {
1654 let store = get_event_cache_store().await.expect("creating cache store failed");
1655
1656 let room1 = room_id!("!realcheeselovers:raclette.fr");
1657 let room2 = room_id!("!realcheeselovers:fondue.ch");
1658
1659 store
1663 .handle_linked_chunk_updates(
1664 room1,
1665 vec![
1666 Update::NewItemsChunk {
1667 previous: None,
1668 new: ChunkIdentifier::new(42),
1669 next: None,
1670 },
1671 Update::PushItems {
1672 at: Position::new(ChunkIdentifier::new(42), 0),
1673 items: vec![
1674 make_test_event(room1, "best cheese is raclette"),
1675 make_test_event(room1, "obviously"),
1676 ],
1677 },
1678 ],
1679 )
1680 .await
1681 .unwrap();
1682
1683 store
1684 .handle_linked_chunk_updates(
1685 room2,
1686 vec![
1687 Update::NewItemsChunk {
1688 previous: None,
1689 new: ChunkIdentifier::new(42),
1690 next: None,
1691 },
1692 Update::PushItems {
1693 at: Position::new(ChunkIdentifier::new(42), 0),
1694 items: vec![make_test_event(room1, "beaufort is the best")],
1695 },
1696 ],
1697 )
1698 .await
1699 .unwrap();
1700
1701 let mut chunks_room1 = store.reload_linked_chunk(room1).await.unwrap();
1703 assert_eq!(chunks_room1.len(), 1);
1704
1705 let c = chunks_room1.remove(0);
1706 assert_matches!(c.content, ChunkContent::Items(events) => {
1707 assert_eq!(events.len(), 2);
1708 check_test_event(&events[0], "best cheese is raclette");
1709 check_test_event(&events[1], "obviously");
1710 });
1711
1712 let mut chunks_room2 = store.reload_linked_chunk(room2).await.unwrap();
1714 assert_eq!(chunks_room2.len(), 1);
1715
1716 let c = chunks_room2.remove(0);
1717 assert_matches!(c.content, ChunkContent::Items(events) => {
1718 assert_eq!(events.len(), 1);
1719 check_test_event(&events[0], "beaufort is the best");
1720 });
1721 }
1722
1723 #[async_test]
1724 async fn test_linked_chunk_update_is_a_transaction() {
1725 let store = get_event_cache_store().await.expect("creating cache store failed");
1726
1727 let room_id = *DEFAULT_TEST_ROOM_ID;
1728
1729 let err = store
1732 .handle_linked_chunk_updates(
1733 room_id,
1734 vec![
1735 Update::NewItemsChunk {
1736 previous: None,
1737 new: ChunkIdentifier::new(42),
1738 next: None,
1739 },
1740 Update::NewItemsChunk {
1741 previous: None,
1742 new: ChunkIdentifier::new(42),
1743 next: None,
1744 },
1745 ],
1746 )
1747 .await
1748 .unwrap_err();
1749
1750 assert_matches!(err, crate::error::Error::Sqlite(err) => {
1752 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
1753 });
1754
1755 let chunks = store.reload_linked_chunk(room_id).await.unwrap();
1759 assert!(chunks.is_empty());
1760 }
1761}
1762
// Runs the shared event cache store integration test suites against a store
// opened with a passphrase.
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    // Temporary directory shared by every test of this module, created on
    // first use.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    // Counter used to give each store its own sub-path inside `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh, passphrase-protected store at a unique path inside
    /// `TMP_DIR`.
    ///
    /// The integration test macros invoked below expand to code that expects a
    /// function with this name and signature in scope. Opening failures panic
    /// instead of being returned.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        // `display()` instead of `to_str().unwrap()`: a non-UTF-8 temporary
        // path must not abort the test before the store is even opened.
        tracing::info!("using event cache store @ {}", tmpdir_path.display());

        // `open` accepts any `AsRef<Path>`, so the path is passed as-is rather
        // than going through a fallible string conversion.
        Ok(SqliteEventCacheStore::open(&tmpdir_path, Some("default_test_password"))
            .await
            .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();
}