use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{
            compute_filters_string, extract_event_relation,
            media::{
                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
                MediaService,
            },
            EventCacheStore,
        },
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId, Position, RawChunk,
        Update,
    },
    media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
    OwnedEventId, RoomId,
};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, error, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, SqliteStoreConfig,
};

mod keys {
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}

/// Name of the SQLite database file used by this store.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Current schema version of the database.
///
/// Bump this whenever a new migration is added to [`run_migrations`].
const DATABASE_VERSION: u8 = 8;

/// Value stored in the `type` column of `linked_chunks` for a chunk of events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Value stored in the `type` column of `linked_chunks` for a gap chunk.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,
    pool: SqlitePool,
    media_service: MediaService,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl SqliteEventCacheStore {
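    /// Open the store at the given directory path, optionally protecting the
    /// data with a passphrase.
    ///
    /// A minimal usage sketch (the path and passphrase are illustrative, and
    /// this assumes the crate re-exports the type at its root):
    ///
    /// ```ignore
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let store = SqliteEventCacheStore::open("/path/to/store", Some("passphrase")).await?;
    /// # Ok(()) }
    /// ```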
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the store with the given [`SqliteStoreConfig`].
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
        this.pool.get().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    /// Open the store with the given connection pool, running migrations and
    /// restoring the media retention policy state.
    async fn open_with_pool(
        pool: SqlitePool,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match passphrase {
            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
            None => None,
        };

        let media_service = MediaService::new();
        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
        media_service.restore(media_retention_policy, last_media_cleanup_time);

        Ok(Self { store_cipher, pool, media_service })
    }

    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = key.encrypt_value_data(value)?;
            Ok(rmp_serde::to_vec_named(&encrypted)?)
        } else {
            Ok(value)
        }
    }

    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = rmp_serde::from_slice(value)?;
            let decrypted = key.decrypt_value_data(encrypted)?;
            Ok(Cow::Owned(decrypted))
        } else {
            Ok(Cow::Borrowed(value))
        }
    }

    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
        let bytes = key.as_ref();
        if let Some(store_cipher) = &self.store_cipher {
            Key::Hashed(store_cipher.hash_key(table_name, bytes))
        } else {
            Key::Plain(bytes.to_owned())
        }
    }

    async fn acquire(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // The `foreign_keys` pragma is not persisted, so it must be enabled on
        // every new connection.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship metadata from the raw event, so it can be
        // stored in dedicated, queryable columns.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap: reload its previous-batch token.
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events: reload them in their in-chunk order.
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // Load the gap's `prev_token`, then decode and deserialize it.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database, in their in-chunk order.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run the database migrations, bringing the schema from `version` up to
/// [`DATABASE_VERSION`].
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // Enabling WAL mode can't be done inside a transaction, so do it
        // before opening one.
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        // Attempt an upsert: take the lock if it's free, refresh it if we
        // already hold it, or steal it if the previous lease has expired.
        // Exactly one row is touched when the lock is acquired.
        let num_touched = self
            .acquire()
            .await?
            .with_transaction(move |txn| {
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                    ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

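    // A typical call into `handle_linked_chunk_updates` applies a whole batch
    // of updates atomically. A sketch, mirroring the tests at the bottom of
    // this file (identifiers and items are illustrative):
    //
    //     store
    //         .handle_linked_chunk_updates(
    //             LinkedChunkId::Room(room_id),
    //             vec![
    //                 Update::NewItemsChunk {
    //                     previous: None,
    //                     new: ChunkIdentifier::new(0),
    //                     next: None,
    //                 },
    //                 Update::PushItems {
    //                     at: Position::new(ChunkIdentifier::new(0), 0),
    //                     items: vec![event],
    //                 },
    //             ],
    //         )
    //         .await?;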
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // Use a single transaction throughout this function, so that either
        // all the updates are applied, or none is.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's content.
                        txn.execute(
                            r#"
                                INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                                VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find the previous and next chunks, so they can be
                        // relinked to each other.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Update the previous chunk's `next` pointer.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Update the next chunk's `previous` pointer.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Delete the chunk itself; the foreign-key constraints
                        // take care of deleting its content.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Nothing to do, and no need to prepare the
                            // statements below.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        )?;

                        // Keep only events that have an ID; the others can't
                        // be stored, so log and skip them.
                        let event_with_id = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            Some((event_id.to_string(), event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event)) in
                            items.into_iter().filter_map(event_with_id).enumerate()
                        {
                            // Insert the location of the event in the chunk.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            // Insert or replace the event's content.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        // Insert or replace the event's content.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                        )?;

                        // Point the entry in `event_chunks` to the (possibly
                        // new) event ID.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry from the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the position of every item after the one
                        // that was just removed. This is done in two passes,
                        // through a negative intermediate value, because a
                        // plain `position = position - 1` could transiently
                        // collide with still-existing (chunk, position) pairs
                        // while the update runs.
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove all the chunks; the foreign-key constraints
                        // take care of removing their content.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing to do for the database: these updates only
                        // mark the boundaries of an in-memory reattachment.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier, to seed the
                // `ChunkIdentifierGenerator`, and count the chunks.
                let (chunk_identifier_generator, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match chunk_identifier_generator {
                    Some(last_chunk_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(last_chunk_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk, i.e. the one without a successor.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // There is no chunk at all: that's fine, there is simply
                    // no last chunk.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // Chunks exist, but none of them has an empty `next`:
                    // the linked chunk is malformed.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                        })
                    }
                };

                // Rebuild the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk that precedes `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // There is no previous chunk.
                    return Ok(None);
                };

                // Rebuild the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        self.acquire()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let the foreign-key constraints
                // clean up their content.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the stored events.
                txn.execute("DELETE FROM events", ())
            })
            .await?;
        Ok(())
    }

    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        // If there is no event to check for duplicates, return early: there
        // can be no duplicates.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        // The first parameter binds `linked_chunk_id = ?`…
                        once(hashed_linked_chunk_id.to_sql().unwrap())
                            // …followed by one parameter per event ID in the
                            // `IN (…)` list.
                            .chain(events.iter().map(|event| event.as_str().to_sql().unwrap())),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            error!(%duplicated_event, %linked_chunk_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<Event>, Self::Error> {
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let this = self.clone();

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let filter_query = if let Some(filters) = compute_filters_string(filters.as_deref())
                {
                    format!(
                        " AND rel_type IN ({})",
                        filters
                            .into_iter()
                            .map(|f| format!(r#""{f}""#))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_owned()
                };

                let query = format!(
                    "SELECT content FROM events WHERE relates_to = ? AND room_id = ? {filter_query}"
                );

                let mut related = Vec::new();

                for ev in
                    txn.prepare(&query)?.query_map((event_id.as_str(), hashed_room_id), |row| {
                        row.get::<_, Vec<u8>>(0)
                    })?
                {
                    let ev = ev?;
                    let ev = serde_json::from_slice(&this.decode_value(&ev)?)?;
                    related.push(ev);
                }

                Ok(related)
            })
            .await
    }

    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let Some(event_id) = event.event_id() else {
            error!(%room_id, "Trying to save an event with no ID");
            return Ok(());
        };

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                )?;

                Ok(())
            })
            .await
    }

    async fn add_media_content(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<()> {
        self.media_service.add_media_content(self, request, content, ignore_policy).await
    }

    async fn replace_media_key(
        &self,
        from: &MediaRequestParameters,
        to: &MediaRequestParameters,
    ) -> Result<(), Self::Error> {
        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());

        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
            (new_uri, new_format, prev_uri, prev_format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
        self.media_service.get_media_content(self, request).await
    }

    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;

        Ok(())
    }

    async fn get_media_content_for_uri(
        &self,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        self.media_service.get_media_content_for_uri(self, uri).await
    }

    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, uri);

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;

        Ok(())
    }

    async fn set_media_retention_policy(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_media_retention_policy(self, policy).await
    }

    fn media_retention_policy(&self) -> MediaRetentionPolicy {
        self.media_service.media_retention_policy()
    }

    async fn set_ignore_media_retention_policy(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
    }

    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
        self.media_service.clean_up_media_cache(self).await
    }
}

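// The media APIs of `EventCacheStore` above delegate to a `MediaService`,
// which centralizes the retention-policy bookkeeping and calls back into the
// `*_inner` methods below for the actual storage work. Roughly (a sketch of
// the call flow, not a literal trace):
//
//     EventCacheStore::add_media_content
//         └─> MediaService::add_media_content   (policy checks, timestamps)
//                 └─> EventCacheStoreMedia::add_media_content_inner   (SQL)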
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl EventCacheStoreMedia for SqliteEventCacheStore {
    type Error = Error;

    async fn media_retention_policy_inner(
        &self,
    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
        let conn = self.acquire().await?;
        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
    }

    async fn set_media_retention_policy_inner(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let conn = self.acquire().await?;
        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
        Ok(())
    }

    async fn add_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        data: Vec<u8>,
        last_access: SystemTime,
        policy: MediaRetentionPolicy,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let ignore_policy = ignore_policy.is_yes();
        let data = self.encode_value(data)?;

        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
            // The content is too big to be cached.
            return Ok(());
        }

        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(last_access);

        let conn = self.acquire().await?;
        conn.execute(
            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
            (uri, format, data, timestamp, ignore_policy),
        )
        .await?;

        Ok(())
    }

    async fn set_ignore_media_retention_policy_inner(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let ignore_policy = ignore_policy.is_yes();

        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
            (ignore_policy, uri, format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access timestamp in the same transaction, so
                // the retention policy sees this content as recently used.
                txn.execute(
                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
                    (timestamp, &uri, &format),
                )?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ? AND format = ?",
                    (&uri, &format),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn get_media_content_for_uri_inner(
        &self,
        uri: &MxcUri,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, uri);
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access timestamp on the fly, as above.
                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ?",
                    (&uri,),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            // The policy has no limitations: nothing to clean up.
            return Ok(());
        }

        let conn = self.acquire().await?;
        let removed = conn
            .with_transaction::<_, Error, _>(move |txn| {
                let mut removed = false;

                // First, remove media content that exceeds the maximum file size.
                if let Some(max_file_size) = policy.computed_max_file_size() {
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
                        (max_file_size,),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Then, remove media content whose last access is too old.
                if let Some(last_access_expiry) = policy.last_access_expiry {
                    let current_timestamp = time_to_timestamp(current_time);
                    let expiry_secs = last_access_expiry.as_secs();
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
                        (current_timestamp, expiry_secs),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Finally, if the cache is too big, evict the least recently
                // accessed items until it fits within the limit.
                if let Some(max_cache_size) = policy.max_cache_size {
                    let cache_size = txn
                        .query_row(
                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
                            (),
                            |row| {
                                // `sum()` returns `NULL` when there are no rows.
                                row.get::<_, Option<u64>>(0)
                            },
                        )?
                        .unwrap_or_default();

                    if cache_size > max_cache_size {
                        // Iterate from the most to the least recently accessed
                        // item, accumulating sizes until the limit is reached.
                        let mut cached_stmt = txn.prepare_cached(
                            "SELECT rowid, length(data) FROM media \
                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
                        )?;
                        let content_sizes = cached_stmt
                            .query(())?
                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));

                        let mut accumulated_items_size = 0u64;
                        let mut limit_reached = false;
                        let mut rows_to_remove = Vec::new();

                        for result in content_sizes {
                            let (row_id, size) = match result {
                                Ok(content_size) => content_size,
                                Err(error) => {
                                    return Err(error.into());
                                }
                            };

                            if limit_reached {
                                rows_to_remove.push(row_id);
                                continue;
                            }

                            match accumulated_items_size.checked_add(size) {
                                Some(acc) if acc > max_cache_size => {
                                    // This item brings the total over the
                                    // limit: remove it and everything older.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                                Some(acc) => accumulated_items_size = acc,
                                None => {
                                    // The addition overflowed `u64`, so the
                                    // limit is necessarily exceeded.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                            };
                        }

                        if !rows_to_remove.is_empty() {
                            removed = true;
                        }

                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
                            let sql_params = repeat_vars(row_ids.len());
                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
                            Ok(Vec::<()>::new())
                        })?;
                    }
                }

                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;

                Ok(removed)
            })
            .await?;

        // If some content was removed, vacuum the database to reclaim the
        // disk space.
        if removed {
            conn.vacuum().await?;
        }

        Ok(())
    }

    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
        let conn = self.acquire().await?;
        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
    }
}

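/// Run a piece of code inside a transaction opened with `BEGIN IMMEDIATE`.
///
/// `rusqlite`'s `TransactionBehavior::Immediate` makes the connection take the
/// database write lock up front, instead of lazily upgrading from a read lock
/// on the first write, which can fail with `SQLITE_BUSY` under concurrent
/// writers. A rough sketch of what this amounts to at the SQL level:
///
/// ```sql
/// BEGIN IMMEDIATE;
/// -- statements issued by `f`
/// COMMIT;
/// ```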
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    conn: SqliteAsyncConn,
    f: F,
) -> Result<T, Error> {
    conn.interact(move |conn| -> Result<T, Error> {
        // Start the next transaction in immediate mode.
        conn.set_transaction_behavior(TransactionBehavior::Immediate);

        let code = || -> Result<T, Error> {
            let txn = conn.transaction()?;
            let res = f(&txn)?;
            txn.commit()?;
            Ok(res)
        };

        let res = code();

        // Reset the transaction behavior to deferred, for subsequent users of
        // this pooled connection.
        conn.set_transaction_behavior(TransactionBehavior::Deferred);

        res
    })
    .await
    // `interact` itself only fails if the background task panicked or was
    // aborted, which we consider unrecoverable here.
    .unwrap()
}

/// Insert a new chunk into the `linked_chunks` table, and hook it into the
/// doubly-linked list by updating its neighbors.
fn insert_chunk(
    txn: &Transaction<'_>,
    linked_chunk_id: &Key,
    previous: Option<u64>,
    new: u64,
    next: Option<u64>,
    type_str: &str,
) -> rusqlite::Result<()> {
    // Insert the chunk entry.
    txn.execute(
        r#"
            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
            VALUES (?, ?, ?, ?, ?)
        "#,
        (new, linked_chunk_id, previous, next, type_str),
    )?;

    // If this chunk has a previous one, update its `next` pointer.
    if let Some(previous) = previous {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET next = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, previous, linked_chunk_id),
        )?;
    }

    // If this chunk has a next one, update its `previous` pointer.
    if let Some(next) = next {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET previous = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, next, linked_chunk_id),
        )?;
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
        time::Duration,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::{
            store::{
                integration_tests::{
                    check_test_event, make_test_event, make_test_event_with_event_id,
                },
                media::IgnoreMediaRetentionPolicy,
                EventCacheStore, EventCacheStoreError,
            },
            Gap,
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        event_cache_store_media_integration_tests,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
    };
    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
    use once_cell::sync::Lazy;
    use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;
    use crate::{event_cache_store::keys, utils::SqliteAsyncConnExt, SqliteStoreConfig};

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);

    async fn get_event_cache_store_content_sorted_by_last_access(
        event_cache_store: &SqliteEventCacheStore,
    ) -> Vec<Vec<u8>> {
        let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
        sqlite_db
            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
                stmt.query(())?.mapped(|row| row.get(0)).collect()
            })
            .await
            .expect("querying media cache content by last access failed")
    }

    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_last_access() {
        let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
        let uri = mxc_uri!("mxc://localhost/media");
        let file_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };
        let thumbnail_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
                Method::Crop,
                uint!(100),
                uint!(100),
            )),
        };

        let content: Vec<u8> = "hello world".into();
        let thumbnail_content: Vec<u8> = "hello…".into();

        // Add the media.
        event_cache_store
            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .expect("adding file failed");

        // The last-access timestamp has a precision of one second, so wait
        // long enough for the timestamps to differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        event_cache_store
            .add_media_content(
                &thumbnail_request,
                thumbnail_content.clone(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .expect("adding thumbnail failed");

        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
        assert_eq!(contents[1], content, "file is not second-to-last access");

        // Again, wait long enough for the timestamps to differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Access the file, so that its last access is now the most recent.
        let _ = event_cache_store
            .get_media_content(&file_request)
            .await
            .expect("getting file failed")
            .expect("file is missing");

        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], content, "file is not last access");
        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
    }

    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)),
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }

    #[async_test]
    async fn test_linked_chunk_new_gap_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![Update::NewGapChunk {
                    previous: None,
                    new: ChunkIdentifier::new(42),
                    next: None,
                    gap: Gap { prev_token: "raclette".to_owned() },
                }],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });
    }

    #[async_test]
    async fn test_linked_chunk_replace_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_id = event_id!("$world");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
                        ],
                    },
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }

    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        // The remaining chunks have been relinked to each other.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        // Check that the removed chunk's gap content is gone too.
        let gaps = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }

    #[async_test]
    async fn test_linked_chunk_push_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                        items: vec![make_test_event(room_id, "who?")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);

            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "who?");
        });
    }

    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "one"),
                            make_test_event(room_id, "two"),
                            make_test_event(room_id, "three"),
                            make_test_event(room_id, "four"),
                            make_test_event(room_id, "five"),
                            make_test_event(room_id, "six"),
                        ],
                    },
                    Update::RemoveItem {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 5);
            check_test_event(&events[0], "one");
            check_test_event(&events[1], "two");
            check_test_event(&events[2], "four");
            check_test_event(&events[3], "five");
            check_test_event(&events[4], "six");
        });

        // Make sure the positions of the remaining events were shifted down:
        // the events previously at positions 3, 4 and 5 now occupy positions
        // 2, 3 and 4.
        let num_rows: u64 = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }

    #[async_test]
    async fn test_linked_chunk_detach_last_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "hello");
        });
    }

    #[async_test]
    async fn test_linked_chunk_start_end_reattach_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::StartReattachItems,
                    Update::EndReattachItems,
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "howdy");
        });
    }

    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_0 = make_test_event(room_id, "hello");
        let event_1 = make_test_event(room_id, "world");
        let event_2 = make_test_event(room_id, "howdy");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(54),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1, event_2],
                    },
                    Update::Clear,
                ],
            )
            .await
            .unwrap();

        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());

        // Check that the cascading delete worked: no gap and no event remains
        // linked to a chunk.
        store
            .acquire()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();

        // It's possible to insert into the linked chunk again after a clear.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0],
                    },
                ],
            )
            .await
            .unwrap();
    }

2259 #[async_test]
2260 async fn test_linked_chunk_multiple_rooms() {
2261 let store = get_event_cache_store().await.expect("creating cache store failed");
2262
2263 let room1 = room_id!("!realcheeselovers:raclette.fr");
2264 let linked_chunk_id1 = LinkedChunkId::Room(room1);
2265 let room2 = room_id!("!realcheeselovers:fondue.ch");
2266 let linked_chunk_id2 = LinkedChunkId::Room(room2);
2267
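        // Add a linked chunk with two events to the first room.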
        store
            .handle_linked_chunk_updates(
                linked_chunk_id1,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room1, "best cheese is raclette"),
                            make_test_event(room1, "obviously"),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

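        // Add a linked chunk with one event to the second room.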
        store
            .handle_linked_chunk_updates(
                linked_chunk_id2,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![make_test_event(room2, "beaufort is the best")],
                    },
                ],
            )
            .await
            .unwrap();

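        // Check the linked chunk of the first room.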
        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
        assert_eq!(chunks_room1.len(), 1);

        let c = chunks_room1.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "best cheese is raclette");
            check_test_event(&events[1], "obviously");
        });

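        // Check the linked chunk of the second room.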
        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
        assert_eq!(chunks_room2.len(), 1);

        let c = chunks_room2.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "beaufort is the best");
        });
    }

    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

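        // Reuse the same chunk identifier twice in one batch of updates: the
        // second `NewItemsChunk` violates the uniqueness of chunk
        // identifiers, so the whole batch must fail.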
        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        assert_matches!(err, crate::error::Error::Sqlite(err) => {
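            // The underlying SQLite error is a constraint violation.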
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

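        // Since all the updates are applied in a single transaction, the
        // first (otherwise valid) `NewItemsChunk` must have been rolled back
        // along with the failing one: no chunk survives.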
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }

    #[async_test]
    async fn test_filter_duplicate_events_no_events() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
        assert!(duplicates.is_empty());
    }

    #[async_test]
    async fn test_load_last_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
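            // Case #1: no last chunk at all.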
            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert!(last_chunk.is_none());
            assert_eq!(chunk_identifier_generator.current(), 0);
        }

        {
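            // Case #2: only one chunk.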
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        {
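            // Case #3: more than one chunk.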
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }

    #[async_test]
    async fn test_load_last_chunk_with_a_cycle() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
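                        // Chunk 1 points at chunk 0 as both its previous and
                        // its next chunk, creating the cycle 0 -> 1 -> 0, so
                        // loading the last chunk must fail.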
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
    }

    #[async_test]
    async fn test_load_previous_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
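            // Case #1: the chunk doesn't exist, so it has no previous chunk.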
            let previous_chunk = store
                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
                .await
                .unwrap();

            assert!(previous_chunk.is_none());
        }

        {
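            // Case #2: there is only one chunk; it has no previous chunk.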
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        {
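            // Case #3: there are two chunks; the last one has a previous
            // chunk.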
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
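                            // A new chunk inserted before the existing chunk
                            // 42.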
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
}

#[cfg(test)]
mod encrypted_tests {
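    // Same store as in the `tests` module above, but opened with a
    // passphrase, so the data is persisted encrypted.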
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

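    // Hand out a fresh store for every test: each one lives in its own
    // numbered subdirectory of the shared temporary directory, so the
    // databases never collide.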
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

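    // Run the shared event cache store integration test suites against the
    // encrypted store.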
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();
}