use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockGeneration,
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{extract_event_relation, EventCacheStore},
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
};
use rusqlite::{
    params, params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
    error::{Error, Result},
    utils::{
        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, Secret, SqliteStoreConfig,
};

mod keys {
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

/// Name of the database file on disk.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Current schema version; bump it when adding a migration below.
const DATABASE_VERSION: u8 = 13;

/// Value of the `type` column in `linked_chunks` for a chunk of events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";

/// Value of the `type` column in `linked_chunks` for a gap chunk.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

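/// A SQLite-based implementation of [`EventCacheStore`].
///
/// It persists the linked chunks of timeline events (and the gaps between
/// them), so the event cache can be reloaded across restarts.
///
/// A minimal usage sketch; the path and passphrase are illustrative, and the
/// example assumes the type is re-exported at the crate root as
/// `matrix_sdk_sqlite::SqliteEventCacheStore`:
///
/// ```no_run
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use matrix_sdk_sqlite::SqliteEventCacheStore;
///
/// // Opens (or creates) the store in the given directory; passing a
/// // passphrase enables value encryption.
/// let store = SqliteEventCacheStore::open("/tmp/event-cache", Some("secret")).await?;
/// # Ok(())
/// # }
/// ```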
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// Cipher used to encrypt values, when the store is encrypted.
    store_cipher: Option<Arc<StoreCipher>>,

    /// Pool of connections used for read operations.
    pool: SqlitePool,

    /// A dedicated connection for write operations, serialized behind a
    /// mutex so there is at most one writer at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
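    /// Open the SQLite event cache store at the given path, deriving the
    /// encryption cipher from the given passphrase, if any.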
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

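    /// Open the SQLite event cache store at the given path, encrypting values
    /// with the given 32-byte key, if any.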
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

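    /// Open the SQLite event cache store with the given configuration.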
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }

    /// Create the store from an existing pool of connections, running the
    /// migrations and setting up the cipher, if needed.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

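    /// Acquire a connection for reading from the pool.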
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Pragmas are per-connection, so enforce foreign keys on every
        // connection acquired from the pool.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

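    /// Acquire the single write connection, serializing writers.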
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        let connection = self.write_connection.clone().lock_owned().await;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

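    /// Map a `linked_chunks` row into its `(id, previous, next, type)`
    /// columns.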
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

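    /// Serialize, encrypt (if enabled), and extract the relation metadata of
    /// an event, producing the row values for the `events` table.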
    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relation metadata from the raw event, so it remains
        // queryable even when the content column is encrypted.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }

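    /// Run a `VACUUM` on the database, to reclaim unused space.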
    pub async fn vacuum(&self) -> Result<()> {
        self.write_connection.lock().await.vacuum().await
    }

    async fn get_db_size(&self) -> Result<Option<usize>> {
        Ok(Some(self.pool.get().await?.get_db_size().await?))
    }
}

/// An event ready to be inserted into the `events` table: serialized,
/// encrypted if needed, with its relation metadata extracted.
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

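/// Transaction helpers to rebuild `RawChunk` values from their SQL rows.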
trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => Err(Error::InvalidData {
                details: format!("a linked chunk has an unknown type {other}"),
            }),
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

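/// Run the migrations, bringing a database at `version` up to
/// [`DATABASE_VERSION`].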
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // Enable WAL mode, so readers don't block the writer.
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        // The media tables were dropped: reclaim the space they used, but only
        // if the database existed before this migration.
        if version >= 1 {
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

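    // Take (or steal) the cross-process lease lock with a single upsert: the
    // row is updated only if we already hold the lock or if the previous
    // lease has expired, and `generation` is incremented whenever the lock
    // changes hands.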
    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET
                            holder = excluded.holder,
                            expiration = excluded.expiration,
                            generation =
                                CASE holder
                                    WHEN excluded.holder THEN generation
                                    ELSE generation + 1
                                END
                        WHERE
                            holder = excluded.holder
                            OR expiration < ?4
                    RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }

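    // All the updates of a batch are applied in one immediate (write)
    // transaction: either they are all persisted, or none of them are.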
    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's content.
                        txn.execute(
                            r#"
                                INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                                VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find the previous and next chunks.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?)),
                        )?;

                        // Relink the previous chunk to the next one.
                        if let Some(previous) = previous {
                            txn.execute(
                                "UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?",
                                (next, previous, &hashed_linked_chunk_id),
                            )?;
                        }

                        // Relink the next chunk to the previous one.
                        if let Some(next) = next {
                            txn.execute(
                                "UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?",
                                (previous, next, &hashed_linked_chunk_id),
                            )?;
                        }

                        // Now delete the chunk itself.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                        )?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Nothing to insert.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // Keep only events that can be stored: an event must
                        // have an ID and an event type.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            // Insert the location information.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            // Insert the event's content.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Replace the event's content in the `events` table.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                        )?;

                        // Update the event ID in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index),
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry.
                        txn.execute(
                            "DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?",
                            (&hashed_linked_chunk_id, chunk_id, index),
                        )?;

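                        // Shift the positions of the items that were after the
                        // removed one. Rows are updated in an arbitrary order, so a
                        // direct `position - 1` could transiently collide with the
                        // unique (linked_chunk_id, chunk_id, position) index; write
                        // the shifted values with their sign flipped first, then
                        // restore the sign in a second pass.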
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index),
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id),
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute(
                            "DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?",
                            (&hashed_linked_chunk_id, chunk_id, index),
                        )?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove all the chunks; the cascading deletes take
                        // care of the `event_chunks` and `gap_chunks` rows.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
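                        // Nothing to do: these are markers around the other
                        // updates that actually move the items.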
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Count the number of events per chunk first, then map each
                // chunk row to its metadata.
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id
                    "#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // A gap chunk never contains items.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the maximum chunk identifier, to seed the identifier
                // generator, along with the total number of chunks.
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?",
                    )?
                    .query_row((&hashed_linked_chunk_id,), |row| {
                        Ok((
                            row.get::<_, Option<u64>>(0)?,
                            row.get::<_, u64>(1)?,
                        ))
                    })?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier),
                        )
                    }
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // The last chunk is the one with no successor.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL",
                    )?
                    .query_row((&hashed_linked_chunk_id,), |row| {
                        Ok((
                            row.get::<_, u64>(0)?,
                            row.get::<_, Option<u64>>(1)?,
                            row.get::<_, String>(2)?,
                        ))
                    })
                    .optional()?
                else {
                    if number_of_chunks == 0 {
                        // The linked chunk is empty.
                        return Ok((None, chunk_identifier_generator));
                    } else {
                        // Chunks exist, but none of them is the last one:
                        // every chunk has a `next`, so the linked chunk
                        // contains a cycle.
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned(),
                        });
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type,
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // The previous chunk is the one whose `next` points to
                // `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?",
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        },
                    )
                    .optional()?
                else {
                    // There is no previous chunk.
                    return Ok(None);
                };

                // Build the chunk.
                let previous_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type,
                )?;

                Ok(Some(previous_chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks (cascading deletes handle their content)…
                txn.execute("DELETE FROM linked_chunks", ())?;
                // …and remove all the events.
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events to filter, return early.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all the events that are already in the database, i.e. the
        // duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        // The parameter for `linked_chunk_id = ?`…
                        once(hashed_linked_chunk_id.to_sql().unwrap())
                            // …then the parameters for `event_id IN (…)`.
                            .chain(events.iter().map(|event| event.as_str().to_sql().unwrap())),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            error!(%duplicated_event, %linked_chunk_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    // Event not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Pick the query matching the provided filters.
                #[allow(clippy::redundant_clone)]
                let (query, keys) = match (hashed_event_type, hashed_session_id) {
                    (None, None) => {
                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
                    }
                    (None, Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
                        params![hashed_room_id, session_id.to_owned()],
                    ),
                    (Some(event_type), None) => (
                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
                        params![hashed_room_id, event_type.to_owned()],
                    ),
                    (Some(event_type), Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
                    ),
                };

                let mut statement = txn.prepare(query)?;
                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;

                let mut events = Vec::new();
                for ev in maybe_events {
                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
                    events.push(event);
                }

                Ok(events)
            })
            .await
    }

    #[instrument(skip(self, event))]
    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!("Trying to save an event with no ID");
            return Ok(());
        };

        let Some(event_type) = event.kind.event_type() else {
            error!(%event_id, "Trying to save an event with no event type");
            return Ok(());
        };

        let event_type = self.encode_key(keys::EVENTS, event_type);
        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.write()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                )?;

                Ok(())
            })
            .await
    }

    async fn optimize(&self) -> Result<(), Self::Error> {
        Ok(self.vacuum().await?)
    }

    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
        self.get_db_size().await
    }
}

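/// Find the events that relate to `event_id`, optionally restricted to the
/// given relation types, along with their position in the room's linked chunk
/// when they are part of it.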
fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    // Collect the decoded events, along with their position in the linked
    // chunk, if any.
    let collect_results = |rows| {
        let mut related = Vec::new();

        for result in rows {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            // Only events that are part of a linked chunk have a position.
            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

    let related = if let Some(filters) = filters {
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
        let filters_params: Vec<_> = filter_strings
            .iter()
            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
            .collect();

        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters_params),
        );

        let mut statement = txn.prepare(&query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    } else {
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut statement = txn.prepare(query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    };

    related
}

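/// Run a transaction with the `IMMEDIATE` behavior, i.e. acquiring the write
/// lock at `BEGIN` time rather than at the first write statement, so that
/// concurrent writers fail fast instead of erroring mid-transaction.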
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            // Start the transaction in immediate mode.
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            // Reset the transaction behavior to deferred, the default.
            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        // `interact` itself only errors out if the above closure panicked or
        // was aborted.
        .unwrap()
}

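/// Insert a new chunk into the `linked_chunks` table, rewiring the
/// `next`/`previous` pointers of its neighbours.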
1580fn insert_chunk(
1581 txn: &Transaction<'_>,
1582 linked_chunk_id: &Key,
1583 previous: Option<u64>,
1584 new: u64,
1585 next: Option<u64>,
1586 type_str: &str,
1587) -> rusqlite::Result<()> {
1588 txn.execute(
1590 r#"
1591 INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1592 VALUES (?, ?, ?, ?, ?)
1593 "#,
1594 (new, linked_chunk_id, previous, next, type_str),
1595 )?;
1596
1597 if let Some(previous) = previous {
1599 txn.execute(
1600 r#"
1601 UPDATE linked_chunks
1602 SET next = ?
1603 WHERE id = ? AND linked_chunk_id = ?
1604 "#,
1605 (new, previous, linked_chunk_id),
1606 )?;
1607 }
1608
1609 if let Some(next) = next {
1611 txn.execute(
1612 r#"
1613 UPDATE linked_chunks
1614 SET previous = ?
1615 WHERE id = ? AND linked_chunk_id = ?
1616 "#,
1617 (new, next, linked_chunk_id),
1618 )?;
1619 }
1620
1621 Ok(())
1622}
1623
1624#[cfg(test)]
1625mod tests {
1626 use std::{
1627 path::PathBuf,
1628 sync::atomic::{AtomicU32, Ordering::SeqCst},
1629 };
1630
1631 use assert_matches::assert_matches;
1632 use matrix_sdk_base::{
1633 event_cache::{
1634 store::{
1635 integration_tests::{
1636 check_test_event, make_test_event, make_test_event_with_event_id,
1637 },
1638 EventCacheStore, EventCacheStoreError,
1639 },
1640 Gap,
1641 },
1642 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1643 linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1644 };
1645 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1646 use once_cell::sync::Lazy;
1647 use ruma::{event_id, room_id};
1648 use tempfile::{tempdir, TempDir};
1649
1650 use super::SqliteEventCacheStore;
1651 use crate::{
1652 event_cache_store::keys,
1653 utils::{EncryptableStore as _, SqliteAsyncConnExt},
1654 SqliteStoreConfig,
1655 };
1656
1657 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1658 static NUM: AtomicU32 = AtomicU32::new(0);
1659
1660 fn new_event_cache_store_workspace() -> PathBuf {
1661 let name = NUM.fetch_add(1, SeqCst).to_string();
1662 TMP_DIR.path().join(name)
1663 }
1664
1665 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1666 let tmpdir_path = new_event_cache_store_workspace();
1667
1668 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1669
1670 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1671 }
1672
1673 event_cache_store_integration_tests!();
1674 event_cache_store_integration_tests_time!();
1675
1676 #[async_test]
1677 async fn test_pool_size() {
1678 let tmpdir_path = new_event_cache_store_workspace();
1679 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1680
1681 let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1682
1683 assert_eq!(store.pool.status().max_size, 42);
1684 }
1685
1686 #[async_test]
1687 async fn test_linked_chunk_new_items_chunk() {
1688 let store = get_event_cache_store().await.expect("creating cache store failed");
1689
1690 let room_id = &DEFAULT_TEST_ROOM_ID;
1691 let linked_chunk_id = LinkedChunkId::Room(room_id);
1692
1693 store
1694 .handle_linked_chunk_updates(
1695 linked_chunk_id,
1696 vec![
1697 Update::NewItemsChunk {
1698 previous: None,
1699 new: ChunkIdentifier::new(42),
1700 next: None, },
1702 Update::NewItemsChunk {
1703 previous: Some(ChunkIdentifier::new(42)),
1704 new: ChunkIdentifier::new(13),
1705 next: Some(ChunkIdentifier::new(37)), },
1708 Update::NewItemsChunk {
1709 previous: Some(ChunkIdentifier::new(13)),
1710 new: ChunkIdentifier::new(37),
1711 next: None,
1712 },
1713 ],
1714 )
1715 .await
1716 .unwrap();
1717
1718 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1719
1720 assert_eq!(chunks.len(), 3);
1721
1722 {
1723 let c = chunks.remove(0);
1725 assert_eq!(c.identifier, ChunkIdentifier::new(13));
1726 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1727 assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1728 assert_matches!(c.content, ChunkContent::Items(events) => {
1729 assert!(events.is_empty());
1730 });
1731
1732 let c = chunks.remove(0);
1733 assert_eq!(c.identifier, ChunkIdentifier::new(37));
1734 assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1735 assert_eq!(c.next, None);
1736 assert_matches!(c.content, ChunkContent::Items(events) => {
1737 assert!(events.is_empty());
1738 });
1739
1740 let c = chunks.remove(0);
1741 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1742 assert_eq!(c.previous, None);
1743 assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1744 assert_matches!(c.content, ChunkContent::Items(events) => {
1745 assert!(events.is_empty());
1746 });
1747 }
1748 }
1749
1750 #[async_test]
1751 async fn test_linked_chunk_new_gap_chunk() {
1752 let store = get_event_cache_store().await.expect("creating cache store failed");
1753
1754 let room_id = &DEFAULT_TEST_ROOM_ID;
1755 let linked_chunk_id = LinkedChunkId::Room(room_id);
1756
1757 store
1758 .handle_linked_chunk_updates(
1759 linked_chunk_id,
1760 vec![Update::NewGapChunk {
1761 previous: None,
1762 new: ChunkIdentifier::new(42),
1763 next: None,
1764 gap: Gap { prev_token: "raclette".to_owned() },
1765 }],
1766 )
1767 .await
1768 .unwrap();
1769
1770 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1771
1772 assert_eq!(chunks.len(), 1);
1773
1774 let c = chunks.remove(0);
1776 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1777 assert_eq!(c.previous, None);
1778 assert_eq!(c.next, None);
1779 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1780 assert_eq!(gap.prev_token, "raclette");
1781 });
1782 }
1783
1784 #[async_test]
1785 async fn test_linked_chunk_replace_item() {
1786 let store = get_event_cache_store().await.expect("creating cache store failed");
1787
1788 let room_id = &DEFAULT_TEST_ROOM_ID;
1789 let linked_chunk_id = LinkedChunkId::Room(room_id);
1790 let event_id = event_id!("$world");
1791
1792 store
1793 .handle_linked_chunk_updates(
1794 linked_chunk_id,
1795 vec![
1796 Update::NewItemsChunk {
1797 previous: None,
1798 new: ChunkIdentifier::new(42),
1799 next: None,
1800 },
1801 Update::PushItems {
1802 at: Position::new(ChunkIdentifier::new(42), 0),
1803 items: vec![
1804 make_test_event(room_id, "hello"),
1805 make_test_event_with_event_id(room_id, "world", Some(event_id)),
1806 ],
1807 },
1808 Update::ReplaceItem {
1809 at: Position::new(ChunkIdentifier::new(42), 1),
1810 item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1811 },
1812 ],
1813 )
1814 .await
1815 .unwrap();
1816
1817 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1818
1819 assert_eq!(chunks.len(), 1);
1820
1821 let c = chunks.remove(0);
1822 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1823 assert_eq!(c.previous, None);
1824 assert_eq!(c.next, None);
1825 assert_matches!(c.content, ChunkContent::Items(events) => {
1826 assert_eq!(events.len(), 2);
1827 check_test_event(&events[0], "hello");
1828 check_test_event(&events[1], "yolo");
1829 });
1830 }
1831
1832 #[async_test]
1833 async fn test_linked_chunk_remove_chunk() {
1834 let store = get_event_cache_store().await.expect("creating cache store failed");
1835
1836 let room_id = &DEFAULT_TEST_ROOM_ID;
1837 let linked_chunk_id = LinkedChunkId::Room(room_id);
1838
1839 store
1840 .handle_linked_chunk_updates(
1841 linked_chunk_id,
1842 vec![
1843 Update::NewGapChunk {
1844 previous: None,
1845 new: ChunkIdentifier::new(42),
1846 next: None,
1847 gap: Gap { prev_token: "raclette".to_owned() },
1848 },
1849 Update::NewGapChunk {
1850 previous: Some(ChunkIdentifier::new(42)),
1851 new: ChunkIdentifier::new(43),
1852 next: None,
1853 gap: Gap { prev_token: "fondue".to_owned() },
1854 },
1855 Update::NewGapChunk {
1856 previous: Some(ChunkIdentifier::new(43)),
1857 new: ChunkIdentifier::new(44),
1858 next: None,
1859 gap: Gap { prev_token: "tartiflette".to_owned() },
1860 },
1861 Update::RemoveChunk(ChunkIdentifier::new(43)),
1862 ],
1863 )
1864 .await
1865 .unwrap();
1866
1867 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1868
1869 assert_eq!(chunks.len(), 2);
1870
1871 let c = chunks.remove(0);
1873 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1874 assert_eq!(c.previous, None);
1875 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1876 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1877 assert_eq!(gap.prev_token, "raclette");
1878 });
1879
1880 let c = chunks.remove(0);
1881 assert_eq!(c.identifier, ChunkIdentifier::new(44));
1882 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1883 assert_eq!(c.next, None);
1884 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1885 assert_eq!(gap.prev_token, "tartiflette");
1886 });
1887
1888 let gaps = store
1890 .read()
1891 .await
1892 .unwrap()
1893 .with_transaction(|txn| -> rusqlite::Result<_> {
1894 let mut gaps = Vec::new();
1895 for data in txn
1896 .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1897 .query_map((), |row| row.get::<_, u64>(0))?
1898 {
1899 gaps.push(data?);
1900 }
1901 Ok(gaps)
1902 })
1903 .await
1904 .unwrap();
1905
1906 assert_eq!(gaps, vec![42, 44]);
1907 }
1908
1909 #[async_test]
1910 async fn test_linked_chunk_push_items() {
1911 let store = get_event_cache_store().await.expect("creating cache store failed");
1912
1913 let room_id = &DEFAULT_TEST_ROOM_ID;
1914 let linked_chunk_id = LinkedChunkId::Room(room_id);
1915
1916 store
1917 .handle_linked_chunk_updates(
1918 linked_chunk_id,
1919 vec![
1920 Update::NewItemsChunk {
1921 previous: None,
1922 new: ChunkIdentifier::new(42),
1923 next: None,
1924 },
1925 Update::PushItems {
1926 at: Position::new(ChunkIdentifier::new(42), 0),
1927 items: vec![
1928 make_test_event(room_id, "hello"),
1929 make_test_event(room_id, "world"),
1930 ],
1931 },
1932 Update::PushItems {
1933 at: Position::new(ChunkIdentifier::new(42), 2),
1934 items: vec![make_test_event(room_id, "who?")],
1935 },
1936 ],
1937 )
1938 .await
1939 .unwrap();
1940
1941 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1942
1943 assert_eq!(chunks.len(), 1);
1944
1945 let c = chunks.remove(0);
1946 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1947 assert_eq!(c.previous, None);
1948 assert_eq!(c.next, None);
1949 assert_matches!(c.content, ChunkContent::Items(events) => {
1950 assert_eq!(events.len(), 3);
1951
1952 check_test_event(&events[0], "hello");
1953 check_test_event(&events[1], "world");
1954 check_test_event(&events[2], "who?");
1955 });
1956 }
1957
1958 #[async_test]
1959 async fn test_linked_chunk_remove_item() {
1960 let store = get_event_cache_store().await.expect("creating cache store failed");
1961
1962 let room_id = *DEFAULT_TEST_ROOM_ID;
1963 let linked_chunk_id = LinkedChunkId::Room(room_id);
1964
1965 store
1966 .handle_linked_chunk_updates(
1967 linked_chunk_id,
1968 vec![
1969 Update::NewItemsChunk {
1970 previous: None,
1971 new: ChunkIdentifier::new(42),
1972 next: None,
1973 },
1974 Update::PushItems {
1975 at: Position::new(ChunkIdentifier::new(42), 0),
1976 items: vec![
1977 make_test_event(room_id, "one"),
1978 make_test_event(room_id, "two"),
1979 make_test_event(room_id, "three"),
1980 make_test_event(room_id, "four"),
1981 make_test_event(room_id, "five"),
1982 make_test_event(room_id, "six"),
1983 ],
1984 },
1985 Update::RemoveItem {
1986 at: Position::new(ChunkIdentifier::new(42), 2), },
1988 ],
1989 )
1990 .await
1991 .unwrap();
1992
1993 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1994
1995 assert_eq!(chunks.len(), 1);
1996
1997 let c = chunks.remove(0);
1998 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1999 assert_eq!(c.previous, None);
2000 assert_eq!(c.next, None);
2001 assert_matches!(c.content, ChunkContent::Items(events) => {
2002 assert_eq!(events.len(), 5);
2003 check_test_event(&events[0], "one");
2004 check_test_event(&events[1], "two");
2005 check_test_event(&events[2], "four");
2006 check_test_event(&events[3], "five");
2007 check_test_event(&events[4], "six");
2008 });
2009
2010 let num_rows: u64 = store
2012 .read()
2013 .await
2014 .unwrap()
2015 .with_transaction(move |txn| {
2016 txn.query_row(
2017 "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
2018 (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
2019 |row| row.get(0),
2020 )
2021 })
2022 .await
2023 .unwrap();
2024 assert_eq!(num_rows, 3);
2025 }
2026
2027 #[async_test]
2028 async fn test_linked_chunk_detach_last_items() {
2029 let store = get_event_cache_store().await.expect("creating cache store failed");
2030
2031 let room_id = *DEFAULT_TEST_ROOM_ID;
2032 let linked_chunk_id = LinkedChunkId::Room(room_id);
2033
2034 store
2035 .handle_linked_chunk_updates(
2036 linked_chunk_id,
2037 vec![
2038 Update::NewItemsChunk {
2039 previous: None,
2040 new: ChunkIdentifier::new(42),
2041 next: None,
2042 },
2043 Update::PushItems {
2044 at: Position::new(ChunkIdentifier::new(42), 0),
2045 items: vec![
2046 make_test_event(room_id, "hello"),
2047 make_test_event(room_id, "world"),
2048 make_test_event(room_id, "howdy"),
2049 ],
2050 },
2051 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2052 ],
2053 )
2054 .await
2055 .unwrap();
2056
2057 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2058
2059 assert_eq!(chunks.len(), 1);
2060
2061 let c = chunks.remove(0);
2062 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2063 assert_eq!(c.previous, None);
2064 assert_eq!(c.next, None);
2065 assert_matches!(c.content, ChunkContent::Items(events) => {
2066 assert_eq!(events.len(), 1);
2067 check_test_event(&events[0], "hello");
2068 });
2069 }
2070
2071 #[async_test]
2072 async fn test_linked_chunk_start_end_reattach_items() {
2073 let store = get_event_cache_store().await.expect("creating cache store failed");
2074
2075 let room_id = *DEFAULT_TEST_ROOM_ID;
2076 let linked_chunk_id = LinkedChunkId::Room(room_id);
2077
2078 store
2082 .handle_linked_chunk_updates(
2083 linked_chunk_id,
2084 vec![
2085 Update::NewItemsChunk {
2086 previous: None,
2087 new: ChunkIdentifier::new(42),
2088 next: None,
2089 },
2090 Update::PushItems {
2091 at: Position::new(ChunkIdentifier::new(42), 0),
2092 items: vec![
2093 make_test_event(room_id, "hello"),
2094 make_test_event(room_id, "world"),
2095 make_test_event(room_id, "howdy"),
2096 ],
2097 },
2098 Update::StartReattachItems,
2099 Update::EndReattachItems,
2100 ],
2101 )
2102 .await
2103 .unwrap();
2104
2105 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2106
2107 assert_eq!(chunks.len(), 1);
2108
2109 let c = chunks.remove(0);
2110 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2111 assert_eq!(c.previous, None);
2112 assert_eq!(c.next, None);
2113 assert_matches!(c.content, ChunkContent::Items(events) => {
2114 assert_eq!(events.len(), 3);
2115 check_test_event(&events[0], "hello");
2116 check_test_event(&events[1], "world");
2117 check_test_event(&events[2], "howdy");
2118 });
2119 }
2120
2121 #[async_test]
2122 async fn test_linked_chunk_clear() {
2123 let store = get_event_cache_store().await.expect("creating cache store failed");
2124
2125 let room_id = *DEFAULT_TEST_ROOM_ID;
2126 let linked_chunk_id = LinkedChunkId::Room(room_id);
2127 let event_0 = make_test_event(room_id, "hello");
2128 let event_1 = make_test_event(room_id, "world");
2129 let event_2 = make_test_event(room_id, "howdy");
2130
2131 store
2132 .handle_linked_chunk_updates(
2133 linked_chunk_id,
2134 vec![
2135 Update::NewItemsChunk {
2136 previous: None,
2137 new: ChunkIdentifier::new(42),
2138 next: None,
2139 },
2140 Update::NewGapChunk {
2141 previous: Some(ChunkIdentifier::new(42)),
2142 new: ChunkIdentifier::new(54),
2143 next: None,
2144 gap: Gap { prev_token: "fondue".to_owned() },
2145 },
2146 Update::PushItems {
2147 at: Position::new(ChunkIdentifier::new(42), 0),
2148 items: vec![event_0.clone(), event_1, event_2],
2149 },
2150 Update::Clear,
2151 ],
2152 )
2153 .await
2154 .unwrap();
2155
2156 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2157 assert!(chunks.is_empty());
2158
2159 store
2161 .read()
2162 .await
2163 .unwrap()
2164 .with_transaction(|txn| -> rusqlite::Result<_> {
2165 let num_gaps = txn
2166 .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2167 .query_row((), |row| row.get::<_, u64>(0))?;
2168 assert_eq!(num_gaps, 0);
2169
2170 let num_events = txn
2171 .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2172 .query_row((), |row| row.get::<_, u64>(0))?;
2173 assert_eq!(num_events, 0);
2174
2175 Ok(())
2176 })
2177 .await
2178 .unwrap();
2179
2180 store
2182 .handle_linked_chunk_updates(
2183 linked_chunk_id,
2184 vec![
2185 Update::NewItemsChunk {
2186 previous: None,
2187 new: ChunkIdentifier::new(42),
2188 next: None,
2189 },
2190 Update::PushItems {
2191 at: Position::new(ChunkIdentifier::new(42), 0),
2192 items: vec![event_0],
2193 },
2194 ],
2195 )
2196 .await
2197 .unwrap();
2198 }
2199
2200 #[async_test]
2201 async fn test_linked_chunk_multiple_rooms() {
2202 let store = get_event_cache_store().await.expect("creating cache store failed");
2203
2204 let room1 = room_id!("!realcheeselovers:raclette.fr");
2205 let linked_chunk_id1 = LinkedChunkId::Room(room1);
2206 let room2 = room_id!("!realcheeselovers:fondue.ch");
2207 let linked_chunk_id2 = LinkedChunkId::Room(room2);
2208
2209 store
2213 .handle_linked_chunk_updates(
2214 linked_chunk_id1,
2215 vec![
2216 Update::NewItemsChunk {
2217 previous: None,
2218 new: ChunkIdentifier::new(42),
2219 next: None,
2220 },
2221 Update::PushItems {
2222 at: Position::new(ChunkIdentifier::new(42), 0),
2223 items: vec![
2224 make_test_event(room1, "best cheese is raclette"),
2225 make_test_event(room1, "obviously"),
2226 ],
2227 },
2228 ],
2229 )
2230 .await
2231 .unwrap();
2232
2233 store
2234 .handle_linked_chunk_updates(
2235 linked_chunk_id2,
2236 vec![
2237 Update::NewItemsChunk {
2238 previous: None,
2239 new: ChunkIdentifier::new(42),
2240 next: None,
2241 },
2242 Update::PushItems {
2243 at: Position::new(ChunkIdentifier::new(42), 0),
2244 items: vec![make_test_event(room1, "beaufort is the best")],
2245 },
2246 ],
2247 )
2248 .await
2249 .unwrap();
2250
2251 let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2253 assert_eq!(chunks_room1.len(), 1);
2254
2255 let c = chunks_room1.remove(0);
2256 assert_matches!(c.content, ChunkContent::Items(events) => {
2257 assert_eq!(events.len(), 2);
2258 check_test_event(&events[0], "best cheese is raclette");
2259 check_test_event(&events[1], "obviously");
2260 });
2261
2262 let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2264 assert_eq!(chunks_room2.len(), 1);
2265
2266 let c = chunks_room2.remove(0);
2267 assert_matches!(c.content, ChunkContent::Items(events) => {
2268 assert_eq!(events.len(), 1);
2269 check_test_event(&events[0], "beaufort is the best");
2270 });
2271 }
2272
2273 #[async_test]
2274 async fn test_linked_chunk_update_is_a_transaction() {
2275 let store = get_event_cache_store().await.expect("creating cache store failed");
2276
2277 let room_id = *DEFAULT_TEST_ROOM_ID;
2278 let linked_chunk_id = LinkedChunkId::Room(room_id);
2279
2280 let err = store
2283 .handle_linked_chunk_updates(
2284 linked_chunk_id,
2285 vec![
2286 Update::NewItemsChunk {
2287 previous: None,
2288 new: ChunkIdentifier::new(42),
2289 next: None,
2290 },
2291 Update::NewItemsChunk {
2292 previous: None,
2293 new: ChunkIdentifier::new(42),
2294 next: None,
2295 },
2296 ],
2297 )
2298 .await
2299 .unwrap_err();
2300
2301 assert_matches!(err, crate::error::Error::Sqlite(err) => {
2303 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2304 });
2305
2306 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2310 assert!(chunks.is_empty());
2311 }
2312
2313 #[async_test]
2314 async fn test_filter_duplicate_events_no_events() {
2315 let store = get_event_cache_store().await.expect("creating cache store failed");
2316
2317 let room_id = *DEFAULT_TEST_ROOM_ID;
2318 let linked_chunk_id = LinkedChunkId::Room(room_id);
2319 let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2320 assert!(duplicates.is_empty());
2321 }

    #[async_test]
    async fn test_load_last_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
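            // Case #1: no chunk has been stored yet, so there is no last chunk.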
            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert!(last_chunk.is_none());
            assert_eq!(chunk_identifier_generator.current(), 0);
        }

        {
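            // Case #2: a single chunk holding two events.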
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        {
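            // Case #3: a new chunk is appended after chunk 42 and becomes the
            // last chunk.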
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
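            // The generator reflects the highest chunk identifier ever used
            // (42), not the identifier of the last chunk (7).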
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }

    #[async_test]
    async fn test_load_last_chunk_with_a_cycle() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
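                        // Link the new chunk back to chunk 0, creating a cycle.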
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

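        // Loading the last chunk must detect the cycle and fail instead of
        // looping forever.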
        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
    }

    #[async_test]
    async fn test_load_previous_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
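            // Case #1: nothing has been stored, so the requested chunk has no
            // previous chunk.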
            let previous_chunk = store
                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
                .await
                .unwrap();

            assert!(previous_chunk.is_none());
        }

        {
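            // Case #2: the chunk exists, but it has no previous chunk.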
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        {
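            // Case #3: a previous chunk is inserted before chunk 42 and filled
            // with two events.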
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
}

#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

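        // An event that the relations query must not return.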
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

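        // An edit of the above event; it is the only event the query should
        // find.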
        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

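        // An unrelated event in another room; it must not leak into the
        // results.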
        let f = f.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

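        // The filter smuggles an SQL injection attempt in as a relation type;
        // the store must treat it as an opaque string, not as SQL.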
        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);

        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room: {results:#?}"
        );

        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}