matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An SQLite-based backend for the [`EventCacheStore`].

use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{
            compute_filters_string, extract_event_relation,
            media::{
                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
                MediaService,
            },
            EventCacheStore,
        },
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunkId, Position, RawChunk,
        Update,
    },
    media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
    OwnedEventId, RoomId,
};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, error, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, SqliteStoreConfig,
};

mod keys {
    // Entries in the key-value store.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 8;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,
    pool: SqlitePool,
    media_service: MediaService,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
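    ///
    /// # Examples
    ///
    /// A minimal sketch; the path and passphrase below are illustrative:
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), matrix_sdk_sqlite::OpenStoreError> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// // Open (or create) the store in the given directory, encrypting
    /// // private data with the given passphrase.
    /// let store = SqliteEventCacheStore::open("/tmp/event-cache", Some("secret")).await?;
    /// # Ok(()) }
    /// ```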
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store with the given configuration.
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
        this.pool.get().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given passphrase will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match passphrase {
            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
            None => None,
        };

        let media_service = MediaService::new();
        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
        media_service.restore(media_retention_policy, last_media_cleanup_time);

        Ok(Self { store_cipher, pool, media_service })
    }

    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = key.encrypt_value_data(value)?;
            Ok(rmp_serde::to_vec_named(&encrypted)?)
        } else {
            Ok(value)
        }
    }

    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = rmp_serde::from_slice(value)?;
            let decrypted = key.decrypt_value_data(encrypted)?;
            Ok(Cow::Owned(decrypted))
        } else {
            Ok(Cow::Borrowed(value))
        }
    }

    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
        let bytes = key.as_ref();
        if let Some(store_cipher) = &self.store_cipher {
            Key::Hashed(store_cipher.hash_key(table_name, bytes))
        } else {
            Key::Plain(bytes.to_owned())
        }
    }

    async fn acquire(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, as it
        // fails with the error message: "cannot change into wal mode from
        // within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }
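
    // To add a migration: bump `DATABASE_VERSION` and follow the same pattern.
    // A sketch for a hypothetical version 9 (the file name is illustrative):
    //
    // if version < 9 {
    //     conn.with_transaction(|txn| {
    //         txn.execute_batch(include_str!(
    //             "../migrations/event_cache_store/009_something.sql"
    //         ))?;
    //         txn.set_db_version(9)
    //     })
    //     .await?;
    // }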

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;
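
        // The upsert below acquires or refreshes the lease: the plain INSERT
        // wins when the key is free; on conflict, the UPDATE only matches when
        // we already hold the lock (same holder) or when the previous lease
        // has expired. In every other case no row is touched, `num_touched`
        // stays 0, and the lock is denied.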
        let num_touched = self
            .acquire()
            .await?
            .with_transaction(move |txn| {
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // Use a single transaction throughout this function, so that either
        // all updates are applied, or none is.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

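        // `with_immediate_transaction` opens the transaction in write mode
        // right away (SQLite's `BEGIN IMMEDIATE`), so we take the write lock
        // up front instead of upgrading a read transaction later, which could
        // fail with `SQLITE_BUSY` under concurrent writers.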
        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Point the previous chunk's `next` to the removed chunk's `next`.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Point the next chunk's `previous` to the removed chunk's `previous`.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better to be safe.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might have been
                        // already inserted in the database. This is the case when an event is
                        // deduplicated and moved to another position; or because it was inserted
                        // outside the context of a linked chunk (e.g. pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        )?;

                        let invalid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            Some((event_id.to_string(), event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        // Replace the event's content. Really we'd like to update, but in case the
                        // event id changed, we are a bit lenient here and will allow an insertion
                        // of the new event.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we are
                        // going to remove.
                        //
                        // Imagine we have the following events:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev2     | !r0             | 42       | 2        |
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // If `$ev2` is removed, we end up in this state:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // |          |                 |          |          | <- no more `$ev2`
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // We need to shift the `position` of `$ev3` and `$ev4`
                        // to `position - 1`, like so:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev3     | !r0             | 42       | 2        |
                        // | $ev4     | !r0             | 42       | 3        |
                        //
                        // Usually, it boils down to running the following query:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // WHERE position > 2 AND …
                        // ```
                        //
                        // Okay. But `UPDATE` runs on rows in no particular
                        // order. It means that it can update `$ev4` before
                        // `$ev3` for example. What happens in this particular
                        // case? The `position` of `$ev4` becomes `3`, however
                        // `$ev3` already has `position = 3`. Because there
                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
                        // position)`, it will result in a constraint violation.
                        //
                        // There is **no way** to control the execution order of
                        // `UPDATE` in SQLite. To persuade yourself, try:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // FROM (
                        //     SELECT event_id
                        //     FROM event_chunks
                        //     WHERE position > 2 AND …
                        //     ORDER BY position ASC
                        // ) as ordered
                        // WHERE event_chunks.event_id = ordered.event_id
                        // ```
                        //
                        // It will fail the same way.
                        //
                        // Thus, we have 2 solutions:
                        //
                        // 1. Remove the `UNIQUE` constraint,
                        // 2. Be creative.
                        //
                        // The `UNIQUE` constraint is a safety belt. Normally, we
                        // have `event_cache::Deduplicator` that is responsible
                        // for ensuring there is no duplicated event. However,
                        // relying on this is “fragile” in the sense it can
                        // contain bugs. Relying on the `UNIQUE` constraint from
                        // SQLite is more robust. It's “braces and belt” as we
                        // say here.
                        //
                        // So. We need to be creative.
                        //
                        // Many solutions exist. Amongst the most popular, we
                        // see _dropping and re-creating the index_, which is
                        // a no-go for us: it's too expensive. I (@hywan) have
                        // adopted the following one:
                        //
                        // - Do `position = position - 1` but in the negative
                        //   space, so `position = -(position - 1)`. A position
                        //   cannot be negative; we are sure it is unique!
                        // - Once all candidate rows are updated, do `position =
                        //   -position` to move back to the positive space.
                        //
                        // 'told you it's gonna be creative.
                        //
                        // This solution is a hack, **but** it is a small
                        // number of operations, and we can keep the `UNIQUE`
                        // constraint in place.
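                        //
                        // Worked trace with the example above (removing `$ev2`
                        // at position 2): the first `UPDATE` maps positions
                        // {3, 4} to {-2, -3}, which are all negative and hence
                        // unique, and the second flips them back to {2, 3}.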
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a
                // `ChunkIdentifierGenerator`, and count the number of chunks.
                let (chunk_identifier_generator, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it returns no row, the `MAX(id)` is
                                // set to `Null`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match chunk_identifier_generator {
                    Some(last_chunk_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(last_chunk_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The last chunk wasn't found, and there are zero chunks for this room:
                    // this is consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The last chunk wasn't found **but** there are chunks for this room:
                    // this is inconsistent, the linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        self.acquire()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;
        Ok(())
    }

    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        // If there are no events for which we want to check duplicates, we can
        // return early. It's not only an optimization to do so: it's required,
        // otherwise the `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
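                // `chunk_large_query_over` splits `events` into batches so
                // each query stays under SQLite's bound-parameter limit
                // (historically 999 variables); `repeat_vars(n)` below then
                // expands to the matching `?,?,…` placeholder list.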
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        // parameter for `linked_chunk_id = ?`
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID may have been stored even
                            // though it is malformed; let's skip it.
                            error!(%duplicated_event, %linked_chunk_id, "Read a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    // Event is not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<Event>, Self::Error> {
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let this = self.clone();

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let filter_query = if let Some(filters) = compute_filters_string(filters.as_deref())
                {
                    format!(
                        " AND rel_type IN ({})",
                        filters
                            .into_iter()
                            .map(|f| format!(r#""{f}""#))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_owned()
                };
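
                // E.g. with a single `RelationType::Replacement` filter, and
                // assuming `compute_filters_string` yields the wire format
                // ("m.replace"), the clause reads:
                // ` AND rel_type IN ("m.replace")`.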
                let query = format!(
                    "SELECT content FROM events WHERE relates_to = ? AND room_id = ? {filter_query}"
                );

                // Collect related events.
                let mut related = Vec::new();
                for ev in
                    txn.prepare(&query)?.query_map((event_id.as_str(), hashed_room_id), |row| {
                        row.get::<_, Vec<u8>>(0)
                    })?
                {
                    let ev = ev?;
                    let ev = serde_json::from_slice(&this.decode_value(&ev)?)?;
                    related.push(ev);
                }

                Ok(related)
            })
            .await
    }

    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let Some(event_id) = event.event_id() else {
            error!(%room_id, "Trying to save an event with no ID");
            return Ok(());
        };

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                    , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                Ok(())
            })
            .await
    }

    async fn add_media_content(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<()> {
        self.media_service.add_media_content(self, request, content, ignore_policy).await
    }

    async fn replace_media_key(
        &self,
        from: &MediaRequestParameters,
        to: &MediaRequestParameters,
    ) -> Result<(), Self::Error> {
        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());

        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
            (new_uri, new_format, prev_uri, prev_format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
        self.media_service.get_media_content(self, request).await
    }

    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;

        Ok(())
    }

    async fn get_media_content_for_uri(
        &self,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        self.media_service.get_media_content_for_uri(self, uri).await
    }

    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, uri);

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;

        Ok(())
    }

    async fn set_media_retention_policy(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_media_retention_policy(self, policy).await
    }

    fn media_retention_policy(&self) -> MediaRetentionPolicy {
        self.media_service.media_retention_policy()
    }

    async fn set_ignore_media_retention_policy(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
    }

    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
        self.media_service.clean_up_media_cache(self).await
    }
}

1259#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1260#[cfg_attr(not(target_family = "wasm"), async_trait)]
1261impl EventCacheStoreMedia for SqliteEventCacheStore {
1262    type Error = Error;
1263
1264    async fn media_retention_policy_inner(
1265        &self,
1266    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1267        let conn = self.acquire().await?;
1268        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
1269    }
1270
1271    async fn set_media_retention_policy_inner(
1272        &self,
1273        policy: MediaRetentionPolicy,
1274    ) -> Result<(), Self::Error> {
1275        let conn = self.acquire().await?;
1276        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
1277        Ok(())
1278    }
1279
1280    async fn add_media_content_inner(
1281        &self,
1282        request: &MediaRequestParameters,
1283        data: Vec<u8>,
1284        last_access: SystemTime,
1285        policy: MediaRetentionPolicy,
1286        ignore_policy: IgnoreMediaRetentionPolicy,
1287    ) -> Result<(), Self::Error> {
1288        let ignore_policy = ignore_policy.is_yes();
1289        let data = self.encode_value(data)?;
1290
1291        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
1292            return Ok(());
1293        }
1294
1295        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1296        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1297        let timestamp = time_to_timestamp(last_access);
1298
1299        let conn = self.acquire().await?;
        conn.execute(
            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
            (uri, format, data, timestamp, ignore_policy),
        )
        .await?;

        Ok(())
    }

    async fn set_ignore_media_retention_policy_inner(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let ignore_policy = ignore_policy.is_yes();

        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
            (ignore_policy, uri, format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access.
                // We need to do this first so the transaction is in write mode right away.
                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
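                // Otherwise the transaction would start as a read transaction
                // on the `SELECT` below, then try to upgrade to a write one,
                // which can fail with `SQLITE_BUSY` if another writer snuck in
                // between.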
                txn.execute(
                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
                    (timestamp, &uri, &format),
                )?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ? AND format = ?",
                    (&uri, &format),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn get_media_content_for_uri_inner(
        &self,
        uri: &MxcUri,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, uri);
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access.
                // We need to do this first so the transaction is in write mode right away.
                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ?",
                    (&uri,),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            // We can safely skip all the checks.
            return Ok(());
        }

        let conn = self.acquire().await?;
        let removed = conn
            .with_transaction::<_, Error, _>(move |txn| {
                let mut removed = false;

                // First, remove the media contents that exceed the max file size.
                if let Some(max_file_size) = policy.computed_max_file_size() {
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
                        (max_file_size,),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Then, clean up expired media content.
                if let Some(last_access_expiry) = policy.last_access_expiry {
                    let current_timestamp = time_to_timestamp(current_time);
                    let expiry_secs = last_access_expiry.as_secs();
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
                        (current_timestamp, expiry_secs),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Finally, if the cache size is too big, remove old items until it fits.
                if let Some(max_cache_size) = policy.max_cache_size {
                    // SQLite stores integers as 64-bit values, so read the sum
                    // as a `u64` to compare it to `max_cache_size` without any
                    // lossy conversion.
                    let cache_size = txn
                        .query_row(
                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
                            (),
                            |row| {
                                // `sum()` returns `NULL` if there are no rows.
                                row.get::<_, Option<u64>>(0)
                            },
                        )?
                        .unwrap_or_default();

                    // If the cache size exceeds the maximum, clean up.
                    if cache_size > max_cache_size {
                        // Get the sizes of the media contents ordered by last access.
                        let mut cached_stmt = txn.prepare_cached(
                            "SELECT rowid, length(data) FROM media \
                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
                        )?;
                        let content_sizes = cached_stmt
                            .query(())?
                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));

                        let mut accumulated_items_size = 0u64;
                        let mut limit_reached = false;
                        let mut rows_to_remove = Vec::new();

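                        // Walk the items from most to least recently accessed,
                        // keeping a running total of their sizes; the first
                        // item that pushes the total past `max_cache_size`,
                        // and every older item after it, gets marked for
                        // removal. Hypothetical example: with a 10-byte limit
                        // and item sizes [4, 4, 4], the first two fit (8
                        // bytes), while the third brings the total to 12, so
                        // it is removed.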
                        for result in content_sizes {
                            let (row_id, size) = match result {
                                Ok(content_size) => content_size,
                                Err(error) => {
                                    return Err(error.into());
                                }
                            };

                            if limit_reached {
                                rows_to_remove.push(row_id);
                                continue;
                            }

                            match accumulated_items_size.checked_add(size) {
                                Some(acc) if acc > max_cache_size => {
                                    // We can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                                Some(acc) => accumulated_items_size = acc,
                                None => {
                                    // The accumulated size overflowed `u64`,
                                    // so it certainly exceeds the configured
                                    // maximum; we can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                            };
                        }

                        if !rows_to_remove.is_empty() {
                            removed = true;
                        }

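                        // Delete in batches: SQLite caps the number of
                        // variables that can be bound to a single statement,
                        // so the row IDs are spread over several `DELETE`
                        // statements.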
                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
                            let sql_params = repeat_vars(row_ids.len());
                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
                            Ok(Vec::<()>::new())
                        })?;
                    }
                }

                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;

                Ok(removed)
            })
            .await?;

        // If we removed media, defragment the database and free space on the
        // filesystem.
        if removed {
            conn.vacuum().await?;
        }

        Ok(())
    }

    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
        let conn = self.acquire().await?;
        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
    }
}

/// Like `deadpool::managed::Object::with_transaction`, but starts the
/// transaction in immediate (write) mode from the beginning. This prevents
/// `SQLITE_BUSY` errors for transactions that involve both reads and writes,
/// or that start with a write.
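///
/// A minimal usage sketch (illustrative only; `pool` and the statements are
/// assumptions, not part of this module):
///
/// ```ignore
/// let conn = pool.get().await?;
/// let count: u64 = with_immediate_transaction(conn, |txn| {
///     // The first statement is already a write, hence IMMEDIATE mode.
///     txn.execute("UPDATE media SET last_access = 0", ())?;
///     Ok(txn.query_row("SELECT COUNT(*) FROM media", (), |row| row.get(0))?)
/// })
/// .await?;
/// ```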
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    conn: SqliteAsyncConn,
    f: F,
) -> Result<T, Error> {
    conn.interact(move |conn| -> Result<T, Error> {
        // Start the transaction in IMMEDIATE mode since all updates may cause writes,
        // to avoid read transactions upgrading to write mode and causing
        // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        conn.set_transaction_behavior(TransactionBehavior::Immediate);

        let code = || -> Result<T, Error> {
            let txn = conn.transaction()?;
            let res = f(&txn)?;
            txn.commit()?;
            Ok(res)
        };

        let res = code();

        // Reset the transaction behavior to Deferred once this transaction has
        // run, whether or not it succeeded.
        conn.set_transaction_behavior(TransactionBehavior::Deferred);

        res
    })
    .await
    // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
    .unwrap()
}

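/// Insert a new chunk into the `linked_chunks` table and patch the
/// `previous`/`next` pointers of its neighbors, so that the rows keep encoding
/// a doubly-linked list:
///
/// ```text
/// before:  previous <-> next
/// after:   previous <-> new <-> next
/// ```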
fn insert_chunk(
    txn: &Transaction<'_>,
    linked_chunk_id: &Key,
    previous: Option<u64>,
    new: u64,
    next: Option<u64>,
    type_str: &str,
) -> rusqlite::Result<()> {
    // First, insert the new chunk.
    txn.execute(
        r#"
            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
            VALUES (?, ?, ?, ?, ?)
        "#,
        (new, linked_chunk_id, previous, next, type_str),
    )?;

    // If this chunk has a previous one, update its `next` field.
    if let Some(previous) = previous {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET next = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, previous, linked_chunk_id),
        )?;
    }

    // If this chunk has a next one, update its `previous` field.
    if let Some(next) = next {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET previous = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, next, linked_chunk_id),
        )?;
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
        time::Duration,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::{
            store::{
                integration_tests::{
                    check_test_event, make_test_event, make_test_event_with_event_id,
                },
                media::IgnoreMediaRetentionPolicy,
                EventCacheStore, EventCacheStoreError,
            },
            Gap,
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        event_cache_store_media_integration_tests,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
    };
    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
    use once_cell::sync::Lazy;
    use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;
    use crate::{event_cache_store::keys, utils::SqliteAsyncConnExt, SqliteStoreConfig};

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

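    // These macros, defined in `matrix_sdk_base`, expand to a shared suite of
    // integration tests that is run against this store implementation.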
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);

    async fn get_event_cache_store_content_sorted_by_last_access(
        event_cache_store: &SqliteEventCacheStore,
    ) -> Vec<Vec<u8>> {
        let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
        sqlite_db
            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
                stmt.query(())?.mapped(|row| row.get(0)).collect()
            })
            .await
            .expect("querying media cache content by last access failed")
    }

    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_last_access() {
        let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
        let uri = mxc_uri!("mxc://localhost/media");
        let file_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };
        let thumbnail_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
                Method::Crop,
                uint!(100),
                uint!(100),
            )),
        };

        let content: Vec<u8> = "hello world".into();
        let thumbnail_content: Vec<u8> = "hello…".into();

        // Add the media.
        event_cache_store
            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .expect("adding file failed");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        event_cache_store
            .add_media_content(
                &thumbnail_request,
                thumbnail_content.clone(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .expect("adding thumbnail failed");

        // File's last access is older than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
        assert_eq!(contents[1], content, "file is not second-to-last access");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Access the file so its last access is more recent.
        let _ = event_cache_store
            .get_media_content(&file_request)
            .await
            .expect("getting file failed")
            .expect("file is missing");

        // File's last access is more recent than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], content, "file is not last access");
        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
    }

    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None, // Note: the store must link the next entry itself.
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
                                                               * the next link ahead of time. */
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            // Chunks are ordered from smaller to bigger IDs.
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }

    #[async_test]
    async fn test_linked_chunk_new_gap_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![Update::NewGapChunk {
                    previous: None,
                    new: ChunkIdentifier::new(42),
                    next: None,
                    gap: Gap { prev_token: "raclette".to_owned() },
                }],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        // Chunks are ordered from smaller to bigger IDs.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });
    }

    #[async_test]
    async fn test_linked_chunk_replace_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_id = event_id!("$world");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
                        ],
                    },
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }

    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        // Chunks are ordered from smaller to bigger IDs.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        // Check that cascading worked. Yes, SQLite, I doubt you.
        let gaps = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }

    #[async_test]
    async fn test_linked_chunk_push_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                        items: vec![make_test_event(room_id, "who?")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);

            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "who?");
        });
    }

    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "one"),
                            make_test_event(room_id, "two"),
                            make_test_event(room_id, "three"),
                            make_test_event(room_id, "four"),
                            make_test_event(room_id, "five"),
                            make_test_event(room_id, "six"),
                        ],
                    },
                    Update::RemoveItem {
                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 5);
            check_test_event(&events[0], "one");
            check_test_event(&events[1], "two");
            check_test_event(&events[2], "four");
            check_test_event(&events[3], "five");
            check_test_event(&events[4], "six");
        });

        // Make sure the positions have been updated for the remaining events.
        let num_rows: u64 = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }

    #[async_test]
    async fn test_linked_chunk_detach_last_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "hello");
        });
    }

    #[async_test]
    async fn test_linked_chunk_start_end_reattach_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // Same updates and checks as `test_linked_chunk_push_items`, but with
        // extra `StartReattachItems` and `EndReattachItems` updates, which must
        // have no effect.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::StartReattachItems,
                    Update::EndReattachItems,
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "howdy");
        });
    }

    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_0 = make_test_event(room_id, "hello");
        let event_1 = make_test_event(room_id, "world");
        let event_2 = make_test_event(room_id, "howdy");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(54),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1, event_2],
                    },
                    Update::Clear,
                ],
            )
            .await
            .unwrap();

        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());

        // Check that cascading worked. Yes, SQLite, I doubt you.
        store
            .acquire()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();

        // It's okay to re-insert a past event.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0],
                    },
                ],
            )
            .await
            .unwrap();
    }

    #[async_test]
    async fn test_linked_chunk_multiple_rooms() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room1 = room_id!("!realcheeselovers:raclette.fr");
        let linked_chunk_id1 = LinkedChunkId::Room(room1);
        let room2 = room_id!("!realcheeselovers:fondue.ch");
        let linked_chunk_id2 = LinkedChunkId::Room(room2);

        // Check that applying updates to one room doesn't affect the others.
        // Use the same chunk identifier in both rooms to battle-test search.

        store
            .handle_linked_chunk_updates(
                linked_chunk_id1,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room1, "best cheese is raclette"),
                            make_test_event(room1, "obviously"),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        store
            .handle_linked_chunk_updates(
                linked_chunk_id2,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![make_test_event(room1, "beaufort is the best")],
                    },
                ],
            )
            .await
            .unwrap();

        // Check chunks from room 1.
        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
        assert_eq!(chunks_room1.len(), 1);

        let c = chunks_room1.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "best cheese is raclette");
            check_test_event(&events[1], "obviously");
        });

        // Check chunks from room 2.
        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
        assert_eq!(chunks_room2.len(), 1);

        let c = chunks_room2.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "beaufort is the best");
        });
    }

    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // Trigger a violation of the unique constraint on the (room id, chunk
        // id) pair.
        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        // The operation fails with a constraint violation error.
        assert_matches!(err, crate::error::Error::Sqlite(err) => {
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

        // If the updates have been handled transactionally, then no new chunks should
        // have been added; failure of the second update leads to the first one being
        // rolled back.
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }

    #[async_test]
    async fn test_filter_duplicate_events_no_events() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
        assert!(duplicates.is_empty());
    }

    #[async_test]
    async fn test_load_last_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Case #1: no last chunk.
        {
            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert!(last_chunk.is_none());
            assert_eq!(chunk_identifier_generator.current(), 0);
        }

        // Case #2: only one chunk is present.
        {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        // Case #3: more chunks are present.
        {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }

    #[async_test]
    async fn test_load_last_chunk_with_a_cycle() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        // Because `previous` connects to chunk #0, it will create a cycle.
                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
                        // **does not exist**. We have to detect this cycle.
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
    }

    #[async_test]
    async fn test_load_previous_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Case #1: no chunk at all, equivalent to having a nonexistent
        // `before_chunk_identifier`.
        {
            let previous_chunk = store
                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
                .await
                .unwrap();

            assert!(previous_chunk.is_none());
        }

        // Case #2: there is only one chunk; requesting the chunk before it
        // yields nothing.
        {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        // Case #3: there are two chunks.
        {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        // New chunk before the one that exists.
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
}

#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();
}