matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A SQLite-based backend for the [`EventCacheStore`].

use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{
            compute_filters_string, extract_event_relation,
            media::{
                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
                MediaService,
            },
            EventCacheStore,
        },
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Position, RawChunk, Update,
    },
    media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
    OwnedEventId, RoomId,
};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, error, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, SqliteStoreConfig,
};

mod keys {
    // Entries in the key-value store.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Tables.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 7;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// A SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,
    pool: SqlitePool,
    media_service: MediaService,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store with the given configuration.
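    ///
    /// A minimal usage sketch (the path and passphrase are illustrative, and
    /// the crate-root re-exports are assumed):
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), matrix_sdk_sqlite::OpenStoreError> {
    /// use matrix_sdk_sqlite::{SqliteEventCacheStore, SqliteStoreConfig};
    ///
    /// let config = SqliteStoreConfig::new("/path/to/store").passphrase(Some("secret"));
    /// let store = SqliteEventCacheStore::open_with_config(config).await?;
    /// # Ok(()) }
    /// ```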
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
        this.pool.get().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given passphrase will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match passphrase {
            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
            None => None,
        };

        let media_service = MediaService::new();
        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
        media_service.restore(media_retention_policy, last_media_cleanup_time);

        Ok(Self { store_cipher, pool, media_service })
    }

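    /// Encrypt the value with the store cipher when one is configured;
    /// otherwise, pass it through unchanged.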
    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = key.encrypt_value_data(value)?;
            Ok(rmp_serde::to_vec_named(&encrypted)?)
        } else {
            Ok(value)
        }
    }

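    /// Decrypt a value previously produced by [`Self::encode_value`],
    /// borrowing the input when the store is unencrypted.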
    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = rmp_serde::from_slice(value)?;
            let decrypted = key.decrypt_value_data(encrypted)?;
            Ok(Cow::Owned(decrypted))
        } else {
            Ok(Cow::Borrowed(value))
        }
    }

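    /// Hash a table key with the store cipher when one is configured, so that
    /// plaintext keys never reach the database.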
    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
        let bytes = key.as_ref();
        if let Some(store_cipher) = &self.store_cipher {
            Key::Hashed(store_cipher.hash_key(table_name, bytes))
        } else {
            Key::Plain(bytes.to_owned())
        }
    }

    async fn acquire(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key support must be
        // enabled on a per-connection basis. Execute it every time we try to get a
        // connection, since we can't guarantee a previous connection did enable
        // it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

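    /// Map a `linked_chunks` row to its `(id, previous, next, type)` columns.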
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

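/// An event ready to be persisted: its (possibly encrypted) serialized
/// content, plus the relationship metadata extracted from it.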
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

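/// Transaction helpers to rebuild [`RawChunk`]s from their `linked_chunks`
/// rows, loading gap or events content from the associated tables.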
trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
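    /// Rebuild a [`RawChunk`] from its row data, dispatching on the stored
    /// chunk type to load either its gap or its events content.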
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, room_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, room_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND room_id = ?",
            (chunk_id.index(), &room_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec
                    INNER JOIN events USING (event_id, room_id)
                    WHERE ec.chunk_id = ? AND ec.room_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, as it
        // fails with the error message: "cannot change into wal mode from
        // within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let num_touched = self
            .acquire()
            .await?
            .with_transaction(move |txn| {
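                // Upsert the lease: insert the row if the key is free;
                // otherwise only refresh it when we are already the holder or
                // the previous lease has expired.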
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

    async fn handle_linked_chunk_updates(
        &self,
        room_id: &RoomId,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // Use a single transaction throughout this function, so that either all updates
        // work, or none is taken into account.
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let room_id = room_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, room_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_room_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%room_id, "removing chunk @ {chunk_id}");

                        // Find chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                            (chunk_id, &hashed_room_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;
                        // Rewire the previous chunk's `next` to this chunk's next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                        }

                        // Rewire the next chunk's `previous` to this chunk's previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better safe than sorry.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, room_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might have been
                        // already inserted in the database. This is the case when an event is
                        // deduplicated and moved to another position; or because it was inserted
                        // outside the context of a linked chunk (e.g. pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        )?;

                        // Keep only the events that have an ID; log and skip
                        // the others.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%room_id, "Trying to push an event with no ID");
                                return None;
                            };

                            Some((event_id.to_string(), event))
                        };

                        for (i, (event_id, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_room_id, &event_id, index))?;

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%room_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        // Replace the event's content. Really we'd like to update, but in case the
                        // event id changed, we are a bit lenient here and will allow an insertion
                        // of the new event.
                        let encoded_event = this.encode_event(&event)?;
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE room_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_room_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we're going to remove.
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = position - 1
                                WHERE room_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_room_id, chunk_id, index)
                        )?;

                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%room_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE room_id = ?",
                            (&hashed_room_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    async fn load_all_chunks(
        &self,
        room_id: &RoomId,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        let result = self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_room_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    async fn load_last_chunk(
        &self,
        room_id: &RoomId,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a
                // `ChunkIdentifierGenerator`, and count the number of chunks.
                let (chunk_identifier_generator, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE room_id = ?"
                    )?
                    .query_row(
                        (&hashed_room_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it returns no line, the `MAX(id)` is
                                // set to `Null`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match chunk_identifier_generator {
                    Some(last_chunk_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(last_chunk_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE room_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_room_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk is not found and there are zero chunks for this room: this is
                    // consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The chunk is not found **but** there are chunks for this room: this is
                    // inconsistent, the linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_room_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    async fn load_previous_chunk(
        &self,
        room_id: &RoomId,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_room_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_room_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
        self.acquire()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;
        Ok(())
    }

    async fn filter_duplicated_events(
        &self,
        room_id: &RoomId,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        // If there are no events for which we want to check duplicates, we can
        // return early. It's not only an optimization to do so: it's required,
        // otherwise the `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE room_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        // parameter for `room_id = ?`
                        once(
                            hashed_room_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID has been stored even if it
                            // is malformed; let's skip it.
                            error!(%duplicated_event, %room_id, "Reading a malformed event ID");
                            continue;
                        };
940
941                        duplicated_events.push((
942                            duplicated_event,
943                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
944                        ));
945                    }
946
947                    Ok(duplicated_events)
948                })
949            })
950            .await
951    }
952
953    async fn find_event(
954        &self,
955        room_id: &RoomId,
956        event_id: &EventId,
957    ) -> Result<Option<Event>, Self::Error> {
958        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
959        let event_id = event_id.to_owned();
960        let this = self.clone();
961
962        self.acquire()
963            .await?
964            .with_transaction(move |txn| -> Result<_> {
965                let Some(event) = txn
966                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
967                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
968                    .optional()?
969                else {
970                    // Event is not found.
971                    return Ok(None);
972                };
973
974                let event = serde_json::from_slice(&this.decode_value(&event)?)?;
975
976                Ok(Some(event))
977            })
978            .await
979    }
980
981    async fn find_event_relations(
982        &self,
983        room_id: &RoomId,
984        event_id: &EventId,
985        filters: Option<&[RelationType]>,
986    ) -> Result<Vec<Event>, Self::Error> {
987        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
988        let event_id = event_id.to_owned();
989        let filters = filters.map(ToOwned::to_owned);
990        let this = self.clone();
991
992        self.acquire()
993            .await?
994            .with_transaction(move |txn| -> Result<_> {
995                let filter_query = if let Some(filters) = compute_filters_string(filters.as_deref())
996                {
997                    format!(
998                        " AND rel_type IN ({})",
999                        filters
1000                            .into_iter()
1001                            .map(|f| format!(r#""{f}""#))
1002                            .collect::<Vec<_>>()
1003                            .join(", ")
1004                    )
1005                } else {
1006                    "".to_owned()
1007                };
1008
1009                let query = format!(
1010                    "SELECT content FROM events WHERE relates_to = ? AND room_id = ? {}",
1011                    filter_query
1012                );
1013
1014                // Collect related events.
1015                let mut related = Vec::new();
1016                for ev in
1017                    txn.prepare(&query)?.query_map((event_id.as_str(), hashed_room_id), |row| {
1018                        row.get::<_, Vec<u8>>(0)
1019                    })?
1020                {
1021                    let ev = ev?;
1022                    let ev = serde_json::from_slice(&this.decode_value(&ev)?)?;
1023                    related.push(ev);
1024                }
1025
1026                Ok(related)
1027            })
1028            .await
1029    }
1030
1031    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1032        let Some(event_id) = event.event_id() else {
1033            error!(%room_id, "Trying to save an event with no ID");
1034            return Ok(());
1035        };
1036
1037        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1038        let event_id = event_id.to_string();
1039        let encoded_event = self.encode_event(&event)?;
1040
1041        self.acquire()
1042            .await?
1043            .with_transaction(move |txn| -> Result<_> {
1044                txn.execute(
1045                    "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
1046                    , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1047
1048                Ok(())
1049            })
1050            .await
1051    }
1052
1053    async fn add_media_content(
1054        &self,
1055        request: &MediaRequestParameters,
1056        content: Vec<u8>,
1057        ignore_policy: IgnoreMediaRetentionPolicy,
1058    ) -> Result<()> {
1059        self.media_service.add_media_content(self, request, content, ignore_policy).await
1060    }
1061
1062    async fn replace_media_key(
1063        &self,
1064        from: &MediaRequestParameters,
1065        to: &MediaRequestParameters,
1066    ) -> Result<(), Self::Error> {
1067        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
1068        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
1069
1070        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
1071        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
1072
1073        let conn = self.acquire().await?;
1074        conn.execute(
1075            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
1076            (new_uri, new_format, prev_uri, prev_format),
1077        )
1078        .await?;
1079
1080        Ok(())
1081    }
1082
1083    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
1084        self.media_service.get_media_content(self, request).await
1085    }
1086
1087    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
1088        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1089        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1090
1091        let conn = self.acquire().await?;
1092        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
1093
1094        Ok(())
1095    }
1096
1097    async fn get_media_content_for_uri(
1098        &self,
1099        uri: &MxcUri,
1100    ) -> Result<Option<Vec<u8>>, Self::Error> {
1101        self.media_service.get_media_content_for_uri(self, uri).await
1102    }
1103
1104    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
1105        let uri = self.encode_key(keys::MEDIA, uri);
1106
1107        let conn = self.acquire().await?;
1108        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
1109
1110        Ok(())
1111    }
1112
1113    async fn set_media_retention_policy(
1114        &self,
1115        policy: MediaRetentionPolicy,
1116    ) -> Result<(), Self::Error> {
1117        self.media_service.set_media_retention_policy(self, policy).await
1118    }
1119
1120    fn media_retention_policy(&self) -> MediaRetentionPolicy {
1121        self.media_service.media_retention_policy()
1122    }
1123
1124    async fn set_ignore_media_retention_policy(
1125        &self,
1126        request: &MediaRequestParameters,
1127        ignore_policy: IgnoreMediaRetentionPolicy,
1128    ) -> Result<(), Self::Error> {
1129        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
1130    }
1131
1132    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
1133        self.media_service.clean_up_media_cache(self).await
1134    }
1135}
1136
1137#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1138#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1139impl EventCacheStoreMedia for SqliteEventCacheStore {
1140    type Error = Error;
1141
1142    async fn media_retention_policy_inner(
1143        &self,
1144    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1145        let conn = self.acquire().await?;
1146        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
1147    }
1148
1149    async fn set_media_retention_policy_inner(
1150        &self,
1151        policy: MediaRetentionPolicy,
1152    ) -> Result<(), Self::Error> {
1153        let conn = self.acquire().await?;
1154        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
1155        Ok(())
1156    }
1157
1158    async fn add_media_content_inner(
1159        &self,
1160        request: &MediaRequestParameters,
1161        data: Vec<u8>,
1162        last_access: SystemTime,
1163        policy: MediaRetentionPolicy,
1164        ignore_policy: IgnoreMediaRetentionPolicy,
1165    ) -> Result<(), Self::Error> {
1166        let ignore_policy = ignore_policy.is_yes();
1167        let data = self.encode_value(data)?;
1168
1169        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
1170            return Ok(());
1171        }
1172
1173        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1174        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1175        let timestamp = time_to_timestamp(last_access);
1176
1177        let conn = self.acquire().await?;
1178        conn.execute(
1179            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
1180            (uri, format, data, timestamp, ignore_policy),
1181        )
1182        .await?;
1183
1184        Ok(())
1185    }
1186
1187    async fn set_ignore_media_retention_policy_inner(
1188        &self,
1189        request: &MediaRequestParameters,
1190        ignore_policy: IgnoreMediaRetentionPolicy,
1191    ) -> Result<(), Self::Error> {
1192        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1193        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1194        let ignore_policy = ignore_policy.is_yes();
1195
1196        let conn = self.acquire().await?;
1197        conn.execute(
1198            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
1199            (ignore_policy, uri, format),
1200        )
1201        .await?;
1202
1203        Ok(())
1204    }
1205
1206    async fn get_media_content_inner(
1207        &self,
1208        request: &MediaRequestParameters,
1209        current_time: SystemTime,
1210    ) -> Result<Option<Vec<u8>>, Self::Error> {
1211        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1212        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1213        let timestamp = time_to_timestamp(current_time);
1214
1215        let conn = self.acquire().await?;
1216        let data = conn
1217            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1218                // Update the last access.
1219                // We need to do this first so the transaction is in write mode right away.
1220                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
1221                txn.execute(
1222                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
1223                    (timestamp, &uri, &format),
1224                )?;
1225
1226                txn.query_row::<Vec<u8>, _, _>(
1227                    "SELECT data FROM media WHERE uri = ? AND format = ?",
1228                    (&uri, &format),
1229                    |row| row.get(0),
1230                )
1231                .optional()
1232            })
1233            .await?;
1234
1235        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1236    }
1237
1238    async fn get_media_content_for_uri_inner(
1239        &self,
1240        uri: &MxcUri,
1241        current_time: SystemTime,
1242    ) -> Result<Option<Vec<u8>>, Self::Error> {
1243        let uri = self.encode_key(keys::MEDIA, uri);
1244        let timestamp = time_to_timestamp(current_time);
1245
1246        let conn = self.acquire().await?;
1247        let data = conn
1248            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1249                // Update the last access.
1250                // We need to do this first so the transaction is in write mode right away.
1251                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
1252                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
1253
1254                txn.query_row::<Vec<u8>, _, _>(
1255                    "SELECT data FROM media WHERE uri = ?",
1256                    (&uri,),
1257                    |row| row.get(0),
1258                )
1259                .optional()
1260            })
1261            .await?;
1262
1263        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1264    }
1265
1266    async fn clean_up_media_cache_inner(
1267        &self,
1268        policy: MediaRetentionPolicy,
1269        current_time: SystemTime,
1270    ) -> Result<(), Self::Error> {
1271        if !policy.has_limitations() {
1272            // We can safely skip all the checks.
1273            return Ok(());
1274        }
1275
1276        let conn = self.acquire().await?;
1277        let removed = conn
1278            .with_transaction::<_, Error, _>(move |txn| {
1279                let mut removed = false;
1280
                // First, remove media content that exceeds the max file size.
1282                if let Some(max_file_size) = policy.computed_max_file_size() {
1283                    let count = txn.execute(
1284                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
1285                        (max_file_size,),
1286                    )?;
1287
1288                    if count > 0 {
1289                        removed = true;
1290                    }
1291                }
1292
1293                // Then, clean up expired media content.
1294                if let Some(last_access_expiry) = policy.last_access_expiry {
1295                    let current_timestamp = time_to_timestamp(current_time);
1296                    let expiry_secs = last_access_expiry.as_secs();
1297                    let count = txn.execute(
1298                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
1299                        (current_timestamp, expiry_secs),
1300                    )?;
1301
1302                    if count > 0 {
1303                        removed = true;
1304                    }
1305                }
1306
1307                // Finally, if the cache size is too big, remove old items until it fits.
1308                if let Some(max_cache_size) = policy.max_cache_size {
1309                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
1310                    // during the conversion of the result.
1311                    let cache_size = txn
1312                        .query_row(
1313                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
1314                            (),
1315                            |row| {
1316                                // `sum()` returns `NULL` if there are no rows.
1317                                row.get::<_, Option<u64>>(0)
1318                            },
1319                        )?
1320                        .unwrap_or_default();
1321
1322                    // If the cache size is overflowing or bigger than max cache size, clean up.
1323                    if cache_size > max_cache_size {
1324                        // Get the sizes of the media contents ordered by last access.
1325                        let mut cached_stmt = txn.prepare_cached(
1326                            "SELECT rowid, length(data) FROM media \
1327                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
1328                        )?;
1329                        let content_sizes = cached_stmt
1330                            .query(())?
1331                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
1332
1333                        let mut accumulated_items_size = 0u64;
1334                        let mut limit_reached = false;
1335                        let mut rows_to_remove = Vec::new();
1336
1337                        for result in content_sizes {
1338                            let (row_id, size) = match result {
1339                                Ok(content_size) => content_size,
1340                                Err(error) => {
1341                                    return Err(error.into());
1342                                }
1343                            };
1344
1345                            if limit_reached {
1346                                rows_to_remove.push(row_id);
1347                                continue;
1348                            }
1349
1350                            match accumulated_items_size.checked_add(size) {
1351                                Some(acc) if acc > max_cache_size => {
1352                                    // We can stop accumulating.
1353                                    limit_reached = true;
1354                                    rows_to_remove.push(row_id);
1355                                }
1356                                Some(acc) => accumulated_items_size = acc,
1357                                None => {
1358                                    // The accumulated size is overflowing but the setting cannot be
1359                                    // bigger than usize::MAX, we can stop accumulating.
1360                                    limit_reached = true;
1361                                    rows_to_remove.push(row_id);
1362                                }
1363                            };
1364                        }
1365
1366                        if !rows_to_remove.is_empty() {
1367                            removed = true;
1368                        }
1369
1370                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
1371                            let sql_params = repeat_vars(row_ids.len());
1372                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
1373                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
1374                            Ok(Vec::<()>::new())
1375                        })?;
1376                    }
1377                }
1378
1379                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
1380
1381                Ok(removed)
1382            })
1383            .await?;
1384
1385        // If we removed media, defragment the database and free space on the
1386        // filesystem.
1387        if removed {
1388            conn.vacuum().await?;
1389        }
1390
1391        Ok(())
1392    }
1393
1394    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1395        let conn = self.acquire().await?;
1396        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
1397    }
1398}
1399
1400/// Like `deadpool::managed::Object::with_transaction`, but starts the
1401/// transaction in immediate (write) mode from the beginning, which prevents
1402/// SQLITE_BUSY errors for transactions that may involve both reads and
1403/// writes and that start with a write.
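///
/// A minimal usage sketch (illustrative only; `pool` is assumed to be the
/// store's `SqlitePool`):
///
/// ```ignore
/// let conn = pool.get().await?;
/// let deleted = with_immediate_transaction(conn, |txn| {
///     // The first statement writes, so IMMEDIATE mode takes the write lock
///     // up front instead of upgrading a read transaction later.
///     txn.execute("DELETE FROM media", ())?;
///     Ok(txn.changes())
/// })
/// .await?;
/// ```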
1404async fn with_immediate_transaction<
1405    T: Send + 'static,
1406    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1407>(
1408    conn: SqliteAsyncConn,
1409    f: F,
1410) -> Result<T, Error> {
1411    conn.interact(move |conn| -> Result<T, Error> {
1412        // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1413        // to avoid read transactions upgrading to write mode and causing
1414        // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1415        conn.set_transaction_behavior(TransactionBehavior::Immediate);
1416
1417        let code = || -> Result<T, Error> {
1418            let txn = conn.transaction()?;
1419            let res = f(&txn)?;
1420            txn.commit()?;
1421            Ok(res)
1422        };
1423
1424        let res = code();
1425
1426        // Reset the transaction behavior to Deferred after this transaction has
1427        // run, whether it was successful or not.
1428        conn.set_transaction_behavior(TransactionBehavior::Deferred);
1429
1430        res
1431    })
1432    .await
1433    // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
1434    .unwrap()
1435}
1436
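/// Insert a new chunk into the `linked_chunks` table for the given room, then
/// patch the `next` pointer of the previous chunk and the `previous` pointer
/// of the next chunk, so that the doubly-linked list stays consistent.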
1437fn insert_chunk(
1438    txn: &Transaction<'_>,
1439    room_id: &Key,
1440    previous: Option<u64>,
1441    new: u64,
1442    next: Option<u64>,
1443    type_str: &str,
1444) -> rusqlite::Result<()> {
1445    // First, insert the new chunk.
1446    txn.execute(
1447        r#"
1448            INSERT INTO linked_chunks(id, room_id, previous, next, type)
1449            VALUES (?, ?, ?, ?, ?)
1450        "#,
1451        (new, room_id, previous, next, type_str),
1452    )?;
1453
1454    // If this chunk has a previous one, update its `next` field.
1455    if let Some(previous) = previous {
1456        txn.execute(
1457            r#"
1458                UPDATE linked_chunks
1459                SET next = ?
1460                WHERE id = ? AND room_id = ?
1461            "#,
1462            (new, previous, room_id),
1463        )?;
1464    }
1465
1466    // If this chunk has a next one, update its `previous` field.
1467    if let Some(next) = next {
1468        txn.execute(
1469            r#"
1470                UPDATE linked_chunks
1471                SET previous = ?
1472                WHERE id = ? AND room_id = ?
1473            "#,
1474            (new, next, room_id),
1475        )?;
1476    }
1477
1478    Ok(())
1479}
1480
1481#[cfg(test)]
1482mod tests {
1483    use std::{
1484        path::PathBuf,
1485        sync::atomic::{AtomicU32, Ordering::SeqCst},
1486        time::Duration,
1487    };
1488
1489    use assert_matches::assert_matches;
1490    use matrix_sdk_base::{
1491        event_cache::{
1492            store::{
1493                integration_tests::{
1494                    check_test_event, make_test_event, make_test_event_with_event_id,
1495                },
1496                media::IgnoreMediaRetentionPolicy,
1497                EventCacheStore, EventCacheStoreError,
1498            },
1499            Gap,
1500        },
1501        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1502        event_cache_store_media_integration_tests,
1503        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1504        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1505    };
1506    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1507    use once_cell::sync::Lazy;
1508    use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1509    use tempfile::{tempdir, TempDir};
1510
1511    use super::SqliteEventCacheStore;
1512    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
1513
1514    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1515    static NUM: AtomicU32 = AtomicU32::new(0);
1516
1517    fn new_event_cache_store_workspace() -> PathBuf {
1518        let name = NUM.fetch_add(1, SeqCst).to_string();
1519        TMP_DIR.path().join(name)
1520    }
1521
1522    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1523        let tmpdir_path = new_event_cache_store_workspace();
1524
1525        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1526
1527        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1528    }
1529
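    // These macros from `matrix_sdk_base` expand to the shared integration
    // test suites, exercising this SQLite store implementation.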
1530    event_cache_store_integration_tests!();
1531    event_cache_store_integration_tests_time!();
1532    event_cache_store_media_integration_tests!(with_media_size_tests);
1533
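    /// Return the raw `data` column of every `media` row, most recently
    /// accessed first.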
1534    async fn get_event_cache_store_content_sorted_by_last_access(
1535        event_cache_store: &SqliteEventCacheStore,
1536    ) -> Vec<Vec<u8>> {
1537        let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1538        sqlite_db
1539            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1540                stmt.query(())?.mapped(|row| row.get(0)).collect()
1541            })
1542            .await
1543            .expect("querying media cache content by last access failed")
1544    }
1545
1546    #[async_test]
1547    async fn test_pool_size() {
1548        let tmpdir_path = new_event_cache_store_workspace();
1549        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1550
1551        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1552
1553        assert_eq!(store.pool.status().max_size, 42);
1554    }
1555
1556    #[async_test]
1557    async fn test_last_access() {
1558        let event_cache_store = get_event_cache_store().await.expect("creating cache store failed");
1559        let uri = mxc_uri!("mxc://localhost/media");
1560        let file_request = MediaRequestParameters {
1561            source: MediaSource::Plain(uri.to_owned()),
1562            format: MediaFormat::File,
1563        };
1564        let thumbnail_request = MediaRequestParameters {
1565            source: MediaSource::Plain(uri.to_owned()),
1566            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
1567                Method::Crop,
1568                uint!(100),
1569                uint!(100),
1570            )),
1571        };
1572
1573        let content: Vec<u8> = "hello world".into();
1574        let thumbnail_content: Vec<u8> = "hello…".into();
1575
1576        // Add the media.
1577        event_cache_store
1578            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
1579            .await
1580            .expect("adding file failed");
1581
1582        // Since the precision of the timestamp is in seconds, wait so the timestamps
1583        // differ.
1584        tokio::time::sleep(Duration::from_secs(3)).await;
1585
1586        event_cache_store
1587            .add_media_content(
1588                &thumbnail_request,
1589                thumbnail_content.clone(),
1590                IgnoreMediaRetentionPolicy::No,
1591            )
1592            .await
1593            .expect("adding thumbnail failed");
1594
1595        // File's last access is older than thumbnail.
1596        let contents =
1597            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1598
1599        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1600        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
1601        assert_eq!(contents[1], content, "file is not second-to-last access");
1602
1603        // Since the precision of the timestamp is in seconds, wait so the timestamps
1604        // differ.
1605        tokio::time::sleep(Duration::from_secs(3)).await;
1606
1607        // Access the file so its last access is more recent.
1608        let _ = event_cache_store
1609            .get_media_content(&file_request)
1610            .await
1611            .expect("getting file failed")
1612            .expect("file is missing");
1613
1614        // File's last access is more recent than thumbnail.
1615        let contents =
1616            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1617
1618        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1619        assert_eq!(contents[0], content, "file is not last access");
1620        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
1621    }
1622
1623    #[async_test]
1624    async fn test_linked_chunk_new_items_chunk() {
1625        let store = get_event_cache_store().await.expect("creating cache store failed");
1626
1627        let room_id = &DEFAULT_TEST_ROOM_ID;
1628
1629        store
1630            .handle_linked_chunk_updates(
1631                room_id,
1632                vec![
1633                    Update::NewItemsChunk {
1634                        previous: None,
1635                        new: ChunkIdentifier::new(42),
1636                        next: None, // Note: the store must link the next entry itself.
1637                    },
1638                    Update::NewItemsChunk {
1639                        previous: Some(ChunkIdentifier::new(42)),
1640                        new: ChunkIdentifier::new(13),
1641                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
1642                                                               * the next link ahead of time. */
1643                    },
1644                    Update::NewItemsChunk {
1645                        previous: Some(ChunkIdentifier::new(13)),
1646                        new: ChunkIdentifier::new(37),
1647                        next: None,
1648                    },
1649                ],
1650            )
1651            .await
1652            .unwrap();
1653
1654        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1655
1656        assert_eq!(chunks.len(), 3);
1657
1658        {
1659            // Chunks are ordered from smaller to bigger IDs.
1660            let c = chunks.remove(0);
1661            assert_eq!(c.identifier, ChunkIdentifier::new(13));
1662            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1663            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1664            assert_matches!(c.content, ChunkContent::Items(events) => {
1665                assert!(events.is_empty());
1666            });
1667
1668            let c = chunks.remove(0);
1669            assert_eq!(c.identifier, ChunkIdentifier::new(37));
1670            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1671            assert_eq!(c.next, None);
1672            assert_matches!(c.content, ChunkContent::Items(events) => {
1673                assert!(events.is_empty());
1674            });
1675
1676            let c = chunks.remove(0);
1677            assert_eq!(c.identifier, ChunkIdentifier::new(42));
1678            assert_eq!(c.previous, None);
1679            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1680            assert_matches!(c.content, ChunkContent::Items(events) => {
1681                assert!(events.is_empty());
1682            });
1683        }
1684    }
1685
1686    #[async_test]
1687    async fn test_linked_chunk_new_gap_chunk() {
1688        let store = get_event_cache_store().await.expect("creating cache store failed");
1689
1690        let room_id = &DEFAULT_TEST_ROOM_ID;
1691
1692        store
1693            .handle_linked_chunk_updates(
1694                room_id,
1695                vec![Update::NewGapChunk {
1696                    previous: None,
1697                    new: ChunkIdentifier::new(42),
1698                    next: None,
1699                    gap: Gap { prev_token: "raclette".to_owned() },
1700                }],
1701            )
1702            .await
1703            .unwrap();
1704
1705        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1706
1707        assert_eq!(chunks.len(), 1);
1708
1709        // Chunks are ordered from smaller to bigger IDs.
1710        let c = chunks.remove(0);
1711        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1712        assert_eq!(c.previous, None);
1713        assert_eq!(c.next, None);
1714        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1715            assert_eq!(gap.prev_token, "raclette");
1716        });
1717    }
1718
1719    #[async_test]
1720    async fn test_linked_chunk_replace_item() {
1721        let store = get_event_cache_store().await.expect("creating cache store failed");
1722
1723        let room_id = &DEFAULT_TEST_ROOM_ID;
1724        let event_id = event_id!("$world");
1725
1726        store
1727            .handle_linked_chunk_updates(
1728                room_id,
1729                vec![
1730                    Update::NewItemsChunk {
1731                        previous: None,
1732                        new: ChunkIdentifier::new(42),
1733                        next: None,
1734                    },
1735                    Update::PushItems {
1736                        at: Position::new(ChunkIdentifier::new(42), 0),
1737                        items: vec![
1738                            make_test_event(room_id, "hello"),
1739                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
1740                        ],
1741                    },
1742                    Update::ReplaceItem {
1743                        at: Position::new(ChunkIdentifier::new(42), 1),
1744                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1745                    },
1746                ],
1747            )
1748            .await
1749            .unwrap();
1750
1751        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1752
1753        assert_eq!(chunks.len(), 1);
1754
1755        let c = chunks.remove(0);
1756        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1757        assert_eq!(c.previous, None);
1758        assert_eq!(c.next, None);
1759        assert_matches!(c.content, ChunkContent::Items(events) => {
1760            assert_eq!(events.len(), 2);
1761            check_test_event(&events[0], "hello");
1762            check_test_event(&events[1], "yolo");
1763        });
1764    }
1765
1766    #[async_test]
1767    async fn test_linked_chunk_remove_chunk() {
1768        let store = get_event_cache_store().await.expect("creating cache store failed");
1769
1770        let room_id = &DEFAULT_TEST_ROOM_ID;
1771
1772        store
1773            .handle_linked_chunk_updates(
1774                room_id,
1775                vec![
1776                    Update::NewGapChunk {
1777                        previous: None,
1778                        new: ChunkIdentifier::new(42),
1779                        next: None,
1780                        gap: Gap { prev_token: "raclette".to_owned() },
1781                    },
1782                    Update::NewGapChunk {
1783                        previous: Some(ChunkIdentifier::new(42)),
1784                        new: ChunkIdentifier::new(43),
1785                        next: None,
1786                        gap: Gap { prev_token: "fondue".to_owned() },
1787                    },
1788                    Update::NewGapChunk {
1789                        previous: Some(ChunkIdentifier::new(43)),
1790                        new: ChunkIdentifier::new(44),
1791                        next: None,
1792                        gap: Gap { prev_token: "tartiflette".to_owned() },
1793                    },
1794                    Update::RemoveChunk(ChunkIdentifier::new(43)),
1795                ],
1796            )
1797            .await
1798            .unwrap();
1799
1800        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1801
1802        assert_eq!(chunks.len(), 2);
1803
1804        // Chunks are ordered from smaller to bigger IDs.
1805        let c = chunks.remove(0);
1806        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1807        assert_eq!(c.previous, None);
1808        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1809        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1810            assert_eq!(gap.prev_token, "raclette");
1811        });
1812
1813        let c = chunks.remove(0);
1814        assert_eq!(c.identifier, ChunkIdentifier::new(44));
1815        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1816        assert_eq!(c.next, None);
1817        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1818            assert_eq!(gap.prev_token, "tartiflette");
1819        });
1820
1821        // Check that cascading worked. Yes, sqlite, I doubt you.
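        // (Cascading presumably comes from an `ON DELETE CASCADE` foreign key
        // from `gap_chunks` to `linked_chunks` in the schema.)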
1822        let gaps = store
1823            .acquire()
1824            .await
1825            .unwrap()
1826            .with_transaction(|txn| -> rusqlite::Result<_> {
1827                let mut gaps = Vec::new();
1828                for data in txn
1829                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1830                    .query_map((), |row| row.get::<_, u64>(0))?
1831                {
1832                    gaps.push(data?);
1833                }
1834                Ok(gaps)
1835            })
1836            .await
1837            .unwrap();
1838
1839        assert_eq!(gaps, vec![42, 44]);
1840    }
1841
1842    #[async_test]
1843    async fn test_linked_chunk_push_items() {
1844        let store = get_event_cache_store().await.expect("creating cache store failed");
1845
1846        let room_id = &DEFAULT_TEST_ROOM_ID;
1847
1848        store
1849            .handle_linked_chunk_updates(
1850                room_id,
1851                vec![
1852                    Update::NewItemsChunk {
1853                        previous: None,
1854                        new: ChunkIdentifier::new(42),
1855                        next: None,
1856                    },
1857                    Update::PushItems {
1858                        at: Position::new(ChunkIdentifier::new(42), 0),
1859                        items: vec![
1860                            make_test_event(room_id, "hello"),
1861                            make_test_event(room_id, "world"),
1862                        ],
1863                    },
1864                    Update::PushItems {
1865                        at: Position::new(ChunkIdentifier::new(42), 2),
1866                        items: vec![make_test_event(room_id, "who?")],
1867                    },
1868                ],
1869            )
1870            .await
1871            .unwrap();
1872
1873        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1874
1875        assert_eq!(chunks.len(), 1);
1876
1877        let c = chunks.remove(0);
1878        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1879        assert_eq!(c.previous, None);
1880        assert_eq!(c.next, None);
1881        assert_matches!(c.content, ChunkContent::Items(events) => {
1882            assert_eq!(events.len(), 3);
1883
1884            check_test_event(&events[0], "hello");
1885            check_test_event(&events[1], "world");
1886            check_test_event(&events[2], "who?");
1887        });
1888    }
1889
1890    #[async_test]
1891    async fn test_linked_chunk_remove_item() {
1892        let store = get_event_cache_store().await.expect("creating cache store failed");
1893
1894        let room_id = *DEFAULT_TEST_ROOM_ID;
1895
1896        store
1897            .handle_linked_chunk_updates(
1898                room_id,
1899                vec![
1900                    Update::NewItemsChunk {
1901                        previous: None,
1902                        new: ChunkIdentifier::new(42),
1903                        next: None,
1904                    },
1905                    Update::PushItems {
1906                        at: Position::new(ChunkIdentifier::new(42), 0),
1907                        items: vec![
1908                            make_test_event(room_id, "hello"),
1909                            make_test_event(room_id, "world"),
1910                        ],
1911                    },
1912                    Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
1913                ],
1914            )
1915            .await
1916            .unwrap();
1917
1918        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1919
1920        assert_eq!(chunks.len(), 1);
1921
1922        let c = chunks.remove(0);
1923        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1924        assert_eq!(c.previous, None);
1925        assert_eq!(c.next, None);
1926        assert_matches!(c.content, ChunkContent::Items(events) => {
1927            assert_eq!(events.len(), 1);
1928            check_test_event(&events[0], "world");
1929        });
1930
1931        // Make sure the position has been updated for the remaining event.
1932        let num_rows: u64 = store
1933            .acquire()
1934            .await
1935            .unwrap()
1936            .with_transaction(move |txn| {
1937                txn.query_row(
1938                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND room_id = ? AND position = 0",
1939                    (room_id.as_bytes(),),
1940                    |row| row.get(0),
1941                )
1942            })
1943            .await
1944            .unwrap();
1945        assert_eq!(num_rows, 1);
1946    }
1947
1948    #[async_test]
1949    async fn test_linked_chunk_detach_last_items() {
1950        let store = get_event_cache_store().await.expect("creating cache store failed");
1951
1952        let room_id = *DEFAULT_TEST_ROOM_ID;
1953
1954        store
1955            .handle_linked_chunk_updates(
1956                room_id,
1957                vec![
1958                    Update::NewItemsChunk {
1959                        previous: None,
1960                        new: ChunkIdentifier::new(42),
1961                        next: None,
1962                    },
1963                    Update::PushItems {
1964                        at: Position::new(ChunkIdentifier::new(42), 0),
1965                        items: vec![
1966                            make_test_event(room_id, "hello"),
1967                            make_test_event(room_id, "world"),
1968                            make_test_event(room_id, "howdy"),
1969                        ],
1970                    },
1971                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1972                ],
1973            )
1974            .await
1975            .unwrap();
1976
1977        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1978
1979        assert_eq!(chunks.len(), 1);
1980
1981        let c = chunks.remove(0);
1982        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1983        assert_eq!(c.previous, None);
1984        assert_eq!(c.next, None);
1985        assert_matches!(c.content, ChunkContent::Items(events) => {
1986            assert_eq!(events.len(), 1);
1987            check_test_event(&events[0], "hello");
1988        });
1989    }
1990
1991    #[async_test]
1992    async fn test_linked_chunk_start_end_reattach_items() {
1993        let store = get_event_cache_store().await.expect("creating cache store failed");
1994
1995        let room_id = *DEFAULT_TEST_ROOM_ID;
1996
1997        // Same updates and checks as test_linked_chunk_push_items, but with extra
1998        // `StartReattachItems` and `EndReattachItems` updates, which must have no
1999        // effect.
2000        store
2001            .handle_linked_chunk_updates(
2002                room_id,
2003                vec![
2004                    Update::NewItemsChunk {
2005                        previous: None,
2006                        new: ChunkIdentifier::new(42),
2007                        next: None,
2008                    },
2009                    Update::PushItems {
2010                        at: Position::new(ChunkIdentifier::new(42), 0),
2011                        items: vec![
2012                            make_test_event(room_id, "hello"),
2013                            make_test_event(room_id, "world"),
2014                            make_test_event(room_id, "howdy"),
2015                        ],
2016                    },
2017                    Update::StartReattachItems,
2018                    Update::EndReattachItems,
2019                ],
2020            )
2021            .await
2022            .unwrap();
2023
2024        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
2025
2026        assert_eq!(chunks.len(), 1);
2027
2028        let c = chunks.remove(0);
2029        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2030        assert_eq!(c.previous, None);
2031        assert_eq!(c.next, None);
2032        assert_matches!(c.content, ChunkContent::Items(events) => {
2033            assert_eq!(events.len(), 3);
2034            check_test_event(&events[0], "hello");
2035            check_test_event(&events[1], "world");
2036            check_test_event(&events[2], "howdy");
2037        });
2038    }
2039
2040    #[async_test]
2041    async fn test_linked_chunk_clear() {
2042        let store = get_event_cache_store().await.expect("creating cache store failed");
2043
2044        let room_id = *DEFAULT_TEST_ROOM_ID;
2045        let event_0 = make_test_event(room_id, "hello");
2046        let event_1 = make_test_event(room_id, "world");
2047        let event_2 = make_test_event(room_id, "howdy");
2048
2049        store
2050            .handle_linked_chunk_updates(
2051                room_id,
2052                vec![
2053                    Update::NewItemsChunk {
2054                        previous: None,
2055                        new: ChunkIdentifier::new(42),
2056                        next: None,
2057                    },
2058                    Update::NewGapChunk {
2059                        previous: Some(ChunkIdentifier::new(42)),
2060                        new: ChunkIdentifier::new(54),
2061                        next: None,
2062                        gap: Gap { prev_token: "fondue".to_owned() },
2063                    },
2064                    Update::PushItems {
2065                        at: Position::new(ChunkIdentifier::new(42), 0),
2066                        items: vec![event_0.clone(), event_1, event_2],
2067                    },
2068                    Update::Clear,
2069                ],
2070            )
2071            .await
2072            .unwrap();
2073
2074        let chunks = store.load_all_chunks(room_id).await.unwrap();
2075        assert!(chunks.is_empty());
2076
2077        // Check that cascading worked. Yes, sqlite, I doubt you.
2078        store
2079            .acquire()
2080            .await
2081            .unwrap()
2082            .with_transaction(|txn| -> rusqlite::Result<_> {
2083                let num_gaps = txn
2084                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2085                    .query_row((), |row| row.get::<_, u64>(0))?;
2086                assert_eq!(num_gaps, 0);
2087
2088                let num_events = txn
2089                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2090                    .query_row((), |row| row.get::<_, u64>(0))?;
2091                assert_eq!(num_events, 0);
2092
2093                Ok(())
2094            })
2095            .await
2096            .unwrap();
2097
2098        // It's okay to re-insert a past event.
2099        store
2100            .handle_linked_chunk_updates(
2101                room_id,
2102                vec![
2103                    Update::NewItemsChunk {
2104                        previous: None,
2105                        new: ChunkIdentifier::new(42),
2106                        next: None,
2107                    },
2108                    Update::PushItems {
2109                        at: Position::new(ChunkIdentifier::new(42), 0),
2110                        items: vec![event_0],
2111                    },
2112                ],
2113            )
2114            .await
2115            .unwrap();
2116    }
2117
2118    #[async_test]
2119    async fn test_linked_chunk_multiple_rooms() {
2120        let store = get_event_cache_store().await.expect("creating cache store failed");
2121
2122        let room1 = room_id!("!realcheeselovers:raclette.fr");
2123        let room2 = room_id!("!realcheeselovers:fondue.ch");
2124
2125        // Check that applying updates to one room doesn't affect the others.
2126        // Use the same chunk identifier in both rooms to battle-test search.
2127
2128        store
2129            .handle_linked_chunk_updates(
2130                room1,
2131                vec![
2132                    Update::NewItemsChunk {
2133                        previous: None,
2134                        new: ChunkIdentifier::new(42),
2135                        next: None,
2136                    },
2137                    Update::PushItems {
2138                        at: Position::new(ChunkIdentifier::new(42), 0),
2139                        items: vec![
2140                            make_test_event(room1, "best cheese is raclette"),
2141                            make_test_event(room1, "obviously"),
2142                        ],
2143                    },
2144                ],
2145            )
2146            .await
2147            .unwrap();
2148
2149        store
2150            .handle_linked_chunk_updates(
2151                room2,
2152                vec![
2153                    Update::NewItemsChunk {
2154                        previous: None,
2155                        new: ChunkIdentifier::new(42),
2156                        next: None,
2157                    },
2158                    Update::PushItems {
2159                        at: Position::new(ChunkIdentifier::new(42), 0),
2160                        items: vec![make_test_event(room2, "beaufort is the best")],
2161                    },
2162                ],
2163            )
2164            .await
2165            .unwrap();
2166
2167        // Check chunks from room 1.
2168        let mut chunks_room1 = store.load_all_chunks(room1).await.unwrap();
2169        assert_eq!(chunks_room1.len(), 1);
2170
2171        let c = chunks_room1.remove(0);
2172        assert_matches!(c.content, ChunkContent::Items(events) => {
2173            assert_eq!(events.len(), 2);
2174            check_test_event(&events[0], "best cheese is raclette");
2175            check_test_event(&events[1], "obviously");
2176        });
2177
2178        // Check chunks from room 2.
2179        let mut chunks_room2 = store.load_all_chunks(room2).await.unwrap();
2180        assert_eq!(chunks_room2.len(), 1);
2181
2182        let c = chunks_room2.remove(0);
2183        assert_matches!(c.content, ChunkContent::Items(events) => {
2184            assert_eq!(events.len(), 1);
2185            check_test_event(&events[0], "beaufort is the best");
2186        });
2187    }
2188
2189    #[async_test]
2190    async fn test_linked_chunk_update_is_a_transaction() {
2191        let store = get_event_cache_store().await.expect("creating cache store failed");
2192
2193        let room_id = *DEFAULT_TEST_ROOM_ID;
2194
2195        // Trigger a violation of the unique constraint on the (room id, chunk id)
2196        // pair.
2197        let err = store
2198            .handle_linked_chunk_updates(
2199                room_id,
2200                vec![
2201                    Update::NewItemsChunk {
2202                        previous: None,
2203                        new: ChunkIdentifier::new(42),
2204                        next: None,
2205                    },
2206                    Update::NewItemsChunk {
2207                        previous: None,
2208                        new: ChunkIdentifier::new(42),
2209                        next: None,
2210                    },
2211                ],
2212            )
2213            .await
2214            .unwrap_err();
2215
2216        // The operation fails with a constraint violation error.
2217        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2218            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2219        });
2220
2221        // If the updates have been handled transactionally, then no new chunks should
2222        // have been added; failure of the second update leads to the first one being
2223        // rolled back.
2224        let chunks = store.load_all_chunks(room_id).await.unwrap();
2225        assert!(chunks.is_empty());
2226    }
2227
2228    #[async_test]
2229    async fn test_filter_duplicate_events_no_events() {
2230        let store = get_event_cache_store().await.expect("creating cache store failed");
2231
2232        let room_id = *DEFAULT_TEST_ROOM_ID;
2233        let duplicates = store.filter_duplicated_events(room_id, Vec::new()).await.unwrap();
2234        assert!(duplicates.is_empty());
2235    }
2236
2237    #[async_test]
2238    async fn test_load_last_chunk() {
2239        let room_id = room_id!("!r0:matrix.org");
2240        let event = |msg: &str| make_test_event(room_id, msg);
2241        let store = get_event_cache_store().await.expect("creating cache store failed");
2242
2243        // Case #1: no last chunk.
2244        {
2245            let (last_chunk, chunk_identifier_generator) =
2246                store.load_last_chunk(room_id).await.unwrap();
2247
2248            assert!(last_chunk.is_none());
2249            assert_eq!(chunk_identifier_generator.current(), 0);
2250        }
2251
2252        // Case #2: only one chunk is present.
2253        {
2254            store
2255                .handle_linked_chunk_updates(
2256                    room_id,
2257                    vec![
2258                        Update::NewItemsChunk {
2259                            previous: None,
2260                            new: ChunkIdentifier::new(42),
2261                            next: None,
2262                        },
2263                        Update::PushItems {
2264                            at: Position::new(ChunkIdentifier::new(42), 0),
2265                            items: vec![event("saucisse de morteau"), event("comté")],
2266                        },
2267                    ],
2268                )
2269                .await
2270                .unwrap();
2271
2272            let (last_chunk, chunk_identifier_generator) =
2273                store.load_last_chunk(room_id).await.unwrap();
2274
2275            assert_matches!(last_chunk, Some(last_chunk) => {
2276                assert_eq!(last_chunk.identifier, 42);
2277                assert!(last_chunk.previous.is_none());
2278                assert!(last_chunk.next.is_none());
2279                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2280                    assert_eq!(items.len(), 2);
2281                    check_test_event(&items[0], "saucisse de morteau");
2282                    check_test_event(&items[1], "comté");
2283                });
2284            });
2285            assert_eq!(chunk_identifier_generator.current(), 42);
2286        }
2287
2288        // Case #3: more chunks are present.
2289        {
2290            store
2291                .handle_linked_chunk_updates(
2292                    room_id,
2293                    vec![
2294                        Update::NewItemsChunk {
2295                            previous: Some(ChunkIdentifier::new(42)),
2296                            new: ChunkIdentifier::new(7),
2297                            next: None,
2298                        },
2299                        Update::PushItems {
2300                            at: Position::new(ChunkIdentifier::new(7), 0),
2301                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2302                        },
2303                    ],
2304                )
2305                .await
2306                .unwrap();
2307
2308            let (last_chunk, chunk_identifier_generator) =
2309                store.load_last_chunk(room_id).await.unwrap();
2310
2311            assert_matches!(last_chunk, Some(last_chunk) => {
2312                assert_eq!(last_chunk.identifier, 7);
2313                assert_matches!(last_chunk.previous, Some(previous) => {
2314                    assert_eq!(previous, 42);
2315                });
2316                assert!(last_chunk.next.is_none());
2317                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2318                    assert_eq!(items.len(), 3);
2319                    check_test_event(&items[0], "fondue");
2320                    check_test_event(&items[1], "gruyère");
2321                    check_test_event(&items[2], "mont d'or");
2322                });
2323            });
2324            assert_eq!(chunk_identifier_generator.current(), 42);
2325        }
2326    }
2327
2328    #[async_test]
2329    async fn test_load_last_chunk_with_a_cycle() {
2330        let room_id = room_id!("!r0:matrix.org");
2331        let store = get_event_cache_store().await.expect("creating cache store failed");
2332
2333        store
2334            .handle_linked_chunk_updates(
2335                room_id,
2336                vec![
2337                    Update::NewItemsChunk {
2338                        previous: None,
2339                        new: ChunkIdentifier::new(0),
2340                        next: None,
2341                    },
2342                    Update::NewItemsChunk {
2343                        // Because `previous` connects to chunk #0, it will create a cycle.
2344                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2345                        // **does not exist**. We have to detect this cycle.
2346                        previous: Some(ChunkIdentifier::new(0)),
2347                        new: ChunkIdentifier::new(1),
2348                        next: Some(ChunkIdentifier::new(0)),
2349                    },
2350                ],
2351            )
2352            .await
2353            .unwrap();
2354
2355        store.load_last_chunk(room_id).await.unwrap_err();
2356    }
2357
2358    #[async_test]
2359    async fn test_load_previous_chunk() {
2360        let room_id = room_id!("!r0:matrix.org");
2361        let event = |msg: &str| make_test_event(room_id, msg);
2362        let store = get_event_cache_store().await.expect("creating cache store failed");
2363
2364        // Case #1: no chunk at all, equivalent to having a nonexistent
2365        // `before_chunk_identifier`.
2366        {
2367            let previous_chunk =
2368                store.load_previous_chunk(room_id, ChunkIdentifier::new(153)).await.unwrap();
2369
2370            assert!(previous_chunk.is_none());
2371        }
2372
2373        // Case #2: there is only one chunk; we request the chunk before it,
2374        // which doesn't exist.
2375        {
2376            store
2377                .handle_linked_chunk_updates(
2378                    room_id,
2379                    vec![Update::NewItemsChunk {
2380                        previous: None,
2381                        new: ChunkIdentifier::new(42),
2382                        next: None,
2383                    }],
2384                )
2385                .await
2386                .unwrap();
2387
2388            let previous_chunk =
2389                store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2390
2391            assert!(previous_chunk.is_none());
2392        }
2393
2394        // Case #3: there are two chunks.
2395        {
2396            store
2397                .handle_linked_chunk_updates(
2398                    room_id,
2399                    vec![
2400                        // new chunk before the one that exists.
2401                        Update::NewItemsChunk {
2402                            previous: None,
2403                            new: ChunkIdentifier::new(7),
2404                            next: Some(ChunkIdentifier::new(42)),
2405                        },
2406                        Update::PushItems {
2407                            at: Position::new(ChunkIdentifier::new(7), 0),
2408                            items: vec![event("brigand du jorat"), event("morbier")],
2409                        },
2410                    ],
2411                )
2412                .await
2413                .unwrap();
2414
2415            let previous_chunk =
2416                store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2417
2418            assert_matches!(previous_chunk, Some(previous_chunk) => {
2419                assert_eq!(previous_chunk.identifier, 7);
2420                assert!(previous_chunk.previous.is_none());
2421                assert_matches!(previous_chunk.next, Some(next) => {
2422                    assert_eq!(next, 42);
2423                });
2424                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2425                    assert_eq!(items.len(), 2);
2426                    check_test_event(&items[0], "brigand du jorat");
2427                    check_test_event(&items[1], "morbier");
2428                });
2429            });
2430        }
2431    }
2432}
2433
2434#[cfg(test)]
2435mod encrypted_tests {
2436    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2437
2438    use matrix_sdk_base::{
2439        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
2440        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
2441    };
2442    use once_cell::sync::Lazy;
2443    use tempfile::{tempdir, TempDir};
2444
2445    use super::SqliteEventCacheStore;
2446
2447    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2448    static NUM: AtomicU32 = AtomicU32::new(0);
2449
2450    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2451        let name = NUM.fetch_add(1, SeqCst).to_string();
2452        let tmpdir_path = TMP_DIR.path().join(name);
2453
2454        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2455
2456        Ok(SqliteEventCacheStore::open(
2457            tmpdir_path.to_str().unwrap(),
2458            Some("default_test_password"),
2459        )
2460        .await
2461        .unwrap())
2462    }
2463
2464    event_cache_store_integration_tests!();
2465    event_cache_store_integration_tests_time!();
2466    event_cache_store_media_integration_tests!();
2467}