// matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A sqlite-based backend for the [`EventCacheStore`].

use std::{borrow::Cow, fmt, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    event_cache::{
        store::{
            media::{
                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
                MediaService,
            },
            EventCacheStore,
        },
        Event, Gap,
    },
    linked_chunk::{ChunkContent, ChunkIdentifier, RawChunk, Update},
    media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, Transaction, TransactionBehavior};
use tokio::fs;
#[cfg(not(test))]
use tracing::warn;
use tracing::{debug, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError,
};
51
/// Names of tables and key-value entries used by this store, kept in one
/// place so SQL strings and `encode_key` call sites stay consistent.
mod keys {
    // Entries in Key-value store
    /// Key under which the serialized media retention policy is persisted.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";

    // Tables
    /// Table holding the chunks of the per-room linked chunks.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    /// Table holding cached media contents.
    pub const MEDIA: &str = "media";
}
60
/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 4;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
74
/// A SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    // Cipher used to encrypt stored values and hash table keys; `None` when
    // the store was opened without a passphrase.
    store_cipher: Option<Arc<StoreCipher>>,
    // Pool of SQLite connections; see `Self::acquire`.
    pool: SqlitePool,
    // Shared service implementing the media retention logic; the store-level
    // operations live in the `EventCacheStoreMedia` impl below.
    media_service: Arc<MediaService>,
}
82
83#[cfg(not(tarpaulin_include))]
84impl fmt::Debug for SqliteEventCacheStore {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
87    }
88}
89
90impl SqliteEventCacheStore {
91    /// Open the SQLite-based event cache store at the given path using the
92    /// given passphrase to encrypt private data.
93    pub async fn open(
94        path: impl AsRef<Path>,
95        passphrase: Option<&str>,
96    ) -> Result<Self, OpenStoreError> {
97        let pool = create_pool(path.as_ref()).await?;
98
99        Self::open_with_pool(pool, passphrase).await
100    }
101
102    /// Open an SQLite-based event cache store using the given SQLite database
103    /// pool. The given passphrase will be used to encrypt private data.
104    pub async fn open_with_pool(
105        pool: SqlitePool,
106        passphrase: Option<&str>,
107    ) -> Result<Self, OpenStoreError> {
108        let conn = pool.get().await?;
109        conn.set_journal_size_limit().await?;
110
111        let version = conn.db_version().await?;
112        run_migrations(&conn, version).await?;
113        conn.optimize().await?;
114
115        let store_cipher = match passphrase {
116            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
117            None => None,
118        };
119
120        let media_service = MediaService::new();
121        let media_retention_policy = media_retention_policy(&conn).await?;
122        media_service.restore(media_retention_policy);
123
124        Ok(Self { store_cipher, pool, media_service: Arc::new(media_service) })
125    }
126
127    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
128        if let Some(key) = &self.store_cipher {
129            let encrypted = key.encrypt_value_data(value)?;
130            Ok(rmp_serde::to_vec_named(&encrypted)?)
131        } else {
132            Ok(value)
133        }
134    }
135
136    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
137        if let Some(key) = &self.store_cipher {
138            let encrypted = rmp_serde::from_slice(value)?;
139            let decrypted = key.decrypt_value_data(encrypted)?;
140            Ok(Cow::Owned(decrypted))
141        } else {
142            Ok(Cow::Borrowed(value))
143        }
144    }
145
146    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
147        let bytes = key.as_ref();
148        if let Some(store_cipher) = &self.store_cipher {
149            Key::Hashed(store_cipher.hash_key(table_name, bytes))
150        } else {
151            Key::Plain(bytes.to_owned())
152        }
153    }
154
155    async fn acquire(&self) -> Result<SqliteAsyncConn> {
156        Ok(self.pool.get().await?)
157    }
158
159    fn map_row_to_chunk(
160        row: &rusqlite::Row<'_>,
161    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
162        Ok((
163            row.get::<_, u64>(0)?,
164            row.get::<_, Option<u64>>(1)?,
165            row.get::<_, Option<u64>>(2)?,
166            row.get::<_, String>(3)?,
167        ))
168    }
169}
170
/// Helpers to rebuild a linked chunk and its content from the database,
/// implemented on a [`Transaction`].
trait TransactionExtForLinkedChunks {
    /// Rebuild a [`RawChunk`] from one `linked_chunks` row, loading its
    /// content (gap or events) according to `chunk_type`.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Load the [`Gap`] of the given chunk from the `gaps` table.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Load the [`Event`]s of the given chunk from the `events` table, in
    /// ascending `position` order.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
196
197impl TransactionExtForLinkedChunks for Transaction<'_> {
198    fn rebuild_chunk(
199        &self,
200        store: &SqliteEventCacheStore,
201        room_id: &Key,
202        previous: Option<u64>,
203        id: u64,
204        next: Option<u64>,
205        chunk_type: &str,
206    ) -> Result<RawChunk<Event, Gap>> {
207        let previous = previous.map(ChunkIdentifier::new);
208        let next = next.map(ChunkIdentifier::new);
209        let id = ChunkIdentifier::new(id);
210
211        match chunk_type {
212            CHUNK_TYPE_GAP_TYPE_STRING => {
213                // It's a gap! There's at most one row for it in the database, so a
214                // call to `query_row` is sufficient.
215                let gap = self.load_gap_content(store, room_id, id)?;
216                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
217            }
218
219            CHUNK_TYPE_EVENT_TYPE_STRING => {
220                // It's events!
221                let events = self.load_events_content(store, room_id, id)?;
222                Ok(RawChunk {
223                    content: ChunkContent::Items(events),
224                    previous,
225                    identifier: id,
226                    next,
227                })
228            }
229
230            other => {
231                // It's an error!
232                Err(Error::InvalidData {
233                    details: format!("a linked chunk has an unknown type {other}"),
234                })
235            }
236        }
237    }
238
239    fn load_gap_content(
240        &self,
241        store: &SqliteEventCacheStore,
242        room_id: &Key,
243        chunk_id: ChunkIdentifier,
244    ) -> Result<Gap> {
245        let encoded_prev_token: Vec<u8> = self.query_row(
246            "SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
247            (chunk_id.index(), &room_id),
248            |row| row.get(0),
249        )?;
250        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
251        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
252        Ok(Gap { prev_token })
253    }
254
255    fn load_events_content(
256        &self,
257        store: &SqliteEventCacheStore,
258        room_id: &Key,
259        chunk_id: ChunkIdentifier,
260    ) -> Result<Vec<Event>> {
261        // Retrieve all the events from the database.
262        let mut events = Vec::new();
263
264        for event_data in self
265            .prepare(
266                r#"
267                    SELECT content FROM events
268                    WHERE chunk_id = ? AND room_id = ?
269                    ORDER BY position ASC
270                "#,
271            )?
272            .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
273        {
274            let encoded_content = event_data?;
275            let serialized_content = store.decode_value(&encoded_content)?;
276            let sync_timeline_event = serde_json::from_slice(&serialized_content)?;
277
278            events.push(sync_timeline_event);
279        }
280
281        Ok(events)
282    }
283}
284
285async fn create_pool(path: &Path) -> Result<SqlitePool, OpenStoreError> {
286    fs::create_dir_all(path).await.map_err(OpenStoreError::CreateDir)?;
287    let cfg = deadpool_sqlite::Config::new(path.join("matrix-sdk-event-cache.sqlite3"));
288    Ok(cfg.create_pool(Runtime::Tokio1)?)
289}
290
/// Run migrations for the given version of the database.
///
/// Each migration step runs in its own transaction together with the bump of
/// the stored schema version, so a failed step leaves the database at the
/// previous version. Returns early when the database is already at
/// [`DATABASE_VERSION`].
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    if version < 1 {
        // First turn on WAL mode, this can't be done in the transaction, it fails with
        // the error message: "cannot change into wal mode from within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        // Migration 2: see 002_lease_locks.sql.
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        // Enable foreign keys for this database.
        conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

        // Migration 3: see 003_events.sql.
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        // Migration 4: see 004_ignore_policy.sql.
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    Ok(())
}
343
#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    // Try to acquire (or refresh) the lease lock `key` for `holder`.
    //
    // A single upsert does all the work: the INSERT wins when the key is
    // free; on conflict, the row is only updated when the same holder
    // refreshes its own lease, or the previous lease has expired. Returns
    // `true` iff a row was inserted or updated, i.e. the lock was taken.
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let num_touched = self
            .acquire()
            .await?
            .with_transaction(move |txn| {
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

    // Apply a batch of linked-chunk updates to the `linked_chunks`, `gaps`
    // and `events` tables for the given room.
    async fn handle_linked_chunk_updates(
        &self,
        room_id: &RoomId,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // Use a single transaction throughout this function, so that either all updates
        // work, or none is taken into account.
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let room_id = room_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
                for up in updates {
                    match up {
                        Update::NewItemsChunk { previous, new, next } => {
                            let previous = previous.as_ref().map(ChunkIdentifier::index);
                            let new = new.index();
                            let next = next.as_ref().map(ChunkIdentifier::index);

                            trace!(
                                %room_id,
                                "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                            );

                            insert_chunk(
                                txn,
                                &hashed_room_id,
                                previous,
                                new,
                                next,
                                CHUNK_TYPE_EVENT_TYPE_STRING,
                            )?;
                        }

                        Update::NewGapChunk { previous, new, next, gap } => {
                            // Serialize and (maybe) encrypt the pagination token.
                            let serialized = serde_json::to_vec(&gap.prev_token)?;
                            let prev_token = this.encode_value(serialized)?;

                            let previous = previous.as_ref().map(ChunkIdentifier::index);
                            let new = new.index();
                            let next = next.as_ref().map(ChunkIdentifier::index);

                            trace!(
                                %room_id,
                                "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                            );

                            // Insert the chunk as a gap.
                            insert_chunk(
                                txn,
                                &hashed_room_id,
                                previous,
                                new,
                                next,
                                CHUNK_TYPE_GAP_TYPE_STRING,
                            )?;

                            // Insert the gap's value.
                            txn.execute(
                                r#"
                                INSERT INTO gaps(chunk_id, room_id, prev_token)
                                VALUES (?, ?, ?)
                            "#,
                                (new, &hashed_room_id, prev_token),
                            )?;
                        }

                        Update::RemoveChunk(chunk_identifier) => {
                            let chunk_id = chunk_identifier.index();

                            trace!(%room_id, "removing chunk @ {chunk_id}");

                            // Find chunk to delete.
                            let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                                "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                                (chunk_id, &hashed_room_id),
                                |row| Ok((row.get(0)?, row.get(1)?))
                            )?;

                            // Replace its previous' next to its own next.
                            if let Some(previous) = previous {
                                txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                            }

                            // Replace its next' previous to its own previous.
                            if let Some(next) = next {
                                txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                            }

                            // Now delete it, and let cascading delete corresponding entries in the
                            // other data tables.
                            txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                        }

                        Update::PushItems { at, items } => {
                            let chunk_id = at.chunk_identifier().index();

                            trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                            // Each pushed event gets a position starting at `at.index()`.
                            for (i, event) in items.into_iter().enumerate() {
                                let serialized = serde_json::to_vec(&event)?;
                                let content = this.encode_value(serialized)?;

                                let event_id = event.event_id().map(|event_id| event_id.to_string());
                                let index = at.index() + i;

                                txn.execute(
                                    r#"
                                    INSERT INTO events(chunk_id, room_id, event_id, content, position)
                                    VALUES (?, ?, ?, ?, ?)
                                "#,
                                    (chunk_id, &hashed_room_id, event_id, content, index),
                                )?;
                            }
                        }

                        Update::ReplaceItem { at, item: event } => {
                            let chunk_id = at.chunk_identifier().index();
                            let index = at.index();

                            trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                            let serialized = serde_json::to_vec(&event)?;
                            let content = this.encode_value(serialized)?;

                            // The event id should be the same, but just in case it changed…
                            let event_id = event.event_id().map(|event_id| event_id.to_string());

                            txn.execute(
                                r#"
                                UPDATE events
                                SET content = ?, event_id = ?
                                WHERE room_id = ? AND chunk_id = ? AND position = ?
                            "#,
                                (content, event_id, &hashed_room_id, chunk_id, index,)
                            )?;
                        }

                        Update::RemoveItem { at } => {
                            let chunk_id = at.chunk_identifier().index();
                            let index = at.index();

                            trace!(%room_id, "removing item @ {chunk_id}:{index}");

                            // Remove the entry.
                            txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                            // Decrement the index of each item after the one we're going to remove.
                            txn.execute(
                                r#"
                                    UPDATE events
                                    SET position = position - 1
                                    WHERE room_id = ? AND chunk_id = ? AND position > ?
                                "#,
                                (&hashed_room_id, chunk_id, index)
                            )?;

                        }

                        Update::DetachLastItems { at } => {
                            let chunk_id = at.chunk_identifier().index();
                            let index = at.index();

                            trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                            // Remove these entries.
                            txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                        }

                        Update::Clear => {
                            trace!(%room_id, "clearing items");

                            // Remove chunks, and let cascading do its job.
                            txn.execute(
                                "DELETE FROM linked_chunks WHERE room_id = ?",
                                (&hashed_room_id,),
                            )?;
                        }

                        Update::StartReattachItems | Update::EndReattachItems => {
                            // Nothing.
                        }
                    }
                }

                Ok(())
            })
        .await?;

        Ok(())
    }

    // Reload every chunk of the room's linked chunk, rebuilding each chunk's
    // content (gap or events) from the `gaps`/`events` tables.
    async fn reload_linked_chunk(
        &self,
        room_id: &RoomId,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        let result = self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_room_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    // Delete the linked chunks of every room.
    async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
        self.acquire()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())
            })
            .await?;
        Ok(())
    }

    // Media operations below mostly delegate to the `MediaService`, which
    // applies the retention policy and calls back into the
    // `EventCacheStoreMedia` implementation of this type.

    async fn add_media_content(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<()> {
        self.media_service.add_media_content(self, request, content, ignore_policy).await
    }

    // Re-key a cached media content from `from` to `to` by rewriting its
    // encoded `uri`/`format` columns in place.
    async fn replace_media_key(
        &self,
        from: &MediaRequestParameters,
        to: &MediaRequestParameters,
    ) -> Result<(), Self::Error> {
        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());

        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
            (new_uri, new_format, prev_uri, prev_format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
        self.media_service.get_media_content(self, request).await
    }

    // Delete a single media content, identified by its (uri, format) pair.
    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;

        Ok(())
    }

    async fn get_media_content_for_uri(
        &self,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        self.media_service.get_media_content_for_uri(self, uri).await
    }

    // Delete every media content stored for the given URI, whatever its
    // format.
    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, uri);

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;

        Ok(())
    }

    async fn set_media_retention_policy(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_media_retention_policy(self, policy).await
    }

    fn media_retention_policy(&self) -> MediaRetentionPolicy {
        self.media_service.media_retention_policy()
    }

    async fn set_ignore_media_retention_policy(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
    }

    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
        self.media_service.clean_up_media_cache(self).await
    }
}
708
709#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
710#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
711impl EventCacheStoreMedia for SqliteEventCacheStore {
712    type Error = Error;
713
714    async fn media_retention_policy_inner(
715        &self,
716    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
717        let conn = self.acquire().await?;
718        media_retention_policy(&conn).await
719    }
720
721    async fn set_media_retention_policy_inner(
722        &self,
723        policy: MediaRetentionPolicy,
724    ) -> Result<(), Self::Error> {
725        let conn = self.acquire().await?;
726
727        let serialized_policy = rmp_serde::to_vec_named(&policy)?;
728        conn.set_kv(keys::MEDIA_RETENTION_POLICY, serialized_policy).await?;
729
730        Ok(())
731    }
732
733    async fn add_media_content_inner(
734        &self,
735        request: &MediaRequestParameters,
736        data: Vec<u8>,
737        last_access: SystemTime,
738        policy: MediaRetentionPolicy,
739        ignore_policy: IgnoreMediaRetentionPolicy,
740    ) -> Result<(), Self::Error> {
741        let ignore_policy = ignore_policy.is_yes();
742        let data = self.encode_value(data)?;
743
744        if !ignore_policy && policy.exceeds_max_file_size(data.len()) {
745            return Ok(());
746        }
747
748        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
749        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
750        let timestamp = time_to_timestamp(last_access);
751
752        let conn = self.acquire().await?;
753        conn.execute(
754            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
755            (uri, format, data, timestamp, ignore_policy),
756        )
757        .await?;
758
759        Ok(())
760    }
761
762    async fn set_ignore_media_retention_policy_inner(
763        &self,
764        request: &MediaRequestParameters,
765        ignore_policy: IgnoreMediaRetentionPolicy,
766    ) -> Result<(), Self::Error> {
767        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
768        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
769        let ignore_policy = ignore_policy.is_yes();
770
771        let conn = self.acquire().await?;
772        conn.execute(
773            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
774            (ignore_policy, uri, format),
775        )
776        .await?;
777
778        Ok(())
779    }
780
781    async fn get_media_content_inner(
782        &self,
783        request: &MediaRequestParameters,
784        current_time: SystemTime,
785    ) -> Result<Option<Vec<u8>>, Self::Error> {
786        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
787        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
788        let timestamp = time_to_timestamp(current_time);
789
790        let conn = self.acquire().await?;
791        let data = conn
792            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
793                // Update the last access.
794                // We need to do this first so the transaction is in write mode right away.
795                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
796                txn.execute(
797                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
798                    (timestamp, &uri, &format),
799                )?;
800
801                txn.query_row::<Vec<u8>, _, _>(
802                    "SELECT data FROM media WHERE uri = ? AND format = ?",
803                    (&uri, &format),
804                    |row| row.get(0),
805                )
806                .optional()
807            })
808            .await?;
809
810        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
811    }
812
813    async fn get_media_content_for_uri_inner(
814        &self,
815        uri: &MxcUri,
816        current_time: SystemTime,
817    ) -> Result<Option<Vec<u8>>, Self::Error> {
818        let uri = self.encode_key(keys::MEDIA, uri);
819        let timestamp = time_to_timestamp(current_time);
820
821        let conn = self.acquire().await?;
822        let data = conn
823            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
824                // Update the last access.
825                // We need to do this first so the transaction is in write mode right away.
826                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
827                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
828
829                txn.query_row::<Vec<u8>, _, _>(
830                    "SELECT data FROM media WHERE uri = ?",
831                    (&uri,),
832                    |row| row.get(0),
833                )
834                .optional()
835            })
836            .await?;
837
838        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
839    }
840
    // Enforce the given media retention policy: delete contents that exceed
    // the maximum file size, contents whose last access is too old, and — if
    // the whole cache exceeds its maximum size — the least recently accessed
    // contents. Rows flagged with `ignore_policy` are exempt from all checks.
    // All deletions run in a single transaction; a VACUUM follows if anything
    // was removed.
    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            // We can safely skip all the checks.
            return Ok(());
        }

        let conn = self.acquire().await?;
        // `removed` records whether any row was deleted, to decide whether a
        // VACUUM is worthwhile afterwards.
        let removed = conn
            .with_transaction::<_, Error, _>(move |txn| {
                let mut removed = false;

                // First, check media content that exceed the max filesize.
                if let Some(max_file_size) = policy.computed_max_file_size() {
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
                        (max_file_size,),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Then, clean up expired media content.
                if let Some(last_access_expiry) = policy.last_access_expiry {
                    let current_timestamp = time_to_timestamp(current_time);
                    let expiry_secs = last_access_expiry.as_secs();
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
                        (current_timestamp, expiry_secs),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Finally, if the cache size is too big, remove old items until it fits.
                if let Some(max_cache_size) = policy.max_cache_size {
                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
                    // during the conversion of the result.
                    let cache_size_int = txn
                        .query_row(
                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
                            (),
                            |row| {
                                // `sum()` returns `NULL` if there are no rows.
                                row.get::<_, Option<i64>>(0)
                            },
                        )?
                        .unwrap_or_default();
                    let cache_size_usize = usize::try_from(cache_size_int);

                    // If the cache size is overflowing or bigger than max cache size, clean up.
                    if cache_size_usize.is_err()
                        || cache_size_usize.is_ok_and(|cache_size| cache_size > max_cache_size)
                    {
                        // Get the sizes of the media contents ordered by last access.
                        // Most recently accessed contents come first, so everything
                        // past the size budget (i.e. the oldest contents) is removed.
                        let mut cached_stmt = txn.prepare_cached(
                            "SELECT rowid, length(data) FROM media \
                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
                        )?;
                        let content_sizes = cached_stmt
                            .query(())?
                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, usize>(1)?)));

                        let mut accumulated_items_size = 0usize;
                        let mut limit_reached = false;
                        let mut rows_to_remove = Vec::new();

                        for result in content_sizes {
                            let (row_id, size) = match result {
                                Ok(content_size) => content_size,
                                Err(error) => {
                                    return Err(error.into());
                                }
                            };

                            if limit_reached {
                                rows_to_remove.push(row_id);
                                continue;
                            }

                            match accumulated_items_size.checked_add(size) {
                                Some(acc) if acc > max_cache_size => {
                                    // We can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                                Some(acc) => accumulated_items_size = acc,
                                None => {
                                    // The accumulated size is overflowing but the setting cannot be
                                    // bigger than usize::MAX, we can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                            };
                        }

                        if !rows_to_remove.is_empty() {
                            removed = true;
                        }

                        // Delete the rows in batches, so a single DELETE never
                        // needs more bound parameters than SQLite permits.
                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
                            let sql_params = repeat_vars(row_ids.len());
                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
                            Ok(Vec::<()>::new())
                        })?;
                    }
                }

                Ok(removed)
            })
            .await?;

        // If we removed media, use the VACUUM command to defragment the
        // database and free space on the filesystem.
        if removed {
            if let Err(error) = conn.execute("VACUUM", ()).await {
                // Since this is an optimisation step, do not propagate the error
                // but log it.
                #[cfg(not(test))]
                warn!("Failed to vacuum database: {error}");

                // We want to know if there is an error with this step during tests.
                #[cfg(test)]
                return Err(error.into());
            }
        }

        Ok(())
    }
978}
979
980/// Like `deadpool::managed::Object::with_transaction`, but starts the
981/// transaction in immediate (write) mode from the beginning, precluding errors
982/// of the kind SQLITE_BUSY from happening, for transactions that may involve
983/// both reads and writes, and start with a write.
984async fn with_immediate_transaction<
985    T: Send + 'static,
986    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
987>(
988    conn: SqliteAsyncConn,
989    f: F,
990) -> Result<T, Error> {
991    conn.interact(move |conn| -> Result<T, Error> {
992        // Start the transaction in IMMEDIATE mode since all updates may cause writes,
993        // to avoid read transactions upgrading to write mode and causing
994        // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
995        conn.set_transaction_behavior(TransactionBehavior::Immediate);
996
997        let code = || -> Result<T, Error> {
998            let txn = conn.transaction()?;
999            let res = f(&txn)?;
1000            txn.commit()?;
1001            Ok(res)
1002        };
1003
1004        let res = code();
1005
1006        // Reset the transaction behavior to use Deferred, after this transaction has
1007        // been run, whether it was successful or not.
1008        conn.set_transaction_behavior(TransactionBehavior::Deferred);
1009
1010        res
1011    })
1012    .await
1013    // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1014    .unwrap()
1015}
1016
1017fn insert_chunk(
1018    txn: &Transaction<'_>,
1019    room_id: &Key,
1020    previous: Option<u64>,
1021    new: u64,
1022    next: Option<u64>,
1023    type_str: &str,
1024) -> rusqlite::Result<()> {
1025    // First, insert the new chunk.
1026    txn.execute(
1027        r#"
1028            INSERT INTO linked_chunks(id, room_id, previous, next, type)
1029            VALUES (?, ?, ?, ?, ?)
1030        "#,
1031        (new, room_id, previous, next, type_str),
1032    )?;
1033
1034    // If this chunk has a previous one, update its `next` field.
1035    if let Some(previous) = previous {
1036        txn.execute(
1037            r#"
1038                UPDATE linked_chunks
1039                SET next = ?
1040                WHERE id = ? AND room_id = ?
1041            "#,
1042            (new, previous, room_id),
1043        )?;
1044    }
1045
1046    // If this chunk has a next one, update its `previous` field.
1047    if let Some(next) = next {
1048        txn.execute(
1049            r#"
1050                UPDATE linked_chunks
1051                SET previous = ?
1052                WHERE id = ? AND room_id = ?
1053            "#,
1054            (new, next, room_id),
1055        )?;
1056    }
1057
1058    Ok(())
1059}
1060
1061/// Get the persisted [`MediaRetentionPolicy`] with the given connection.
1062async fn media_retention_policy(
1063    conn: &SqliteAsyncConn,
1064) -> Result<Option<MediaRetentionPolicy>, Error> {
1065    let Some(bytes) = conn.get_kv(keys::MEDIA_RETENTION_POLICY).await? else {
1066        return Ok(None);
1067    };
1068
1069    Ok(Some(rmp_serde::from_slice(&bytes)?))
1070}
1071
1072#[cfg(test)]
1073mod tests {
1074    use std::{
1075        sync::atomic::{AtomicU32, Ordering::SeqCst},
1076        time::Duration,
1077    };
1078
1079    use assert_matches::assert_matches;
1080    use matrix_sdk_base::{
1081        event_cache::{
1082            store::{
1083                integration_tests::{check_test_event, make_test_event},
1084                media::IgnoreMediaRetentionPolicy,
1085                EventCacheStore, EventCacheStoreError,
1086            },
1087            Gap,
1088        },
1089        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1090        event_cache_store_media_integration_tests,
1091        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1092        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1093    };
1094    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1095    use once_cell::sync::Lazy;
1096    use ruma::{events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1097    use tempfile::{tempdir, TempDir};
1098
1099    use super::SqliteEventCacheStore;
1100    use crate::utils::SqliteAsyncConnExt;
1101
    // Temporary directory shared by all test databases in this module;
    // dropped (and deleted) at process exit.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    // Monotonic counter giving each test store its own subdirectory.
    static NUM: AtomicU32 = AtomicU32::new(0);
1104
1105    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1106        let name = NUM.fetch_add(1, SeqCst).to_string();
1107        let tmpdir_path = TMP_DIR.path().join(name);
1108
1109        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1110
1111        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1112    }
1113
    // Pull in the shared `EventCacheStore` integration test suites (generic,
    // time-based, and media-specific) and run them against this SQLite store.
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
1117
1118    async fn get_event_cache_store_content_sorted_by_last_access(
1119        event_cache_store: &SqliteEventCacheStore,
1120    ) -> Vec<Vec<u8>> {
1121        let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1122        sqlite_db
1123            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1124                stmt.query(())?.mapped(|row| row.get(0)).collect()
1125            })
1126            .await
1127            .expect("querying media cache content by last access failed")
1128    }
1129
    #[async_test]
    async fn test_last_access() {
        // Reading a media content must bump its `last_access` timestamp, which
        // drives the eviction order of the media cache.
        let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
        let uri = mxc_uri!("mxc://localhost/media");
        // A full-size file and a thumbnail sharing the same MXC URI.
        let file_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };
        let thumbnail_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
                Method::Crop,
                uint!(100),
                uint!(100),
            )),
        };

        let content: Vec<u8> = "hello world".into();
        let thumbnail_content: Vec<u8> = "hello…".into();

        // Add the media.
        event_cache_store
            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .expect("adding file failed");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        event_cache_store
            .add_media_content(
                &thumbnail_request,
                thumbnail_content.clone(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .expect("adding thumbnail failed");

        // File's last access is older than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
        assert_eq!(contents[1], content, "file is not second-to-last access");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Access the file so its last access is more recent.
        let _ = event_cache_store
            .get_media_content(&file_request)
            .await
            .expect("getting file failed")
            .expect("file is missing");

        // File's last access is more recent than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], content, "file is not last access");
        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
    }
1196
    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        // Inserting empty items chunks must persist them with correctly
        // resolved `previous`/`next` links, even when a link is only implied.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None, // Note: the store must link the next entry itself.
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
                                                               * the next link ahead of time. */
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            // Chunks are ordered from smaller to bigger IDs.
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }
1259
1260    #[async_test]
1261    async fn test_linked_chunk_new_gap_chunk() {
1262        let store = get_event_cache_store().await.expect("creating cache store failed");
1263
1264        let room_id = &DEFAULT_TEST_ROOM_ID;
1265
1266        store
1267            .handle_linked_chunk_updates(
1268                room_id,
1269                vec![Update::NewGapChunk {
1270                    previous: None,
1271                    new: ChunkIdentifier::new(42),
1272                    next: None,
1273                    gap: Gap { prev_token: "raclette".to_owned() },
1274                }],
1275            )
1276            .await
1277            .unwrap();
1278
1279        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
1280
1281        assert_eq!(chunks.len(), 1);
1282
1283        // Chunks are ordered from smaller to bigger IDs.
1284        let c = chunks.remove(0);
1285        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1286        assert_eq!(c.previous, None);
1287        assert_eq!(c.next, None);
1288        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1289            assert_eq!(gap.prev_token, "raclette");
1290        });
1291    }
1292
    #[async_test]
    async fn test_linked_chunk_replace_item() {
        // `ReplaceItem` must overwrite the event at the given position without
        // touching its siblings.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    // Replace the second event ("world") with a new one.
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event(room_id, "yolo"),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }
1338
    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        // Removing the middle chunk of a three-chunk list must re-link its
        // neighbours and cascade-delete its gap row.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        // Chunks are ordered from smaller to bigger IDs.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        // Check that cascading worked. Yes, sqlite, I doubt you.
        let gaps = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        // The gap row of chunk 43 must be gone; 42 and 44 remain.
        assert_eq!(gaps, vec![42, 44]);
    }
1414
    #[async_test]
    async fn test_linked_chunk_push_items() {
        // Two consecutive `PushItems` on the same chunk must append events in
        // order, using the provided starting positions.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                        items: vec![make_test_event(room_id, "who?")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);

            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "who?");
        });
    }
1462
    #[async_test]
    async fn test_linked_chunk_remove_item() {
        // Removing an event must also shift the positions of the events that
        // follow it in the same chunk.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    // Remove the first event ("hello").
                    Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "world");
        });

        // Make sure the position has been updated for the remaining event.
        let num_rows: u64 = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM events WHERE chunk_id = 42 AND room_id = ? AND position = 0",
                    (room_id.as_bytes(),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 1);
    }
1520
    #[async_test]
    async fn test_linked_chunk_detach_last_items() {
        // `DetachLastItems` must drop every event at or after the given
        // position, keeping only the ones before it.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    // Detach from position 1 on: "world" and "howdy" go away.
                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "hello");
        });
    }
1563
    #[async_test]
    async fn test_linked_chunk_start_end_reattach_items() {
        // Reattach marker updates must be no-ops for the persisted state.
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;

        // Same updates and checks as test_linked_chunk_push_items, but with extra
        // `StartReattachItems` and `EndReattachItems` updates, which must have no
        // effects.
        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::StartReattachItems,
                    Update::EndReattachItems,
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "howdy");
        });
    }
1612
1613    #[async_test]
1614    async fn test_linked_chunk_clear() {
1615        let store = get_event_cache_store().await.expect("creating cache store failed");
1616
1617        let room_id = *DEFAULT_TEST_ROOM_ID;
1618
1619        store
1620            .handle_linked_chunk_updates(
1621                room_id,
1622                vec![
1623                    Update::NewItemsChunk {
1624                        previous: None,
1625                        new: ChunkIdentifier::new(42),
1626                        next: None,
1627                    },
1628                    Update::NewGapChunk {
1629                        previous: Some(ChunkIdentifier::new(42)),
1630                        new: ChunkIdentifier::new(54),
1631                        next: None,
1632                        gap: Gap { prev_token: "fondue".to_owned() },
1633                    },
1634                    Update::PushItems {
1635                        at: Position::new(ChunkIdentifier::new(42), 0),
1636                        items: vec![
1637                            make_test_event(room_id, "hello"),
1638                            make_test_event(room_id, "world"),
1639                            make_test_event(room_id, "howdy"),
1640                        ],
1641                    },
1642                    Update::Clear,
1643                ],
1644            )
1645            .await
1646            .unwrap();
1647
1648        let chunks = store.reload_linked_chunk(room_id).await.unwrap();
1649        assert!(chunks.is_empty());
1650    }
1651
1652    #[async_test]
1653    async fn test_linked_chunk_multiple_rooms() {
1654        let store = get_event_cache_store().await.expect("creating cache store failed");
1655
1656        let room1 = room_id!("!realcheeselovers:raclette.fr");
1657        let room2 = room_id!("!realcheeselovers:fondue.ch");
1658
1659        // Check that applying updates to one room doesn't affect the others.
1660        // Use the same chunk identifier in both rooms to battle-test search.
1661
1662        store
1663            .handle_linked_chunk_updates(
1664                room1,
1665                vec![
1666                    Update::NewItemsChunk {
1667                        previous: None,
1668                        new: ChunkIdentifier::new(42),
1669                        next: None,
1670                    },
1671                    Update::PushItems {
1672                        at: Position::new(ChunkIdentifier::new(42), 0),
1673                        items: vec![
1674                            make_test_event(room1, "best cheese is raclette"),
1675                            make_test_event(room1, "obviously"),
1676                        ],
1677                    },
1678                ],
1679            )
1680            .await
1681            .unwrap();
1682
1683        store
1684            .handle_linked_chunk_updates(
1685                room2,
1686                vec![
1687                    Update::NewItemsChunk {
1688                        previous: None,
1689                        new: ChunkIdentifier::new(42),
1690                        next: None,
1691                    },
1692                    Update::PushItems {
1693                        at: Position::new(ChunkIdentifier::new(42), 0),
1694                        items: vec![make_test_event(room1, "beaufort is the best")],
1695                    },
1696                ],
1697            )
1698            .await
1699            .unwrap();
1700
1701        // Check chunks from room 1.
1702        let mut chunks_room1 = store.reload_linked_chunk(room1).await.unwrap();
1703        assert_eq!(chunks_room1.len(), 1);
1704
1705        let c = chunks_room1.remove(0);
1706        assert_matches!(c.content, ChunkContent::Items(events) => {
1707            assert_eq!(events.len(), 2);
1708            check_test_event(&events[0], "best cheese is raclette");
1709            check_test_event(&events[1], "obviously");
1710        });
1711
1712        // Check chunks from room 2.
1713        let mut chunks_room2 = store.reload_linked_chunk(room2).await.unwrap();
1714        assert_eq!(chunks_room2.len(), 1);
1715
1716        let c = chunks_room2.remove(0);
1717        assert_matches!(c.content, ChunkContent::Items(events) => {
1718            assert_eq!(events.len(), 1);
1719            check_test_event(&events[0], "beaufort is the best");
1720        });
1721    }
1722
1723    #[async_test]
1724    async fn test_linked_chunk_update_is_a_transaction() {
1725        let store = get_event_cache_store().await.expect("creating cache store failed");
1726
1727        let room_id = *DEFAULT_TEST_ROOM_ID;
1728
1729        // Trigger a violation of the unique constraint on the (room id, chunk id)
1730        // couple.
1731        let err = store
1732            .handle_linked_chunk_updates(
1733                room_id,
1734                vec![
1735                    Update::NewItemsChunk {
1736                        previous: None,
1737                        new: ChunkIdentifier::new(42),
1738                        next: None,
1739                    },
1740                    Update::NewItemsChunk {
1741                        previous: None,
1742                        new: ChunkIdentifier::new(42),
1743                        next: None,
1744                    },
1745                ],
1746            )
1747            .await
1748            .unwrap_err();
1749
1750        // The operation fails with a constraint violation error.
1751        assert_matches!(err, crate::error::Error::Sqlite(err) => {
1752            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
1753        });
1754
1755        // If the updates have been handled transactionally, then no new chunks should
1756        // have been added; failure of the second update leads to the first one being
1757        // rolled back.
1758        let chunks = store.reload_linked_chunk(room_id).await.unwrap();
1759        assert!(chunks.is_empty());
1760    }
1761}
1762
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    // One temporary directory shared by all tests; every store lives in its own
    // numbered subdirectory inside it.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    // Monotonic counter deriving a unique subdirectory name per store.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh, passphrase-protected event cache store in its own
    /// directory, for use by the integration test macros below.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let store_dir = TMP_DIR.path().join(NUM.fetch_add(1, SeqCst).to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using event cache store @ {}", store_path);

        let store =
            SqliteEventCacheStore::open(store_path, Some("default_test_password")).await.unwrap();
        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();
}