1use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22 deserialized_responses::TimelineEvent,
23 event_cache::{
24 store::{
25 compute_filters_string, extract_event_relation,
26 media::{
27 EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
28 MediaService,
29 },
30 EventCacheStore,
31 },
32 Event, Gap,
33 },
34 linked_chunk::{
35 ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Position, RawChunk, Update,
36 },
37 media::{MediaRequestParameters, UniqueKey},
38};
39use matrix_sdk_store_encryption::StoreCipher;
40use ruma::{
41 events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
42 OwnedEventId, RoomId,
43};
44use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
45use tokio::fs;
46use tracing::{debug, error, trace};
47
48use crate::{
49 error::{Error, Result},
50 utils::{
51 repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt, SqliteTransactionExt,
53 },
54 OpenStoreError, SqliteStoreConfig,
55};
56
/// String constants used by this store as key-value entry names and as
/// table-name namespaces when encoding keys.
mod keys {
    /// Entry name under which the media retention policy is read and written
    /// through the serialized key-value helpers.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    /// Entry name under which the time of the last media cleanup is read
    /// through the serialized key-value helpers.
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    /// Table name passed to `encode_key` when encoding room identifiers for
    /// the linked chunk and event queries.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    /// Table name passed to `encode_key` when encoding media URIs and formats.
    pub const MEDIA: &str = "media";
}
66
/// File name of the database, joined to the store directory when opening.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Schema version the migrations bring the database up to. `run_migrations`
/// does nothing when the stored version is not lower than this value.
const DATABASE_VERSION: u8 = 7;

/// Value of the `type` column of `linked_chunks` for a chunk holding events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Value of the `type` column of `linked_chunks` for a chunk holding a gap.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
83
/// An event cache store backed by a pool of SQLite connections.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// Cipher used to encrypt values and hash keys; `None` when the store was
    /// opened without a passphrase, in which case data is stored as is.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool from which every operation takes its connection.
    pool: SqlitePool,
    /// Service the media-related trait methods delegate to.
    media_service: MediaService,
}
91
92#[cfg(not(tarpaulin_include))]
93impl fmt::Debug for SqliteEventCacheStore {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
96 }
97}
98
99impl SqliteEventCacheStore {
100 pub async fn open(
103 path: impl AsRef<Path>,
104 passphrase: Option<&str>,
105 ) -> Result<Self, OpenStoreError> {
106 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
107 }
108
109 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
111 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
112
113 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
114
115 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
116 config.pool = Some(pool_config);
117
118 let pool = config.create_pool(Runtime::Tokio1)?;
119
120 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
121 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
122
123 Ok(this)
124 }
125
126 async fn open_with_pool(
129 pool: SqlitePool,
130 passphrase: Option<&str>,
131 ) -> Result<Self, OpenStoreError> {
132 let conn = pool.get().await?;
133
134 let version = conn.db_version().await?;
135 run_migrations(&conn, version).await?;
136
137 let store_cipher = match passphrase {
138 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
139 None => None,
140 };
141
142 let media_service = MediaService::new();
143 let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
144 let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
145 media_service.restore(media_retention_policy, last_media_cleanup_time);
146
147 Ok(Self { store_cipher, pool, media_service })
148 }
149
150 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
151 if let Some(key) = &self.store_cipher {
152 let encrypted = key.encrypt_value_data(value)?;
153 Ok(rmp_serde::to_vec_named(&encrypted)?)
154 } else {
155 Ok(value)
156 }
157 }
158
159 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
160 if let Some(key) = &self.store_cipher {
161 let encrypted = rmp_serde::from_slice(value)?;
162 let decrypted = key.decrypt_value_data(encrypted)?;
163 Ok(Cow::Owned(decrypted))
164 } else {
165 Ok(Cow::Borrowed(value))
166 }
167 }
168
169 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
170 let bytes = key.as_ref();
171 if let Some(store_cipher) = &self.store_cipher {
172 Key::Hashed(store_cipher.hash_key(table_name, bytes))
173 } else {
174 Key::Plain(bytes.to_owned())
175 }
176 }
177
178 async fn acquire(&self) -> Result<SqliteAsyncConn> {
179 let connection = self.pool.get().await?;
180
181 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
186
187 Ok(connection)
188 }
189
190 fn map_row_to_chunk(
191 row: &rusqlite::Row<'_>,
192 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
193 Ok((
194 row.get::<_, u64>(0)?,
195 row.get::<_, Option<u64>>(1)?,
196 row.get::<_, Option<u64>>(2)?,
197 row.get::<_, String>(3)?,
198 ))
199 }
200
201 fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
202 let serialized = serde_json::to_vec(event)?;
203
204 let raw_event = event.raw();
206 let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
207
208 let content = self.encode_value(serialized)?;
210
211 Ok(EncodedEvent {
212 content,
213 rel_type,
214 relates_to: relates_to.map(|relates_to| relates_to.to_string()),
215 })
216 }
217}
218
/// An event in the form written to the `events` table, as produced by
/// `SqliteEventCacheStore::encode_event`.
struct EncodedEvent {
    /// JSON-serialized event, passed through `encode_value`.
    content: Vec<u8>,
    /// Relation type extracted from the raw event, if any.
    rel_type: Option<String>,
    /// Identifier of the event this one relates to, if any.
    relates_to: Option<String>,
}
224
/// Helpers to read linked chunks back from the database, implemented for
/// `Transaction`.
trait TransactionExtForLinkedChunks {
    /// Rebuilds a full raw chunk from the values of its `linked_chunks` row,
    /// loading its content according to `chunk_type`.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Loads and decodes the gap stored for the chunk `chunk_id` of `room_id`.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Loads and decodes the events stored for the chunk `chunk_id` of
    /// `room_id`.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
250
251impl TransactionExtForLinkedChunks for Transaction<'_> {
252 fn rebuild_chunk(
253 &self,
254 store: &SqliteEventCacheStore,
255 room_id: &Key,
256 previous: Option<u64>,
257 id: u64,
258 next: Option<u64>,
259 chunk_type: &str,
260 ) -> Result<RawChunk<Event, Gap>> {
261 let previous = previous.map(ChunkIdentifier::new);
262 let next = next.map(ChunkIdentifier::new);
263 let id = ChunkIdentifier::new(id);
264
265 match chunk_type {
266 CHUNK_TYPE_GAP_TYPE_STRING => {
267 let gap = self.load_gap_content(store, room_id, id)?;
269 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
270 }
271
272 CHUNK_TYPE_EVENT_TYPE_STRING => {
273 let events = self.load_events_content(store, room_id, id)?;
275 Ok(RawChunk {
276 content: ChunkContent::Items(events),
277 previous,
278 identifier: id,
279 next,
280 })
281 }
282
283 other => {
284 Err(Error::InvalidData {
286 details: format!("a linked chunk has an unknown type {other}"),
287 })
288 }
289 }
290 }
291
292 fn load_gap_content(
293 &self,
294 store: &SqliteEventCacheStore,
295 room_id: &Key,
296 chunk_id: ChunkIdentifier,
297 ) -> Result<Gap> {
298 let encoded_prev_token: Vec<u8> = self.query_row(
301 "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND room_id = ?",
302 (chunk_id.index(), &room_id),
303 |row| row.get(0),
304 )?;
305 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
306 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
307 Ok(Gap { prev_token })
308 }
309
310 fn load_events_content(
311 &self,
312 store: &SqliteEventCacheStore,
313 room_id: &Key,
314 chunk_id: ChunkIdentifier,
315 ) -> Result<Vec<Event>> {
316 let mut events = Vec::new();
318
319 for event_data in self
320 .prepare(
321 r#"
322 SELECT events.content
323 FROM event_chunks ec
324 INNER JOIN events USING (event_id, room_id)
325 WHERE ec.chunk_id = ? AND ec.room_id = ?
326 ORDER BY ec.position ASC
327 "#,
328 )?
329 .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
330 {
331 let encoded_content = event_data?;
332 let serialized_content = store.decode_value(&encoded_content)?;
333 let event = serde_json::from_slice(&serialized_content)?;
334
335 events.push(event);
336 }
337
338 Ok(events)
339 }
340}
341
/// Brings the database schema from `version` up to `DATABASE_VERSION`.
///
/// Does nothing when `version` is not lower than `DATABASE_VERSION`.
/// Otherwise every migration whose number is greater than `version` is
/// executed in order, each in its own transaction that also records the new
/// version.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        // Already at (or beyond) the expected version: nothing to run.
        return Ok(());
    }

    // Enable foreign key enforcement on this connection for the migrations.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // The journal mode is switched before opening the first migration
        // transaction (NOTE(review): SQLite does not allow changing it while a
        // transaction is active — confirm this is the reason).
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    Ok(())
}
422
423#[async_trait]
424impl EventCacheStore for SqliteEventCacheStore {
425 type Error = Error;
426
427 async fn try_take_leased_lock(
428 &self,
429 lease_duration_ms: u32,
430 key: &str,
431 holder: &str,
432 ) -> Result<bool> {
433 let key = key.to_owned();
434 let holder = holder.to_owned();
435
436 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
437 let expiration = now + lease_duration_ms as u64;
438
439 let num_touched = self
440 .acquire()
441 .await?
442 .with_transaction(move |txn| {
443 txn.execute(
444 "INSERT INTO lease_locks (key, holder, expiration)
445 VALUES (?1, ?2, ?3)
446 ON CONFLICT (key)
447 DO
448 UPDATE SET holder = ?2, expiration = ?3
449 WHERE holder = ?2
450 OR expiration < ?4
451 ",
452 (key, holder, expiration, now),
453 )
454 })
455 .await?;
456
457 Ok(num_touched == 1)
458 }
459
    /// Applies a list of linked chunk updates for `room_id` to the database.
    ///
    /// All updates are applied, in order, inside a single immediate
    /// transaction; the first failing update aborts the whole call with its
    /// error. Events without an identifier are logged and skipped.
    async fn handle_linked_chunk_updates(
        &self,
        room_id: &RoomId,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // The encoded room id is what is stored in the tables; the plain one
        // is only kept for logging.
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let room_id = room_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        // The gap token is stored as JSON passed through
                        // `encode_value`.
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, room_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_room_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%room_id, "removing chunk @ {chunk_id}");

                        // Find the neighbours of the chunk to remove.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                            (chunk_id, &hashed_room_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Make the previous chunk point to the next one.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                        }

                        // Make the next chunk point back to the previous one.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                        }

                        // Remove the chunk itself.
                        // NOTE(review): rows depending on it in other tables are
                        // presumably removed by foreign key cascades — confirm
                        // against the migrations.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Nothing to insert.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, room_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // An event already present in `events` is overwritten
                        // with the new content.
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        )?;

                        // Despite its name, this closure keeps the valid events:
                        // it yields `None` (after logging) for events without
                        // an ID, and the ID alongside the event otherwise.
                        let invalid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%room_id, "Trying to push an event with no ID");
                                return None;
                            };

                            Some((event_id.to_string(), event))
                        };

                        // `enumerate` runs after the filter, so skipped events
                        // do not leave holes in the positions.
                        for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_room_id, &event_id, index))?;

                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%room_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        // Store the new event's content first, then make the
                        // position point to it.
                        let encoded_event = this.encode_event(&event)?;
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE room_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_room_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry at this position; the row in
                        // `events` is not touched here.
                        txn.execute("DELETE FROM event_chunks WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                        // Shift the following entries of the chunk down by one.
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = position - 1
                                WHERE room_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_room_id, chunk_id, index)
                        )?;

                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                        // Remove every entry of the chunk from `index` onwards.
                        txn.execute("DELETE FROM event_chunks WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%room_id, "clearing items");

                        // Remove all the chunks of the room; the `events` table
                        // is not touched here.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE room_id = ?",
                            (&hashed_room_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing is persisted for these markers.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }
677
678 async fn load_all_chunks(
679 &self,
680 room_id: &RoomId,
681 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
682 let room_id = room_id.to_owned();
683 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
684
685 let this = self.clone();
686
687 let result = self
688 .acquire()
689 .await?
690 .with_transaction(move |txn| -> Result<_> {
691 let mut items = Vec::new();
692
693 for data in txn
695 .prepare(
696 "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
697 )?
698 .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
699 {
700 let (id, previous, next, chunk_type) = data?;
701 let new = txn.rebuild_chunk(
702 &this,
703 &hashed_room_id,
704 previous,
705 id,
706 next,
707 chunk_type.as_str(),
708 )?;
709 items.push(new);
710 }
711
712 Ok(items)
713 })
714 .await?;
715
716 Ok(result)
717 }
718
719 async fn load_last_chunk(
720 &self,
721 room_id: &RoomId,
722 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
723 let room_id = room_id.to_owned();
724 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
725
726 let this = self.clone();
727
728 self
729 .acquire()
730 .await?
731 .with_transaction(move |txn| -> Result<_> {
732 let (chunk_identifier_generator, number_of_chunks) = txn
734 .prepare(
735 "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE room_id = ?"
736 )?
737 .query_row(
738 (&hashed_room_id,),
739 |row| {
740 Ok((
741 row.get::<_, Option<u64>>(0)?,
746 row.get::<_, u64>(1)?,
747 ))
748 }
749 )?;
750
751 let chunk_identifier_generator = match chunk_identifier_generator {
752 Some(last_chunk_identifier) => {
753 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
754 ChunkIdentifier::new(last_chunk_identifier)
755 )
756 },
757 None => ChunkIdentifierGenerator::new_from_scratch(),
758 };
759
760 let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
762 .prepare(
763 "SELECT id, previous, type FROM linked_chunks WHERE room_id = ? AND next IS NULL"
764 )?
765 .query_row(
766 (&hashed_room_id,),
767 |row| {
768 Ok((
769 row.get::<_, u64>(0)?,
770 row.get::<_, Option<u64>>(1)?,
771 row.get::<_, String>(2)?,
772 ))
773 }
774 )
775 .optional()?
776 else {
777 if number_of_chunks == 0 {
780 return Ok((None, chunk_identifier_generator));
781 }
782 else {
787 return Err(Error::InvalidData {
788 details:
789 "last chunk is not found but chunks exist: the linked chunk contains a cycle"
790 .to_owned()
791 }
792 )
793 }
794 };
795
796 let last_chunk = txn.rebuild_chunk(
798 &this,
799 &hashed_room_id,
800 previous_chunk,
801 chunk_identifier,
802 None,
803 &chunk_type
804 )?;
805
806 Ok((Some(last_chunk), chunk_identifier_generator))
807 })
808 .await
809 }
810
811 async fn load_previous_chunk(
812 &self,
813 room_id: &RoomId,
814 before_chunk_identifier: ChunkIdentifier,
815 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
816 let room_id = room_id.to_owned();
817 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
818
819 let this = self.clone();
820
821 self
822 .acquire()
823 .await?
824 .with_transaction(move |txn| -> Result<_> {
825 let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
827 .prepare(
828 "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? AND next = ?"
829 )?
830 .query_row(
831 (&hashed_room_id, before_chunk_identifier.index()),
832 |row| {
833 Ok((
834 row.get::<_, u64>(0)?,
835 row.get::<_, Option<u64>>(1)?,
836 row.get::<_, Option<u64>>(2)?,
837 row.get::<_, String>(3)?,
838 ))
839 }
840 )
841 .optional()?
842 else {
843 return Ok(None);
845 };
846
847 let last_chunk = txn.rebuild_chunk(
849 &this,
850 &hashed_room_id,
851 previous_chunk,
852 chunk_identifier,
853 next_chunk,
854 &chunk_type
855 )?;
856
857 Ok(Some(last_chunk))
858 })
859 .await
860 }
861
862 async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
863 self.acquire()
864 .await?
865 .with_transaction(move |txn| {
866 txn.execute("DELETE FROM linked_chunks", ())?;
868 txn.execute("DELETE FROM events", ())
870 })
871 .await?;
872 Ok(())
873 }
874
875 async fn filter_duplicated_events(
876 &self,
877 room_id: &RoomId,
878 events: Vec<OwnedEventId>,
879 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
880 if events.is_empty() {
884 return Ok(Vec::new());
885 }
886
887 let room_id = room_id.to_owned();
889 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
890
891 self.acquire()
892 .await?
893 .with_transaction(move |txn| -> Result<_> {
894 txn.chunk_large_query_over(events, None, move |txn, events| {
895 let query = format!(
896 r#"
897 SELECT event_id, chunk_id, position
898 FROM event_chunks
899 WHERE room_id = ? AND event_id IN ({})
900 ORDER BY chunk_id ASC, position ASC
901 "#,
902 repeat_vars(events.len()),
903 );
904
905 let parameters = params_from_iter(
906 once(
908 hashed_room_id
909 .to_sql()
910 .unwrap(),
912 )
913 .chain(events.iter().map(|event| {
915 event
916 .as_str()
917 .to_sql()
918 .unwrap()
920 })),
921 );
922
923 let mut duplicated_events = Vec::new();
924
925 for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
926 Ok((
927 row.get::<_, String>(0)?,
928 row.get::<_, u64>(1)?,
929 row.get::<_, usize>(2)?,
930 ))
931 })? {
932 let (duplicated_event, chunk_identifier, index) = duplicated_event?;
933
934 let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
935 error!(%duplicated_event, %room_id, "Reading an malformed event ID");
938 continue;
939 };
940
941 duplicated_events.push((
942 duplicated_event,
943 Position::new(ChunkIdentifier::new(chunk_identifier), index),
944 ));
945 }
946
947 Ok(duplicated_events)
948 })
949 })
950 .await
951 }
952
953 async fn find_event(
954 &self,
955 room_id: &RoomId,
956 event_id: &EventId,
957 ) -> Result<Option<Event>, Self::Error> {
958 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
959 let event_id = event_id.to_owned();
960 let this = self.clone();
961
962 self.acquire()
963 .await?
964 .with_transaction(move |txn| -> Result<_> {
965 let Some(event) = txn
966 .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
967 .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
968 .optional()?
969 else {
970 return Ok(None);
972 };
973
974 let event = serde_json::from_slice(&this.decode_value(&event)?)?;
975
976 Ok(Some(event))
977 })
978 .await
979 }
980
981 async fn find_event_relations(
982 &self,
983 room_id: &RoomId,
984 event_id: &EventId,
985 filters: Option<&[RelationType]>,
986 ) -> Result<Vec<Event>, Self::Error> {
987 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
988 let event_id = event_id.to_owned();
989 let filters = filters.map(ToOwned::to_owned);
990 let this = self.clone();
991
992 self.acquire()
993 .await?
994 .with_transaction(move |txn| -> Result<_> {
995 let filter_query = if let Some(filters) = compute_filters_string(filters.as_deref())
996 {
997 format!(
998 " AND rel_type IN ({})",
999 filters
1000 .into_iter()
1001 .map(|f| format!(r#""{f}""#))
1002 .collect::<Vec<_>>()
1003 .join(", ")
1004 )
1005 } else {
1006 "".to_owned()
1007 };
1008
1009 let query = format!(
1010 "SELECT content FROM events WHERE relates_to = ? AND room_id = ? {}",
1011 filter_query
1012 );
1013
1014 let mut related = Vec::new();
1016 for ev in
1017 txn.prepare(&query)?.query_map((event_id.as_str(), hashed_room_id), |row| {
1018 row.get::<_, Vec<u8>>(0)
1019 })?
1020 {
1021 let ev = ev?;
1022 let ev = serde_json::from_slice(&this.decode_value(&ev)?)?;
1023 related.push(ev);
1024 }
1025
1026 Ok(related)
1027 })
1028 .await
1029 }
1030
1031 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1032 let Some(event_id) = event.event_id() else {
1033 error!(%room_id, "Trying to save an event with no ID");
1034 return Ok(());
1035 };
1036
1037 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1038 let event_id = event_id.to_string();
1039 let encoded_event = self.encode_event(&event)?;
1040
1041 self.acquire()
1042 .await?
1043 .with_transaction(move |txn| -> Result<_> {
1044 txn.execute(
1045 "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
1046 , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1047
1048 Ok(())
1049 })
1050 .await
1051 }
1052
1053 async fn add_media_content(
1054 &self,
1055 request: &MediaRequestParameters,
1056 content: Vec<u8>,
1057 ignore_policy: IgnoreMediaRetentionPolicy,
1058 ) -> Result<()> {
1059 self.media_service.add_media_content(self, request, content, ignore_policy).await
1060 }
1061
1062 async fn replace_media_key(
1063 &self,
1064 from: &MediaRequestParameters,
1065 to: &MediaRequestParameters,
1066 ) -> Result<(), Self::Error> {
1067 let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
1068 let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
1069
1070 let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
1071 let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
1072
1073 let conn = self.acquire().await?;
1074 conn.execute(
1075 r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
1076 (new_uri, new_format, prev_uri, prev_format),
1077 )
1078 .await?;
1079
1080 Ok(())
1081 }
1082
1083 async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
1084 self.media_service.get_media_content(self, request).await
1085 }
1086
1087 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
1088 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1089 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1090
1091 let conn = self.acquire().await?;
1092 conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
1093
1094 Ok(())
1095 }
1096
1097 async fn get_media_content_for_uri(
1098 &self,
1099 uri: &MxcUri,
1100 ) -> Result<Option<Vec<u8>>, Self::Error> {
1101 self.media_service.get_media_content_for_uri(self, uri).await
1102 }
1103
1104 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
1105 let uri = self.encode_key(keys::MEDIA, uri);
1106
1107 let conn = self.acquire().await?;
1108 conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
1109
1110 Ok(())
1111 }
1112
1113 async fn set_media_retention_policy(
1114 &self,
1115 policy: MediaRetentionPolicy,
1116 ) -> Result<(), Self::Error> {
1117 self.media_service.set_media_retention_policy(self, policy).await
1118 }
1119
1120 fn media_retention_policy(&self) -> MediaRetentionPolicy {
1121 self.media_service.media_retention_policy()
1122 }
1123
1124 async fn set_ignore_media_retention_policy(
1125 &self,
1126 request: &MediaRequestParameters,
1127 ignore_policy: IgnoreMediaRetentionPolicy,
1128 ) -> Result<(), Self::Error> {
1129 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
1130 }
1131
1132 async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
1133 self.media_service.clean_up_media_cache(self).await
1134 }
1135}
1136
1137#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1138#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1139impl EventCacheStoreMedia for SqliteEventCacheStore {
1140 type Error = Error;
1141
1142 async fn media_retention_policy_inner(
1143 &self,
1144 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1145 let conn = self.acquire().await?;
1146 conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
1147 }
1148
1149 async fn set_media_retention_policy_inner(
1150 &self,
1151 policy: MediaRetentionPolicy,
1152 ) -> Result<(), Self::Error> {
1153 let conn = self.acquire().await?;
1154 conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
1155 Ok(())
1156 }
1157
1158 async fn add_media_content_inner(
1159 &self,
1160 request: &MediaRequestParameters,
1161 data: Vec<u8>,
1162 last_access: SystemTime,
1163 policy: MediaRetentionPolicy,
1164 ignore_policy: IgnoreMediaRetentionPolicy,
1165 ) -> Result<(), Self::Error> {
1166 let ignore_policy = ignore_policy.is_yes();
1167 let data = self.encode_value(data)?;
1168
1169 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
1170 return Ok(());
1171 }
1172
1173 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1174 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1175 let timestamp = time_to_timestamp(last_access);
1176
1177 let conn = self.acquire().await?;
1178 conn.execute(
1179 "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
1180 (uri, format, data, timestamp, ignore_policy),
1181 )
1182 .await?;
1183
1184 Ok(())
1185 }
1186
1187 async fn set_ignore_media_retention_policy_inner(
1188 &self,
1189 request: &MediaRequestParameters,
1190 ignore_policy: IgnoreMediaRetentionPolicy,
1191 ) -> Result<(), Self::Error> {
1192 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1193 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1194 let ignore_policy = ignore_policy.is_yes();
1195
1196 let conn = self.acquire().await?;
1197 conn.execute(
1198 r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
1199 (ignore_policy, uri, format),
1200 )
1201 .await?;
1202
1203 Ok(())
1204 }
1205
1206 async fn get_media_content_inner(
1207 &self,
1208 request: &MediaRequestParameters,
1209 current_time: SystemTime,
1210 ) -> Result<Option<Vec<u8>>, Self::Error> {
1211 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1212 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1213 let timestamp = time_to_timestamp(current_time);
1214
1215 let conn = self.acquire().await?;
1216 let data = conn
1217 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1218 txn.execute(
1222 "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
1223 (timestamp, &uri, &format),
1224 )?;
1225
1226 txn.query_row::<Vec<u8>, _, _>(
1227 "SELECT data FROM media WHERE uri = ? AND format = ?",
1228 (&uri, &format),
1229 |row| row.get(0),
1230 )
1231 .optional()
1232 })
1233 .await?;
1234
1235 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1236 }
1237
1238 async fn get_media_content_for_uri_inner(
1239 &self,
1240 uri: &MxcUri,
1241 current_time: SystemTime,
1242 ) -> Result<Option<Vec<u8>>, Self::Error> {
1243 let uri = self.encode_key(keys::MEDIA, uri);
1244 let timestamp = time_to_timestamp(current_time);
1245
1246 let conn = self.acquire().await?;
1247 let data = conn
1248 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1249 txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
1253
1254 txn.query_row::<Vec<u8>, _, _>(
1255 "SELECT data FROM media WHERE uri = ?",
1256 (&uri,),
1257 |row| row.get(0),
1258 )
1259 .optional()
1260 })
1261 .await?;
1262
1263 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1264 }
1265
1266 async fn clean_up_media_cache_inner(
1267 &self,
1268 policy: MediaRetentionPolicy,
1269 current_time: SystemTime,
1270 ) -> Result<(), Self::Error> {
1271 if !policy.has_limitations() {
1272 return Ok(());
1274 }
1275
1276 let conn = self.acquire().await?;
1277 let removed = conn
1278 .with_transaction::<_, Error, _>(move |txn| {
1279 let mut removed = false;
1280
1281 if let Some(max_file_size) = policy.computed_max_file_size() {
1283 let count = txn.execute(
1284 "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
1285 (max_file_size,),
1286 )?;
1287
1288 if count > 0 {
1289 removed = true;
1290 }
1291 }
1292
1293 if let Some(last_access_expiry) = policy.last_access_expiry {
1295 let current_timestamp = time_to_timestamp(current_time);
1296 let expiry_secs = last_access_expiry.as_secs();
1297 let count = txn.execute(
1298 "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
1299 (current_timestamp, expiry_secs),
1300 )?;
1301
1302 if count > 0 {
1303 removed = true;
1304 }
1305 }
1306
1307 if let Some(max_cache_size) = policy.max_cache_size {
1309 let cache_size = txn
1312 .query_row(
1313 "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
1314 (),
1315 |row| {
1316 row.get::<_, Option<u64>>(0)
1318 },
1319 )?
1320 .unwrap_or_default();
1321
1322 if cache_size > max_cache_size {
1324 let mut cached_stmt = txn.prepare_cached(
1326 "SELECT rowid, length(data) FROM media \
1327 WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
1328 )?;
1329 let content_sizes = cached_stmt
1330 .query(())?
1331 .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
1332
1333 let mut accumulated_items_size = 0u64;
1334 let mut limit_reached = false;
1335 let mut rows_to_remove = Vec::new();
1336
1337 for result in content_sizes {
1338 let (row_id, size) = match result {
1339 Ok(content_size) => content_size,
1340 Err(error) => {
1341 return Err(error.into());
1342 }
1343 };
1344
1345 if limit_reached {
1346 rows_to_remove.push(row_id);
1347 continue;
1348 }
1349
1350 match accumulated_items_size.checked_add(size) {
1351 Some(acc) if acc > max_cache_size => {
1352 limit_reached = true;
1354 rows_to_remove.push(row_id);
1355 }
1356 Some(acc) => accumulated_items_size = acc,
1357 None => {
1358 limit_reached = true;
1361 rows_to_remove.push(row_id);
1362 }
1363 };
1364 }
1365
1366 if !rows_to_remove.is_empty() {
1367 removed = true;
1368 }
1369
1370 txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
1371 let sql_params = repeat_vars(row_ids.len());
1372 let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
1373 txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
1374 Ok(Vec::<()>::new())
1375 })?;
1376 }
1377 }
1378
1379 txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
1380
1381 Ok(removed)
1382 })
1383 .await?;
1384
1385 if removed {
1388 conn.vacuum().await?;
1389 }
1390
1391 Ok(())
1392 }
1393
1394 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1395 let conn = self.acquire().await?;
1396 conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
1397 }
1398}
1399
1400async fn with_immediate_transaction<
1405 T: Send + 'static,
1406 F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1407>(
1408 conn: SqliteAsyncConn,
1409 f: F,
1410) -> Result<T, Error> {
1411 conn.interact(move |conn| -> Result<T, Error> {
1412 conn.set_transaction_behavior(TransactionBehavior::Immediate);
1416
1417 let code = || -> Result<T, Error> {
1418 let txn = conn.transaction()?;
1419 let res = f(&txn)?;
1420 txn.commit()?;
1421 Ok(res)
1422 };
1423
1424 let res = code();
1425
1426 conn.set_transaction_behavior(TransactionBehavior::Deferred);
1429
1430 res
1431 })
1432 .await
1433 .unwrap()
1435}
1436
1437fn insert_chunk(
1438 txn: &Transaction<'_>,
1439 room_id: &Key,
1440 previous: Option<u64>,
1441 new: u64,
1442 next: Option<u64>,
1443 type_str: &str,
1444) -> rusqlite::Result<()> {
1445 txn.execute(
1447 r#"
1448 INSERT INTO linked_chunks(id, room_id, previous, next, type)
1449 VALUES (?, ?, ?, ?, ?)
1450 "#,
1451 (new, room_id, previous, next, type_str),
1452 )?;
1453
1454 if let Some(previous) = previous {
1456 txn.execute(
1457 r#"
1458 UPDATE linked_chunks
1459 SET next = ?
1460 WHERE id = ? AND room_id = ?
1461 "#,
1462 (new, previous, room_id),
1463 )?;
1464 }
1465
1466 if let Some(next) = next {
1468 txn.execute(
1469 r#"
1470 UPDATE linked_chunks
1471 SET previous = ?
1472 WHERE id = ? AND room_id = ?
1473 "#,
1474 (new, next, room_id),
1475 )?;
1476 }
1477
1478 Ok(())
1479}
1480
1481#[cfg(test)]
1482mod tests {
1483 use std::{
1484 path::PathBuf,
1485 sync::atomic::{AtomicU32, Ordering::SeqCst},
1486 time::Duration,
1487 };
1488
1489 use assert_matches::assert_matches;
1490 use matrix_sdk_base::{
1491 event_cache::{
1492 store::{
1493 integration_tests::{
1494 check_test_event, make_test_event, make_test_event_with_event_id,
1495 },
1496 media::IgnoreMediaRetentionPolicy,
1497 EventCacheStore, EventCacheStoreError,
1498 },
1499 Gap,
1500 },
1501 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1502 event_cache_store_media_integration_tests,
1503 linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1504 media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1505 };
1506 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1507 use once_cell::sync::Lazy;
1508 use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1509 use tempfile::{tempdir, TempDir};
1510
1511 use super::SqliteEventCacheStore;
1512 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
1513
1514 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1515 static NUM: AtomicU32 = AtomicU32::new(0);
1516
1517 fn new_event_cache_store_workspace() -> PathBuf {
1518 let name = NUM.fetch_add(1, SeqCst).to_string();
1519 TMP_DIR.path().join(name)
1520 }
1521
1522 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1523 let tmpdir_path = new_event_cache_store_workspace();
1524
1525 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1526
1527 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1528 }
1529
1530 event_cache_store_integration_tests!();
1531 event_cache_store_integration_tests_time!();
1532 event_cache_store_media_integration_tests!(with_media_size_tests);
1533
1534 async fn get_event_cache_store_content_sorted_by_last_access(
1535 event_cache_store: &SqliteEventCacheStore,
1536 ) -> Vec<Vec<u8>> {
1537 let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1538 sqlite_db
1539 .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1540 stmt.query(())?.mapped(|row| row.get(0)).collect()
1541 })
1542 .await
1543 .expect("querying media cache content by last access failed")
1544 }
1545
1546 #[async_test]
1547 async fn test_pool_size() {
1548 let tmpdir_path = new_event_cache_store_workspace();
1549 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1550
1551 let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1552
1553 assert_eq!(store.pool.status().max_size, 42);
1554 }
1555
1556 #[async_test]
1557 async fn test_last_access() {
1558 let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
1559 let uri = mxc_uri!("mxc://localhost/media");
1560 let file_request = MediaRequestParameters {
1561 source: MediaSource::Plain(uri.to_owned()),
1562 format: MediaFormat::File,
1563 };
1564 let thumbnail_request = MediaRequestParameters {
1565 source: MediaSource::Plain(uri.to_owned()),
1566 format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
1567 Method::Crop,
1568 uint!(100),
1569 uint!(100),
1570 )),
1571 };
1572
1573 let content: Vec<u8> = "hello world".into();
1574 let thumbnail_content: Vec<u8> = "hello…".into();
1575
1576 event_cache_store
1578 .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
1579 .await
1580 .expect("adding file failed");
1581
1582 tokio::time::sleep(Duration::from_secs(3)).await;
1585
1586 event_cache_store
1587 .add_media_content(
1588 &thumbnail_request,
1589 thumbnail_content.clone(),
1590 IgnoreMediaRetentionPolicy::No,
1591 )
1592 .await
1593 .expect("adding thumbnail failed");
1594
1595 let contents =
1597 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1598
1599 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1600 assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
1601 assert_eq!(contents[1], content, "file is not second-to-last access");
1602
1603 tokio::time::sleep(Duration::from_secs(3)).await;
1606
1607 let _ = event_cache_store
1609 .get_media_content(&file_request)
1610 .await
1611 .expect("getting file failed")
1612 .expect("file is missing");
1613
1614 let contents =
1616 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1617
1618 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1619 assert_eq!(contents[0], content, "file is not last access");
1620 assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
1621 }
1622
1623 #[async_test]
1624 async fn test_linked_chunk_new_items_chunk() {
1625 let store = get_event_cache_store().await.expect("creating cache store failed");
1626
1627 let room_id = &DEFAULT_TEST_ROOM_ID;
1628
1629 store
1630 .handle_linked_chunk_updates(
1631 room_id,
1632 vec![
1633 Update::NewItemsChunk {
1634 previous: None,
1635 new: ChunkIdentifier::new(42),
1636 next: None, },
1638 Update::NewItemsChunk {
1639 previous: Some(ChunkIdentifier::new(42)),
1640 new: ChunkIdentifier::new(13),
1641 next: Some(ChunkIdentifier::new(37)), },
1644 Update::NewItemsChunk {
1645 previous: Some(ChunkIdentifier::new(13)),
1646 new: ChunkIdentifier::new(37),
1647 next: None,
1648 },
1649 ],
1650 )
1651 .await
1652 .unwrap();
1653
1654 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1655
1656 assert_eq!(chunks.len(), 3);
1657
1658 {
1659 let c = chunks.remove(0);
1661 assert_eq!(c.identifier, ChunkIdentifier::new(13));
1662 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1663 assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1664 assert_matches!(c.content, ChunkContent::Items(events) => {
1665 assert!(events.is_empty());
1666 });
1667
1668 let c = chunks.remove(0);
1669 assert_eq!(c.identifier, ChunkIdentifier::new(37));
1670 assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1671 assert_eq!(c.next, None);
1672 assert_matches!(c.content, ChunkContent::Items(events) => {
1673 assert!(events.is_empty());
1674 });
1675
1676 let c = chunks.remove(0);
1677 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1678 assert_eq!(c.previous, None);
1679 assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1680 assert_matches!(c.content, ChunkContent::Items(events) => {
1681 assert!(events.is_empty());
1682 });
1683 }
1684 }
1685
1686 #[async_test]
1687 async fn test_linked_chunk_new_gap_chunk() {
1688 let store = get_event_cache_store().await.expect("creating cache store failed");
1689
1690 let room_id = &DEFAULT_TEST_ROOM_ID;
1691
1692 store
1693 .handle_linked_chunk_updates(
1694 room_id,
1695 vec![Update::NewGapChunk {
1696 previous: None,
1697 new: ChunkIdentifier::new(42),
1698 next: None,
1699 gap: Gap { prev_token: "raclette".to_owned() },
1700 }],
1701 )
1702 .await
1703 .unwrap();
1704
1705 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1706
1707 assert_eq!(chunks.len(), 1);
1708
1709 let c = chunks.remove(0);
1711 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1712 assert_eq!(c.previous, None);
1713 assert_eq!(c.next, None);
1714 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1715 assert_eq!(gap.prev_token, "raclette");
1716 });
1717 }
1718
1719 #[async_test]
1720 async fn test_linked_chunk_replace_item() {
1721 let store = get_event_cache_store().await.expect("creating cache store failed");
1722
1723 let room_id = &DEFAULT_TEST_ROOM_ID;
1724 let event_id = event_id!("$world");
1725
1726 store
1727 .handle_linked_chunk_updates(
1728 room_id,
1729 vec![
1730 Update::NewItemsChunk {
1731 previous: None,
1732 new: ChunkIdentifier::new(42),
1733 next: None,
1734 },
1735 Update::PushItems {
1736 at: Position::new(ChunkIdentifier::new(42), 0),
1737 items: vec![
1738 make_test_event(room_id, "hello"),
1739 make_test_event_with_event_id(room_id, "world", Some(event_id)),
1740 ],
1741 },
1742 Update::ReplaceItem {
1743 at: Position::new(ChunkIdentifier::new(42), 1),
1744 item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1745 },
1746 ],
1747 )
1748 .await
1749 .unwrap();
1750
1751 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1752
1753 assert_eq!(chunks.len(), 1);
1754
1755 let c = chunks.remove(0);
1756 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1757 assert_eq!(c.previous, None);
1758 assert_eq!(c.next, None);
1759 assert_matches!(c.content, ChunkContent::Items(events) => {
1760 assert_eq!(events.len(), 2);
1761 check_test_event(&events[0], "hello");
1762 check_test_event(&events[1], "yolo");
1763 });
1764 }
1765
1766 #[async_test]
1767 async fn test_linked_chunk_remove_chunk() {
1768 let store = get_event_cache_store().await.expect("creating cache store failed");
1769
1770 let room_id = &DEFAULT_TEST_ROOM_ID;
1771
1772 store
1773 .handle_linked_chunk_updates(
1774 room_id,
1775 vec![
1776 Update::NewGapChunk {
1777 previous: None,
1778 new: ChunkIdentifier::new(42),
1779 next: None,
1780 gap: Gap { prev_token: "raclette".to_owned() },
1781 },
1782 Update::NewGapChunk {
1783 previous: Some(ChunkIdentifier::new(42)),
1784 new: ChunkIdentifier::new(43),
1785 next: None,
1786 gap: Gap { prev_token: "fondue".to_owned() },
1787 },
1788 Update::NewGapChunk {
1789 previous: Some(ChunkIdentifier::new(43)),
1790 new: ChunkIdentifier::new(44),
1791 next: None,
1792 gap: Gap { prev_token: "tartiflette".to_owned() },
1793 },
1794 Update::RemoveChunk(ChunkIdentifier::new(43)),
1795 ],
1796 )
1797 .await
1798 .unwrap();
1799
1800 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1801
1802 assert_eq!(chunks.len(), 2);
1803
1804 let c = chunks.remove(0);
1806 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1807 assert_eq!(c.previous, None);
1808 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1809 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1810 assert_eq!(gap.prev_token, "raclette");
1811 });
1812
1813 let c = chunks.remove(0);
1814 assert_eq!(c.identifier, ChunkIdentifier::new(44));
1815 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1816 assert_eq!(c.next, None);
1817 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1818 assert_eq!(gap.prev_token, "tartiflette");
1819 });
1820
1821 let gaps = store
1823 .acquire()
1824 .await
1825 .unwrap()
1826 .with_transaction(|txn| -> rusqlite::Result<_> {
1827 let mut gaps = Vec::new();
1828 for data in txn
1829 .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1830 .query_map((), |row| row.get::<_, u64>(0))?
1831 {
1832 gaps.push(data?);
1833 }
1834 Ok(gaps)
1835 })
1836 .await
1837 .unwrap();
1838
1839 assert_eq!(gaps, vec![42, 44]);
1840 }
1841
1842 #[async_test]
1843 async fn test_linked_chunk_push_items() {
1844 let store = get_event_cache_store().await.expect("creating cache store failed");
1845
1846 let room_id = &DEFAULT_TEST_ROOM_ID;
1847
1848 store
1849 .handle_linked_chunk_updates(
1850 room_id,
1851 vec![
1852 Update::NewItemsChunk {
1853 previous: None,
1854 new: ChunkIdentifier::new(42),
1855 next: None,
1856 },
1857 Update::PushItems {
1858 at: Position::new(ChunkIdentifier::new(42), 0),
1859 items: vec![
1860 make_test_event(room_id, "hello"),
1861 make_test_event(room_id, "world"),
1862 ],
1863 },
1864 Update::PushItems {
1865 at: Position::new(ChunkIdentifier::new(42), 2),
1866 items: vec![make_test_event(room_id, "who?")],
1867 },
1868 ],
1869 )
1870 .await
1871 .unwrap();
1872
1873 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1874
1875 assert_eq!(chunks.len(), 1);
1876
1877 let c = chunks.remove(0);
1878 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1879 assert_eq!(c.previous, None);
1880 assert_eq!(c.next, None);
1881 assert_matches!(c.content, ChunkContent::Items(events) => {
1882 assert_eq!(events.len(), 3);
1883
1884 check_test_event(&events[0], "hello");
1885 check_test_event(&events[1], "world");
1886 check_test_event(&events[2], "who?");
1887 });
1888 }
1889
1890 #[async_test]
1891 async fn test_linked_chunk_remove_item() {
1892 let store = get_event_cache_store().await.expect("creating cache store failed");
1893
1894 let room_id = *DEFAULT_TEST_ROOM_ID;
1895
1896 store
1897 .handle_linked_chunk_updates(
1898 room_id,
1899 vec![
1900 Update::NewItemsChunk {
1901 previous: None,
1902 new: ChunkIdentifier::new(42),
1903 next: None,
1904 },
1905 Update::PushItems {
1906 at: Position::new(ChunkIdentifier::new(42), 0),
1907 items: vec![
1908 make_test_event(room_id, "hello"),
1909 make_test_event(room_id, "world"),
1910 ],
1911 },
1912 Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
1913 ],
1914 )
1915 .await
1916 .unwrap();
1917
1918 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1919
1920 assert_eq!(chunks.len(), 1);
1921
1922 let c = chunks.remove(0);
1923 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1924 assert_eq!(c.previous, None);
1925 assert_eq!(c.next, None);
1926 assert_matches!(c.content, ChunkContent::Items(events) => {
1927 assert_eq!(events.len(), 1);
1928 check_test_event(&events[0], "world");
1929 });
1930
1931 let num_rows: u64 = store
1933 .acquire()
1934 .await
1935 .unwrap()
1936 .with_transaction(move |txn| {
1937 txn.query_row(
1938 "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND room_id = ? AND position = 0",
1939 (room_id.as_bytes(),),
1940 |row| row.get(0),
1941 )
1942 })
1943 .await
1944 .unwrap();
1945 assert_eq!(num_rows, 1);
1946 }
1947
1948 #[async_test]
1949 async fn test_linked_chunk_detach_last_items() {
1950 let store = get_event_cache_store().await.expect("creating cache store failed");
1951
1952 let room_id = *DEFAULT_TEST_ROOM_ID;
1953
1954 store
1955 .handle_linked_chunk_updates(
1956 room_id,
1957 vec![
1958 Update::NewItemsChunk {
1959 previous: None,
1960 new: ChunkIdentifier::new(42),
1961 next: None,
1962 },
1963 Update::PushItems {
1964 at: Position::new(ChunkIdentifier::new(42), 0),
1965 items: vec![
1966 make_test_event(room_id, "hello"),
1967 make_test_event(room_id, "world"),
1968 make_test_event(room_id, "howdy"),
1969 ],
1970 },
1971 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1972 ],
1973 )
1974 .await
1975 .unwrap();
1976
1977 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1978
1979 assert_eq!(chunks.len(), 1);
1980
1981 let c = chunks.remove(0);
1982 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1983 assert_eq!(c.previous, None);
1984 assert_eq!(c.next, None);
1985 assert_matches!(c.content, ChunkContent::Items(events) => {
1986 assert_eq!(events.len(), 1);
1987 check_test_event(&events[0], "hello");
1988 });
1989 }
1990
1991 #[async_test]
1992 async fn test_linked_chunk_start_end_reattach_items() {
1993 let store = get_event_cache_store().await.expect("creating cache store failed");
1994
1995 let room_id = *DEFAULT_TEST_ROOM_ID;
1996
1997 store
2001 .handle_linked_chunk_updates(
2002 room_id,
2003 vec![
2004 Update::NewItemsChunk {
2005 previous: None,
2006 new: ChunkIdentifier::new(42),
2007 next: None,
2008 },
2009 Update::PushItems {
2010 at: Position::new(ChunkIdentifier::new(42), 0),
2011 items: vec![
2012 make_test_event(room_id, "hello"),
2013 make_test_event(room_id, "world"),
2014 make_test_event(room_id, "howdy"),
2015 ],
2016 },
2017 Update::StartReattachItems,
2018 Update::EndReattachItems,
2019 ],
2020 )
2021 .await
2022 .unwrap();
2023
2024 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
2025
2026 assert_eq!(chunks.len(), 1);
2027
2028 let c = chunks.remove(0);
2029 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2030 assert_eq!(c.previous, None);
2031 assert_eq!(c.next, None);
2032 assert_matches!(c.content, ChunkContent::Items(events) => {
2033 assert_eq!(events.len(), 3);
2034 check_test_event(&events[0], "hello");
2035 check_test_event(&events[1], "world");
2036 check_test_event(&events[2], "howdy");
2037 });
2038 }
2039
2040 #[async_test]
2041 async fn test_linked_chunk_clear() {
2042 let store = get_event_cache_store().await.expect("creating cache store failed");
2043
2044 let room_id = *DEFAULT_TEST_ROOM_ID;
2045 let event_0 = make_test_event(room_id, "hello");
2046 let event_1 = make_test_event(room_id, "world");
2047 let event_2 = make_test_event(room_id, "howdy");
2048
2049 store
2050 .handle_linked_chunk_updates(
2051 room_id,
2052 vec![
2053 Update::NewItemsChunk {
2054 previous: None,
2055 new: ChunkIdentifier::new(42),
2056 next: None,
2057 },
2058 Update::NewGapChunk {
2059 previous: Some(ChunkIdentifier::new(42)),
2060 new: ChunkIdentifier::new(54),
2061 next: None,
2062 gap: Gap { prev_token: "fondue".to_owned() },
2063 },
2064 Update::PushItems {
2065 at: Position::new(ChunkIdentifier::new(42), 0),
2066 items: vec![event_0.clone(), event_1, event_2],
2067 },
2068 Update::Clear,
2069 ],
2070 )
2071 .await
2072 .unwrap();
2073
2074 let chunks = store.load_all_chunks(room_id).await.unwrap();
2075 assert!(chunks.is_empty());
2076
2077 store
2079 .acquire()
2080 .await
2081 .unwrap()
2082 .with_transaction(|txn| -> rusqlite::Result<_> {
2083 let num_gaps = txn
2084 .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2085 .query_row((), |row| row.get::<_, u64>(0))?;
2086 assert_eq!(num_gaps, 0);
2087
2088 let num_events = txn
2089 .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2090 .query_row((), |row| row.get::<_, u64>(0))?;
2091 assert_eq!(num_events, 0);
2092
2093 Ok(())
2094 })
2095 .await
2096 .unwrap();
2097
2098 store
2100 .handle_linked_chunk_updates(
2101 room_id,
2102 vec![
2103 Update::NewItemsChunk {
2104 previous: None,
2105 new: ChunkIdentifier::new(42),
2106 next: None,
2107 },
2108 Update::PushItems {
2109 at: Position::new(ChunkIdentifier::new(42), 0),
2110 items: vec![event_0],
2111 },
2112 ],
2113 )
2114 .await
2115 .unwrap();
2116 }
2117
2118 #[async_test]
2119 async fn test_linked_chunk_multiple_rooms() {
2120 let store = get_event_cache_store().await.expect("creating cache store failed");
2121
2122 let room1 = room_id!("!realcheeselovers:raclette.fr");
2123 let room2 = room_id!("!realcheeselovers:fondue.ch");
2124
2125 store
2129 .handle_linked_chunk_updates(
2130 room1,
2131 vec![
2132 Update::NewItemsChunk {
2133 previous: None,
2134 new: ChunkIdentifier::new(42),
2135 next: None,
2136 },
2137 Update::PushItems {
2138 at: Position::new(ChunkIdentifier::new(42), 0),
2139 items: vec![
2140 make_test_event(room1, "best cheese is raclette"),
2141 make_test_event(room1, "obviously"),
2142 ],
2143 },
2144 ],
2145 )
2146 .await
2147 .unwrap();
2148
2149 store
2150 .handle_linked_chunk_updates(
2151 room2,
2152 vec![
2153 Update::NewItemsChunk {
2154 previous: None,
2155 new: ChunkIdentifier::new(42),
2156 next: None,
2157 },
2158 Update::PushItems {
2159 at: Position::new(ChunkIdentifier::new(42), 0),
2160 items: vec![make_test_event(room1, "beaufort is the best")],
2161 },
2162 ],
2163 )
2164 .await
2165 .unwrap();
2166
2167 let mut chunks_room1 = store.load_all_chunks(room1).await.unwrap();
2169 assert_eq!(chunks_room1.len(), 1);
2170
2171 let c = chunks_room1.remove(0);
2172 assert_matches!(c.content, ChunkContent::Items(events) => {
2173 assert_eq!(events.len(), 2);
2174 check_test_event(&events[0], "best cheese is raclette");
2175 check_test_event(&events[1], "obviously");
2176 });
2177
2178 let mut chunks_room2 = store.load_all_chunks(room2).await.unwrap();
2180 assert_eq!(chunks_room2.len(), 1);
2181
2182 let c = chunks_room2.remove(0);
2183 assert_matches!(c.content, ChunkContent::Items(events) => {
2184 assert_eq!(events.len(), 1);
2185 check_test_event(&events[0], "beaufort is the best");
2186 });
2187 }
2188
2189 #[async_test]
2190 async fn test_linked_chunk_update_is_a_transaction() {
2191 let store = get_event_cache_store().await.expect("creating cache store failed");
2192
2193 let room_id = *DEFAULT_TEST_ROOM_ID;
2194
2195 let err = store
2198 .handle_linked_chunk_updates(
2199 room_id,
2200 vec![
2201 Update::NewItemsChunk {
2202 previous: None,
2203 new: ChunkIdentifier::new(42),
2204 next: None,
2205 },
2206 Update::NewItemsChunk {
2207 previous: None,
2208 new: ChunkIdentifier::new(42),
2209 next: None,
2210 },
2211 ],
2212 )
2213 .await
2214 .unwrap_err();
2215
2216 assert_matches!(err, crate::error::Error::Sqlite(err) => {
2218 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2219 });
2220
2221 let chunks = store.load_all_chunks(room_id).await.unwrap();
2225 assert!(chunks.is_empty());
2226 }
2227
2228 #[async_test]
2229 async fn test_filter_duplicate_events_no_events() {
2230 let store = get_event_cache_store().await.expect("creating cache store failed");
2231
2232 let room_id = *DEFAULT_TEST_ROOM_ID;
2233 let duplicates = store.filter_duplicated_events(room_id, Vec::new()).await.unwrap();
2234 assert!(duplicates.is_empty());
2235 }
2236
2237 #[async_test]
2238 async fn test_load_last_chunk() {
2239 let room_id = room_id!("!r0:matrix.org");
2240 let event = |msg: &str| make_test_event(room_id, msg);
2241 let store = get_event_cache_store().await.expect("creating cache store failed");
2242
2243 {
2245 let (last_chunk, chunk_identifier_generator) =
2246 store.load_last_chunk(room_id).await.unwrap();
2247
2248 assert!(last_chunk.is_none());
2249 assert_eq!(chunk_identifier_generator.current(), 0);
2250 }
2251
2252 {
2254 store
2255 .handle_linked_chunk_updates(
2256 room_id,
2257 vec![
2258 Update::NewItemsChunk {
2259 previous: None,
2260 new: ChunkIdentifier::new(42),
2261 next: None,
2262 },
2263 Update::PushItems {
2264 at: Position::new(ChunkIdentifier::new(42), 0),
2265 items: vec![event("saucisse de morteau"), event("comté")],
2266 },
2267 ],
2268 )
2269 .await
2270 .unwrap();
2271
2272 let (last_chunk, chunk_identifier_generator) =
2273 store.load_last_chunk(room_id).await.unwrap();
2274
2275 assert_matches!(last_chunk, Some(last_chunk) => {
2276 assert_eq!(last_chunk.identifier, 42);
2277 assert!(last_chunk.previous.is_none());
2278 assert!(last_chunk.next.is_none());
2279 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2280 assert_eq!(items.len(), 2);
2281 check_test_event(&items[0], "saucisse de morteau");
2282 check_test_event(&items[1], "comté");
2283 });
2284 });
2285 assert_eq!(chunk_identifier_generator.current(), 42);
2286 }
2287
2288 {
2290 store
2291 .handle_linked_chunk_updates(
2292 room_id,
2293 vec![
2294 Update::NewItemsChunk {
2295 previous: Some(ChunkIdentifier::new(42)),
2296 new: ChunkIdentifier::new(7),
2297 next: None,
2298 },
2299 Update::PushItems {
2300 at: Position::new(ChunkIdentifier::new(7), 0),
2301 items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2302 },
2303 ],
2304 )
2305 .await
2306 .unwrap();
2307
2308 let (last_chunk, chunk_identifier_generator) =
2309 store.load_last_chunk(room_id).await.unwrap();
2310
2311 assert_matches!(last_chunk, Some(last_chunk) => {
2312 assert_eq!(last_chunk.identifier, 7);
2313 assert_matches!(last_chunk.previous, Some(previous) => {
2314 assert_eq!(previous, 42);
2315 });
2316 assert!(last_chunk.next.is_none());
2317 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2318 assert_eq!(items.len(), 3);
2319 check_test_event(&items[0], "fondue");
2320 check_test_event(&items[1], "gruyère");
2321 check_test_event(&items[2], "mont d'or");
2322 });
2323 });
2324 assert_eq!(chunk_identifier_generator.current(), 42);
2325 }
2326 }
2327
2328 #[async_test]
2329 async fn test_load_last_chunk_with_a_cycle() {
2330 let room_id = room_id!("!r0:matrix.org");
2331 let store = get_event_cache_store().await.expect("creating cache store failed");
2332
2333 store
2334 .handle_linked_chunk_updates(
2335 room_id,
2336 vec![
2337 Update::NewItemsChunk {
2338 previous: None,
2339 new: ChunkIdentifier::new(0),
2340 next: None,
2341 },
2342 Update::NewItemsChunk {
2343 previous: Some(ChunkIdentifier::new(0)),
2347 new: ChunkIdentifier::new(1),
2348 next: Some(ChunkIdentifier::new(0)),
2349 },
2350 ],
2351 )
2352 .await
2353 .unwrap();
2354
2355 store.load_last_chunk(room_id).await.unwrap_err();
2356 }
2357
2358 #[async_test]
2359 async fn test_load_previous_chunk() {
2360 let room_id = room_id!("!r0:matrix.org");
2361 let event = |msg: &str| make_test_event(room_id, msg);
2362 let store = get_event_cache_store().await.expect("creating cache store failed");
2363
2364 {
2367 let previous_chunk =
2368 store.load_previous_chunk(room_id, ChunkIdentifier::new(153)).await.unwrap();
2369
2370 assert!(previous_chunk.is_none());
2371 }
2372
2373 {
2376 store
2377 .handle_linked_chunk_updates(
2378 room_id,
2379 vec![Update::NewItemsChunk {
2380 previous: None,
2381 new: ChunkIdentifier::new(42),
2382 next: None,
2383 }],
2384 )
2385 .await
2386 .unwrap();
2387
2388 let previous_chunk =
2389 store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2390
2391 assert!(previous_chunk.is_none());
2392 }
2393
2394 {
2396 store
2397 .handle_linked_chunk_updates(
2398 room_id,
2399 vec![
2400 Update::NewItemsChunk {
2402 previous: None,
2403 new: ChunkIdentifier::new(7),
2404 next: Some(ChunkIdentifier::new(42)),
2405 },
2406 Update::PushItems {
2407 at: Position::new(ChunkIdentifier::new(7), 0),
2408 items: vec![event("brigand du jorat"), event("morbier")],
2409 },
2410 ],
2411 )
2412 .await
2413 .unwrap();
2414
2415 let previous_chunk =
2416 store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2417
2418 assert_matches!(previous_chunk, Some(previous_chunk) => {
2419 assert_eq!(previous_chunk.identifier, 7);
2420 assert!(previous_chunk.previous.is_none());
2421 assert_matches!(previous_chunk.next, Some(next) => {
2422 assert_eq!(next, 42);
2423 });
2424 assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2425 assert_eq!(items.len(), 2);
2426 check_test_event(&items[0], "brigand du jorat");
2427 check_test_event(&items[1], "morbier");
2428 });
2429 });
2430 }
2431 }
2432}
2433
/// Runs the shared integration test suites against a store opened with a passphrase.
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    /// Temporary directory shared by all the tests of this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give each store its own sub-directory of `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a store protected by a fixed test passphrase, in a fresh sub-directory of
    /// `TMP_DIR`.
    ///
    /// Panics if the path is not valid UTF-8 or if the store cannot be opened.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let workspace = TMP_DIR.path().join(index.to_string());
        let workspace_str = workspace.to_str().unwrap();

        tracing::info!("using event cache store @ {}", workspace_str);

        let store = SqliteEventCacheStore::open(workspace_str, Some("default_test_password"))
            .await
            .unwrap();
        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();
}