matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An SQLite-based backend for the [`EventCacheStore`].

use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockGeneration,
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{extract_event_relation, EventCacheStore},
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
};
use rusqlite::{
    params, params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
    error::{Error, Result},
    utils::{
        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, Secret, SqliteStoreConfig,
};

mod keys {
    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 13;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and connections
    /// for write operations. A single connection is set apart for write
    /// operations; all other connections are used for read operations. The
    /// lock is used to ensure there is only one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
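    ///
    /// A minimal usage sketch, assuming the store is re-exported as
    /// `matrix_sdk_sqlite::SqliteEventCacheStore`:
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// // Open (or create) the store, encrypting its content with a passphrase.
    /// let store = SqliteEventCacheStore::open("/path/to/store", Some("secret")).await?;
    /// # Ok(()) }
    /// ```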
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store at the given path using the
    /// given key to encrypt private data.
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

    /// Open the SQLite-based event cache store with the given open config.
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given secret will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            // Use `conn` as our selected write connection.
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

    /// Acquire a connection for executing read operations.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Acquire a connection for executing write operations.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        let connection = self.write_connection.clone().lock_owned().await;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }

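    /// Defragment the database and free unused disk space, by running the
    /// SQLite `VACUUM` command on the write connection.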
    pub async fn vacuum(&self) -> Result<()> {
        self.write_connection.lock().await.vacuum().await
    }

    async fn get_db_size(&self) -> Result<Option<usize>> {
        Ok(Some(self.pool.get().await?.get_db_size().await?))
    }
}

struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations on the database, starting from the given version.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, as it fails
        // with the error message: "cannot change into wal mode from within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        if version >= 1 {
            // Defragment the DB and optimize its size on the filesystem now that we removed
            // the media cache.
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
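        //
        // In short: the upsert succeeds when the lease is free, expired, or already
        // held by us; in that case `RETURNING generation` yields a row. The generation
        // is only incremented when the lease changes hands; re-acquiring our own lease
        // keeps it stable. If another holder still owns a live lease, the `WHERE`
        // clause rejects the update and no row is returned, hence the `optional()`.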
        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET
                            holder = excluded.holder,
                            expiration = excluded.expiration,
                            generation =
                                CASE holder
                                    WHEN excluded.holder THEN generation
                                    ELSE generation + 1
                                END
                        WHERE
                            holder = excluded.holder
                            OR expiration < ?4
                    RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Use a single transaction throughout this function, so that either all
        // updates are applied, or none of them are.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

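                        // Removal works like unlinking from a doubly-linked list: given
                        // chunks A <-> B <-> C where B is removed, point A's `next` to C
                        // and C's `previous` to A, then delete B's row.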
                        // Find chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Point the previous chunk's `next` to this chunk's own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Point the next chunk's `previous` to this chunk's own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better safe than sorry.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might have been
                        // already inserted in the database. This is the case when an event is
                        // deduplicated and moved to another position; or because it was inserted
                        // outside the context of a linked chunk (e.g. pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // Keep only the events that have an ID and a type; log and skip the
                        // others.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Replace the event's content. Ideally we'd use an `UPDATE`, but in
                        // case the event id changed, be a bit lenient and allow an insertion
                        // of the new event.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we are
                        // going to remove.
                        //
                        // Imagine we have the following events:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev2     | !r0             | 42       | 2        |
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // Once `$ev2` has been removed, we end up in this
                        // state:
                        //
                        // | event_id | linked_chunk_id    | chunk_id | position |
                        // |----------|--------------------|----------|----------|
                        // | $ev0     | !r0                | 42       | 0        |
                        // | $ev1     | !r0                | 42       | 1        |
                        // |          |                    |          |          | <- no more `$ev2`
                        // | $ev3     | !r0                | 42       | 3        |
                        // | $ev4     | !r0                | 42       | 4        |
                        //
                        // We need to shift the `position` of `$ev3` and `$ev4`
                        // to `position - 1`, like so:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev3     | !r0             | 42       | 2        |
                        // | $ev4     | !r0             | 42       | 3        |
                        //
                        // Usually, it boils down to running the following query:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // WHERE position > 2 AND …
                        // ```
                        //
                        // Okay. But `UPDATE` runs on rows in no particular
                        // order. This means it can update `$ev4` before
                        // `$ev3`, for example. What happens in this particular
                        // case? The `position` of `$ev4` becomes `3`, however
                        // `$ev3` already has `position = 3`. Because there
                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
                        // position)`, it will result in a constraint violation.
                        //
                        // There is **no way** to control the execution order of
                        // `UPDATE` in SQLite. To convince yourself, try:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // FROM (
                        //     SELECT event_id
                        //     FROM event_chunks
                        //     WHERE position > 2 AND …
                        //     ORDER BY position ASC
                        // ) as ordered
                        // WHERE event_chunks.event_id = ordered.event_id
                        // ```
                        //
                        // It will fail the same way.
                        //
                        // Thus, we have 2 solutions:
                        //
                        // 1. Remove the `UNIQUE` constraint,
                        // 2. Be creative.
                        //
                        // The `UNIQUE` constraint is a safety belt. Normally, the
                        // `event_cache::Deduplicator` is responsible for ensuring
                        // there are no duplicated events. However, relying on it
                        // alone is “fragile”, in the sense that it can contain
                        // bugs. Also relying on the `UNIQUE` constraint from
                        // SQLite is more robust. It's “belt and braces”, as we
                        // say here.
                        //
                        // So. We need to be creative.
                        //
                        // Many solutions exist. Amongst the most popular, we
                        // see _dropping and re-creating the index_, which is a
                        // no-go for us: it's too expensive. I (@hywan) have
                        // adopted the following one:
                        //
                        // - Do `position = position - 1` but in the negative
                        //   space, so `position = -(position - 1)`. A position
                        //   cannot be negative; we are sure it is unique!
                        // - Once all candidate rows are updated, do `position =
                        //   -position` to move back to the positive space.
                        //
                        // 'told you it's gonna be creative.
                        //
                        // This solution is a hack, **but** it is a small
                        // number of operations, and we can keep the `UNIQUE`
                        // constraint in place.
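                        //
                        // For instance, with `index = 2`: the first statement maps
                        // positions 3 and 4 to -2 and -3 (unique, since real positions
                        // are never negative), then the second statement flips them
                        // back to 2 and 3.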
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // We want to collect the metadata about each chunk (id, next, previous), and
                // for event chunks, the number of events in it. For gaps, the
                // number of events is 0, by convention.
                //
                // We've tried different strategies over time:
                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
                //   caused a full table traversal for each chunk, including for gaps which
                //   don't have any events. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
                //   additional `SELECT` query. It was an immense improvement, but still caused
                //   one select query per event chunk. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
                //
                // The current solution is to run two queries:
                // - one to get each chunk and its number of events, by doing a single `SELECT`
                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
                //   hashmap.
                // - one to get each chunk's metadata (id, previous, next, type) from the
                //   database with a `SELECT`, and then use the hashmap to get the number of
                //   events.
                //
                // This strategy minimizes the number of queries to the database, and keeps them
                // super simple, while doing a bit more processing here, which is much faster.

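                // For example (hypothetical values): for a linked chunk made of a gap
                // chunk `42` followed by an event chunk `43` holding two events, the
                // first query yields `{43: 2}` and the second yields the rows
                // `(42, prev, next, "G")` and `(43, prev, next, "E")`.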
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
                    // benchmarking shows that this is slightly slower than matching the chunk
                    // type (by around 1%, i.e. within the realm of noise), so we keep the
                    // explicit check instead.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64`, in case the `SELECT` returns nothing:
                                // if it returns no rows, the `MAX(id)` is set to
                                // `NULL`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk is not found and there are zero chunks for this room: this is
                    // consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The chunk is not found **but** there are chunks for this room: this is
                    // inconsistent. The linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events for which we want to check duplicates, we can return
        // early. It's not only an optimization to do so: it's required, otherwise the
        // `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        // parameter for `linked_chunk_id = ?`
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID has been stored even if it
                            // is malformed; let's skip it.
                            error!(%duplicated_event, %linked_chunk_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    // Event is not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
1369                // Clippy claims the clones below are redundant, but removing them makes
1370                // the compiler complain that the values don't live long enough, so let's
1371                // silence the lint.
1372                #[allow(clippy::redundant_clone)]
1373                let (query, keys) = match (hashed_event_type, hashed_session_id) {
1374                    (None, None) => {
1375                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1376                    }
1377                    (None, Some(session_id)) => (
1378                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1379                        params![hashed_room_id, session_id.to_owned()],
1380                    ),
1381                    (Some(event_type), None) => (
1382                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
1383                        params![hashed_room_id, event_type.to_owned()],
1384                    ),
1385                    (Some(event_type), Some(session_id)) => (
1386                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1387                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1388                    ),
1389                };
1390
1391                let mut statement = txn.prepare(query)?;
1392                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1393
1394                let mut events = Vec::new();
1395                for ev in maybe_events {
1396                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1397                    events.push(event);
1398                }
1399
1400                Ok(events)
1401            })
1402            .await
1403    }
1404
1405    #[instrument(skip(self, event))]
1406    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1407        let _timer = timer!("method");
1408
1409        let Some(event_id) = event.event_id() else {
1410            error!("Trying to save an event with no ID");
1411            return Ok(());
1412        };
1413
1414        let Some(event_type) = event.kind.event_type() else {
1415            error!(%event_id, "Trying to save an event with no event type");
1416            return Ok(());
1417        };
1418
1419        let event_type = self.encode_key(keys::EVENTS, event_type);
1420        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1421
1422        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1423        let event_id = event_id.to_string();
1424        let encoded_event = self.encode_event(&event)?;
1425
1426        self.write()
1427            .await?
1428            .with_transaction(move |txn| -> Result<_> {
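                // `INSERT OR REPLACE` acts as an upsert: re-saving an event with the
                // same ID in the same room replaces the previous row instead of failing.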
1429                txn.execute(
1430                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1431                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1432
1433                Ok(())
1434            })
1435            .await
1436    }
1437
1438    async fn optimize(&self) -> Result<(), Self::Error> {
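        // `vacuum` runs SQLite's `VACUUM`, which rebuilds the database file and
        // reclaims the pages left unused by deletions.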
1439        Ok(self.vacuum().await?)
1440    }
1441
1442    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1443        self.get_db_size().await
1444    }
1445}
1446
1447fn find_event_relations_transaction(
1448    store: SqliteEventCacheStore,
1449    hashed_room_id: Key,
1450    hashed_linked_chunk_id: Key,
1451    event_id: OwnedEventId,
1452    filters: Option<Vec<RelationType>>,
1453    txn: &Transaction<'_>,
1454) -> Result<Vec<(Event, Option<Position>)>> {
1455    let get_rows = |row: &rusqlite::Row<'_>| {
1456        Ok((
1457            row.get::<_, Vec<u8>>(0)?,
1458            row.get::<_, Option<u64>>(1)?,
1459            row.get::<_, Option<usize>>(2)?,
1460        ))
1461    };
1462
1463    // Collect related events.
1464    let collect_results = |rows| {
1465        let mut related = Vec::new();
1466
1467        for result in rows {
1468            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1469
1470            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1471
1472            // Only build the position if both the chunk id and the index were
1473            // present; in theory, they are either both present or both absent.
1474            let pos = chunk_id
1475                .zip(index)
1476                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1477
1478            related.push((event, pos));
1479        }
1480
1481        Ok(related)
1482    };
1483
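    // The queries below LEFT JOIN `event_chunks`, so that related events that
    // are known but not part of a linked chunk are still returned, just without
    // a position.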
1484    let related = if let Some(filters) = filters {
1485        let question_marks = repeat_vars(filters.len());
1486        let query = format!(
1487            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1488            FROM events
1489            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1490            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1491        );
1492
1493        // The filters need to be stringified first; because `.to_sql()` borrows
1494        // from the resulting strings, they must be kept alive in their own `Vec`
1495        // to get a stable address (otherwise the map closure below would return
1496        // a reference to a temporary).
1497        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
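        // e.g. `RelationType::Replacement` stringifies to "m.replace".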
1498        let filters_params: Vec<_> = filter_strings
1499            .iter()
1500            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1501            .collect();
1502
1503        let parameters = params_from_iter(
1504            [
1505                hashed_linked_chunk_id.to_sql().expect(
1506                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1507                ),
1508                event_id
1509                    .as_str()
1510                    .to_sql()
1511                    .expect("We should be able to convert an event ID to a SQLite value"),
1512                hashed_room_id
1513                    .to_sql()
1514                    .expect("We should be able to convert a room ID to a SQLite value"),
1515            ]
1516            .into_iter()
1517            .chain(filters_params),
1518        );
1519
1520        let mut statement = txn.prepare(&query)?;
1521        let rows = statement.query_map(parameters, get_rows)?;
1522
1523        collect_results(rows)
1524    } else {
1525        let query =
1526            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1527            FROM events
1528            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1529            WHERE relates_to = ? AND room_id = ?";
1530        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1531
1532        let mut statement = txn.prepare(query)?;
1533        let rows = statement.query_map(parameters, get_rows)?;
1534
1535        collect_results(rows)
1536    };
1537
1538    related
1539}
1540
1541/// Like `deadpool::managed::Object::with_transaction`, but starts the
1542/// transaction in immediate (write) mode from the beginning, which prevents
1543/// `SQLITE_BUSY` errors for transactions that mix reads and writes and begin
1544/// with a write.
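///
/// A minimal usage sketch (`store` and `room_id` are placeholders, and the
/// closure body is illustrative only):
///
/// ```ignore
/// let deleted = with_immediate_transaction(&store, move |txn| {
///     // A transaction that starts with a write: exactly the case IMMEDIATE
///     // mode is designed for.
///     txn.execute("DELETE FROM events WHERE room_id = ?", (room_id,))?;
///     Ok(txn.changes())
/// })
/// .await?;
/// ```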
1545async fn with_immediate_transaction<
1546    T: Send + 'static,
1547    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1548>(
1549    this: &SqliteEventCacheStore,
1550    f: F,
1551) -> Result<T, Error> {
1552    this.write()
1553        .await?
1554        .interact(move |conn| -> Result<T, Error> {
1555            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1556            // to avoid read transactions upgrading to write mode and causing
1557            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1558            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1559
1560            let code = || -> Result<T, Error> {
1561                let txn = conn.transaction()?;
1562                let res = f(&txn)?;
1563                txn.commit()?;
1564                Ok(res)
1565            };
1566
1567            let res = code();
1568
1569            // Reset the transaction behavior to use Deferred, after this transaction has
1570            // been run, whether it was successful or not.
1571            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1572
1573            res
1574        })
1575        .await
1576        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
1577        .unwrap()
1578}
1579
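/// Insert a new chunk into the `linked_chunks` table and stitch up the
/// doubly-linked list around it.
///
/// Conceptually, inserting `new` between `previous` and `next` performs:
///
/// ```text
/// before:  previous <-> next
/// after:   previous <-> new <-> next
/// ```
///
/// Either neighbour may be absent, in which case the corresponding pointer
/// update is skipped.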
1580fn insert_chunk(
1581    txn: &Transaction<'_>,
1582    linked_chunk_id: &Key,
1583    previous: Option<u64>,
1584    new: u64,
1585    next: Option<u64>,
1586    type_str: &str,
1587) -> rusqlite::Result<()> {
1588    // First, insert the new chunk.
1589    txn.execute(
1590        r#"
1591            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1592            VALUES (?, ?, ?, ?, ?)
1593        "#,
1594        (new, linked_chunk_id, previous, next, type_str),
1595    )?;
1596
1597    // If this chunk has a previous one, update its `next` field.
1598    if let Some(previous) = previous {
1599        txn.execute(
1600            r#"
1601                UPDATE linked_chunks
1602                SET next = ?
1603                WHERE id = ? AND linked_chunk_id = ?
1604            "#,
1605            (new, previous, linked_chunk_id),
1606        )?;
1607    }
1608
1609    // If this chunk has a next one, update its `previous` field.
1610    if let Some(next) = next {
1611        txn.execute(
1612            r#"
1613                UPDATE linked_chunks
1614                SET previous = ?
1615                WHERE id = ? AND linked_chunk_id = ?
1616            "#,
1617            (new, next, linked_chunk_id),
1618        )?;
1619    }
1620
1621    Ok(())
1622}
1623
1624#[cfg(test)]
1625mod tests {
1626    use std::{
1627        path::PathBuf,
1628        sync::atomic::{AtomicU32, Ordering::SeqCst},
1629    };
1630
1631    use assert_matches::assert_matches;
1632    use matrix_sdk_base::{
1633        event_cache::{
1634            store::{
1635                integration_tests::{
1636                    check_test_event, make_test_event, make_test_event_with_event_id,
1637                },
1638                EventCacheStore, EventCacheStoreError,
1639            },
1640            Gap,
1641        },
1642        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1643        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1644    };
1645    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1646    use once_cell::sync::Lazy;
1647    use ruma::{event_id, room_id};
1648    use tempfile::{tempdir, TempDir};
1649
1650    use super::SqliteEventCacheStore;
1651    use crate::{
1652        event_cache_store::keys,
1653        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1654        SqliteStoreConfig,
1655    };
1656
1657    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1658    static NUM: AtomicU32 = AtomicU32::new(0);
1659
1660    fn new_event_cache_store_workspace() -> PathBuf {
1661        let name = NUM.fetch_add(1, SeqCst).to_string();
1662        TMP_DIR.path().join(name)
1663    }
1664
1665    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1666        let tmpdir_path = new_event_cache_store_workspace();
1667
1668        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1669
1670        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1671    }
1672
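    // These macros expand to the shared `EventCacheStore` integration test
    // suite, run here against the SQLite implementation.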
1673    event_cache_store_integration_tests!();
1674    event_cache_store_integration_tests_time!();
1675
1676    #[async_test]
1677    async fn test_pool_size() {
1678        let tmpdir_path = new_event_cache_store_workspace();
1679        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1680
1681        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1682
1683        assert_eq!(store.pool.status().max_size, 42);
1684    }
1685
1686    #[async_test]
1687    async fn test_linked_chunk_new_items_chunk() {
1688        let store = get_event_cache_store().await.expect("creating cache store failed");
1689
1690        let room_id = &DEFAULT_TEST_ROOM_ID;
1691        let linked_chunk_id = LinkedChunkId::Room(room_id);
1692
1693        store
1694            .handle_linked_chunk_updates(
1695                linked_chunk_id,
1696                vec![
1697                    Update::NewItemsChunk {
1698                        previous: None,
1699                        new: ChunkIdentifier::new(42),
1700                        next: None, // Note: the store must link the next entry itself.
1701                    },
1702                    Update::NewItemsChunk {
1703                        previous: Some(ChunkIdentifier::new(42)),
1704                        new: ChunkIdentifier::new(13),
1705                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
1706                                                               * the next link ahead of time. */
1707                    },
1708                    Update::NewItemsChunk {
1709                        previous: Some(ChunkIdentifier::new(13)),
1710                        new: ChunkIdentifier::new(37),
1711                        next: None,
1712                    },
1713                ],
1714            )
1715            .await
1716            .unwrap();
1717
1718        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1719
1720        assert_eq!(chunks.len(), 3);
1721
1722        {
1723            // Chunks are ordered from smaller to bigger IDs.
1724            let c = chunks.remove(0);
1725            assert_eq!(c.identifier, ChunkIdentifier::new(13));
1726            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1727            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1728            assert_matches!(c.content, ChunkContent::Items(events) => {
1729                assert!(events.is_empty());
1730            });
1731
1732            let c = chunks.remove(0);
1733            assert_eq!(c.identifier, ChunkIdentifier::new(37));
1734            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1735            assert_eq!(c.next, None);
1736            assert_matches!(c.content, ChunkContent::Items(events) => {
1737                assert!(events.is_empty());
1738            });
1739
1740            let c = chunks.remove(0);
1741            assert_eq!(c.identifier, ChunkIdentifier::new(42));
1742            assert_eq!(c.previous, None);
1743            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1744            assert_matches!(c.content, ChunkContent::Items(events) => {
1745                assert!(events.is_empty());
1746            });
1747        }
1748    }
1749
1750    #[async_test]
1751    async fn test_linked_chunk_new_gap_chunk() {
1752        let store = get_event_cache_store().await.expect("creating cache store failed");
1753
1754        let room_id = &DEFAULT_TEST_ROOM_ID;
1755        let linked_chunk_id = LinkedChunkId::Room(room_id);
1756
1757        store
1758            .handle_linked_chunk_updates(
1759                linked_chunk_id,
1760                vec![Update::NewGapChunk {
1761                    previous: None,
1762                    new: ChunkIdentifier::new(42),
1763                    next: None,
1764                    gap: Gap { prev_token: "raclette".to_owned() },
1765                }],
1766            )
1767            .await
1768            .unwrap();
1769
1770        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1771
1772        assert_eq!(chunks.len(), 1);
1773
1774        // Chunks are ordered from smaller to bigger IDs.
1775        let c = chunks.remove(0);
1776        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1777        assert_eq!(c.previous, None);
1778        assert_eq!(c.next, None);
1779        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1780            assert_eq!(gap.prev_token, "raclette");
1781        });
1782    }
1783
1784    #[async_test]
1785    async fn test_linked_chunk_replace_item() {
1786        let store = get_event_cache_store().await.expect("creating cache store failed");
1787
1788        let room_id = &DEFAULT_TEST_ROOM_ID;
1789        let linked_chunk_id = LinkedChunkId::Room(room_id);
1790        let event_id = event_id!("$world");
1791
1792        store
1793            .handle_linked_chunk_updates(
1794                linked_chunk_id,
1795                vec![
1796                    Update::NewItemsChunk {
1797                        previous: None,
1798                        new: ChunkIdentifier::new(42),
1799                        next: None,
1800                    },
1801                    Update::PushItems {
1802                        at: Position::new(ChunkIdentifier::new(42), 0),
1803                        items: vec![
1804                            make_test_event(room_id, "hello"),
1805                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
1806                        ],
1807                    },
1808                    Update::ReplaceItem {
1809                        at: Position::new(ChunkIdentifier::new(42), 1),
1810                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1811                    },
1812                ],
1813            )
1814            .await
1815            .unwrap();
1816
1817        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1818
1819        assert_eq!(chunks.len(), 1);
1820
1821        let c = chunks.remove(0);
1822        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1823        assert_eq!(c.previous, None);
1824        assert_eq!(c.next, None);
1825        assert_matches!(c.content, ChunkContent::Items(events) => {
1826            assert_eq!(events.len(), 2);
1827            check_test_event(&events[0], "hello");
1828            check_test_event(&events[1], "yolo");
1829        });
1830    }
1831
1832    #[async_test]
1833    async fn test_linked_chunk_remove_chunk() {
1834        let store = get_event_cache_store().await.expect("creating cache store failed");
1835
1836        let room_id = &DEFAULT_TEST_ROOM_ID;
1837        let linked_chunk_id = LinkedChunkId::Room(room_id);
1838
1839        store
1840            .handle_linked_chunk_updates(
1841                linked_chunk_id,
1842                vec![
1843                    Update::NewGapChunk {
1844                        previous: None,
1845                        new: ChunkIdentifier::new(42),
1846                        next: None,
1847                        gap: Gap { prev_token: "raclette".to_owned() },
1848                    },
1849                    Update::NewGapChunk {
1850                        previous: Some(ChunkIdentifier::new(42)),
1851                        new: ChunkIdentifier::new(43),
1852                        next: None,
1853                        gap: Gap { prev_token: "fondue".to_owned() },
1854                    },
1855                    Update::NewGapChunk {
1856                        previous: Some(ChunkIdentifier::new(43)),
1857                        new: ChunkIdentifier::new(44),
1858                        next: None,
1859                        gap: Gap { prev_token: "tartiflette".to_owned() },
1860                    },
1861                    Update::RemoveChunk(ChunkIdentifier::new(43)),
1862                ],
1863            )
1864            .await
1865            .unwrap();
1866
1867        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1868
1869        assert_eq!(chunks.len(), 2);
1870
1871        // Chunks are ordered from smaller to bigger IDs.
1872        let c = chunks.remove(0);
1873        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1874        assert_eq!(c.previous, None);
1875        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1876        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1877            assert_eq!(gap.prev_token, "raclette");
1878        });
1879
1880        let c = chunks.remove(0);
1881        assert_eq!(c.identifier, ChunkIdentifier::new(44));
1882        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1883        assert_eq!(c.next, None);
1884        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1885            assert_eq!(gap.prev_token, "tartiflette");
1886        });
1887
1888        // Check that cascading worked. Yes, SQLite, I doubt you.
1889        let gaps = store
1890            .read()
1891            .await
1892            .unwrap()
1893            .with_transaction(|txn| -> rusqlite::Result<_> {
1894                let mut gaps = Vec::new();
1895                for data in txn
1896                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1897                    .query_map((), |row| row.get::<_, u64>(0))?
1898                {
1899                    gaps.push(data?);
1900                }
1901                Ok(gaps)
1902            })
1903            .await
1904            .unwrap();
1905
1906        assert_eq!(gaps, vec![42, 44]);
1907    }
1908
1909    #[async_test]
1910    async fn test_linked_chunk_push_items() {
1911        let store = get_event_cache_store().await.expect("creating cache store failed");
1912
1913        let room_id = &DEFAULT_TEST_ROOM_ID;
1914        let linked_chunk_id = LinkedChunkId::Room(room_id);
1915
1916        store
1917            .handle_linked_chunk_updates(
1918                linked_chunk_id,
1919                vec![
1920                    Update::NewItemsChunk {
1921                        previous: None,
1922                        new: ChunkIdentifier::new(42),
1923                        next: None,
1924                    },
1925                    Update::PushItems {
1926                        at: Position::new(ChunkIdentifier::new(42), 0),
1927                        items: vec![
1928                            make_test_event(room_id, "hello"),
1929                            make_test_event(room_id, "world"),
1930                        ],
1931                    },
1932                    Update::PushItems {
1933                        at: Position::new(ChunkIdentifier::new(42), 2),
1934                        items: vec![make_test_event(room_id, "who?")],
1935                    },
1936                ],
1937            )
1938            .await
1939            .unwrap();
1940
1941        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1942
1943        assert_eq!(chunks.len(), 1);
1944
1945        let c = chunks.remove(0);
1946        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1947        assert_eq!(c.previous, None);
1948        assert_eq!(c.next, None);
1949        assert_matches!(c.content, ChunkContent::Items(events) => {
1950            assert_eq!(events.len(), 3);
1951
1952            check_test_event(&events[0], "hello");
1953            check_test_event(&events[1], "world");
1954            check_test_event(&events[2], "who?");
1955        });
1956    }
1957
1958    #[async_test]
1959    async fn test_linked_chunk_remove_item() {
1960        let store = get_event_cache_store().await.expect("creating cache store failed");
1961
1962        let room_id = *DEFAULT_TEST_ROOM_ID;
1963        let linked_chunk_id = LinkedChunkId::Room(room_id);
1964
1965        store
1966            .handle_linked_chunk_updates(
1967                linked_chunk_id,
1968                vec![
1969                    Update::NewItemsChunk {
1970                        previous: None,
1971                        new: ChunkIdentifier::new(42),
1972                        next: None,
1973                    },
1974                    Update::PushItems {
1975                        at: Position::new(ChunkIdentifier::new(42), 0),
1976                        items: vec![
1977                            make_test_event(room_id, "one"),
1978                            make_test_event(room_id, "two"),
1979                            make_test_event(room_id, "three"),
1980                            make_test_event(room_id, "four"),
1981                            make_test_event(room_id, "five"),
1982                            make_test_event(room_id, "six"),
1983                        ],
1984                    },
1985                    Update::RemoveItem {
1986                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
1987                    },
1988                ],
1989            )
1990            .await
1991            .unwrap();
1992
1993        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1994
1995        assert_eq!(chunks.len(), 1);
1996
1997        let c = chunks.remove(0);
1998        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1999        assert_eq!(c.previous, None);
2000        assert_eq!(c.next, None);
2001        assert_matches!(c.content, ChunkContent::Items(events) => {
2002            assert_eq!(events.len(), 5);
2003            check_test_event(&events[0], "one");
2004            check_test_event(&events[1], "two");
2005            check_test_event(&events[2], "four");
2006            check_test_event(&events[3], "five");
2007            check_test_event(&events[4], "six");
2008        });
2009
2010        // Make sure the positions have been updated for the remaining events.
2011        let num_rows: u64 = store
2012            .read()
2013            .await
2014            .unwrap()
2015            .with_transaction(move |txn| {
2016                txn.query_row(
2017                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
2018                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
2019                    |row| row.get(0),
2020                )
2021            })
2022            .await
2023            .unwrap();
2024        assert_eq!(num_rows, 3);
2025    }
2026
2027    #[async_test]
2028    async fn test_linked_chunk_detach_last_items() {
2029        let store = get_event_cache_store().await.expect("creating cache store failed");
2030
2031        let room_id = *DEFAULT_TEST_ROOM_ID;
2032        let linked_chunk_id = LinkedChunkId::Room(room_id);
2033
2034        store
2035            .handle_linked_chunk_updates(
2036                linked_chunk_id,
2037                vec![
2038                    Update::NewItemsChunk {
2039                        previous: None,
2040                        new: ChunkIdentifier::new(42),
2041                        next: None,
2042                    },
2043                    Update::PushItems {
2044                        at: Position::new(ChunkIdentifier::new(42), 0),
2045                        items: vec![
2046                            make_test_event(room_id, "hello"),
2047                            make_test_event(room_id, "world"),
2048                            make_test_event(room_id, "howdy"),
2049                        ],
2050                    },
2051                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2052                ],
2053            )
2054            .await
2055            .unwrap();
2056
2057        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2058
2059        assert_eq!(chunks.len(), 1);
2060
2061        let c = chunks.remove(0);
2062        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2063        assert_eq!(c.previous, None);
2064        assert_eq!(c.next, None);
2065        assert_matches!(c.content, ChunkContent::Items(events) => {
2066            assert_eq!(events.len(), 1);
2067            check_test_event(&events[0], "hello");
2068        });
2069    }
2070
2071    #[async_test]
2072    async fn test_linked_chunk_start_end_reattach_items() {
2073        let store = get_event_cache_store().await.expect("creating cache store failed");
2074
2075        let room_id = *DEFAULT_TEST_ROOM_ID;
2076        let linked_chunk_id = LinkedChunkId::Room(room_id);
2077
2078        // Same updates and checks as test_linked_chunk_push_items, but with extra
2079        // `StartReattachItems` and `EndReattachItems` updates, which must have no
2080        // effects.
2081        // effect.
2082            .handle_linked_chunk_updates(
2083                linked_chunk_id,
2084                vec![
2085                    Update::NewItemsChunk {
2086                        previous: None,
2087                        new: ChunkIdentifier::new(42),
2088                        next: None,
2089                    },
2090                    Update::PushItems {
2091                        at: Position::new(ChunkIdentifier::new(42), 0),
2092                        items: vec![
2093                            make_test_event(room_id, "hello"),
2094                            make_test_event(room_id, "world"),
2095                            make_test_event(room_id, "howdy"),
2096                        ],
2097                    },
2098                    Update::StartReattachItems,
2099                    Update::EndReattachItems,
2100                ],
2101            )
2102            .await
2103            .unwrap();
2104
2105        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2106
2107        assert_eq!(chunks.len(), 1);
2108
2109        let c = chunks.remove(0);
2110        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2111        assert_eq!(c.previous, None);
2112        assert_eq!(c.next, None);
2113        assert_matches!(c.content, ChunkContent::Items(events) => {
2114            assert_eq!(events.len(), 3);
2115            check_test_event(&events[0], "hello");
2116            check_test_event(&events[1], "world");
2117            check_test_event(&events[2], "howdy");
2118        });
2119    }
2120
2121    #[async_test]
2122    async fn test_linked_chunk_clear() {
2123        let store = get_event_cache_store().await.expect("creating cache store failed");
2124
2125        let room_id = *DEFAULT_TEST_ROOM_ID;
2126        let linked_chunk_id = LinkedChunkId::Room(room_id);
2127        let event_0 = make_test_event(room_id, "hello");
2128        let event_1 = make_test_event(room_id, "world");
2129        let event_2 = make_test_event(room_id, "howdy");
2130
2131        store
2132            .handle_linked_chunk_updates(
2133                linked_chunk_id,
2134                vec![
2135                    Update::NewItemsChunk {
2136                        previous: None,
2137                        new: ChunkIdentifier::new(42),
2138                        next: None,
2139                    },
2140                    Update::NewGapChunk {
2141                        previous: Some(ChunkIdentifier::new(42)),
2142                        new: ChunkIdentifier::new(54),
2143                        next: None,
2144                        gap: Gap { prev_token: "fondue".to_owned() },
2145                    },
2146                    Update::PushItems {
2147                        at: Position::new(ChunkIdentifier::new(42), 0),
2148                        items: vec![event_0.clone(), event_1, event_2],
2149                    },
2150                    Update::Clear,
2151                ],
2152            )
2153            .await
2154            .unwrap();
2155
2156        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2157        assert!(chunks.is_empty());
2158
2159        // Check that cascading worked. Yes, SQLite, I doubt you.
2160        store
2161            .read()
2162            .await
2163            .unwrap()
2164            .with_transaction(|txn| -> rusqlite::Result<_> {
2165                let num_gaps = txn
2166                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2167                    .query_row((), |row| row.get::<_, u64>(0))?;
2168                assert_eq!(num_gaps, 0);
2169
2170                let num_events = txn
2171                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2172                    .query_row((), |row| row.get::<_, u64>(0))?;
2173                assert_eq!(num_events, 0);
2174
2175                Ok(())
2176            })
2177            .await
2178            .unwrap();
2179
2180        // It's okay to re-insert a past event.
2181        store
2182            .handle_linked_chunk_updates(
2183                linked_chunk_id,
2184                vec![
2185                    Update::NewItemsChunk {
2186                        previous: None,
2187                        new: ChunkIdentifier::new(42),
2188                        next: None,
2189                    },
2190                    Update::PushItems {
2191                        at: Position::new(ChunkIdentifier::new(42), 0),
2192                        items: vec![event_0],
2193                    },
2194                ],
2195            )
2196            .await
2197            .unwrap();
2198    }
2199
2200    #[async_test]
2201    async fn test_linked_chunk_multiple_rooms() {
2202        let store = get_event_cache_store().await.expect("creating cache store failed");
2203
2204        let room1 = room_id!("!realcheeselovers:raclette.fr");
2205        let linked_chunk_id1 = LinkedChunkId::Room(room1);
2206        let room2 = room_id!("!realcheeselovers:fondue.ch");
2207        let linked_chunk_id2 = LinkedChunkId::Room(room2);
2208
2209        // Check that applying updates to one room doesn't affect the others.
2210        // Use the same chunk identifier in both rooms to battle-test search.
2211
2212        store
2213            .handle_linked_chunk_updates(
2214                linked_chunk_id1,
2215                vec![
2216                    Update::NewItemsChunk {
2217                        previous: None,
2218                        new: ChunkIdentifier::new(42),
2219                        next: None,
2220                    },
2221                    Update::PushItems {
2222                        at: Position::new(ChunkIdentifier::new(42), 0),
2223                        items: vec![
2224                            make_test_event(room1, "best cheese is raclette"),
2225                            make_test_event(room1, "obviously"),
2226                        ],
2227                    },
2228                ],
2229            )
2230            .await
2231            .unwrap();
2232
2233        store
2234            .handle_linked_chunk_updates(
2235                linked_chunk_id2,
2236                vec![
2237                    Update::NewItemsChunk {
2238                        previous: None,
2239                        new: ChunkIdentifier::new(42),
2240                        next: None,
2241                    },
2242                    Update::PushItems {
2243                        at: Position::new(ChunkIdentifier::new(42), 0),
2244                        items: vec![make_test_event(room1, "beaufort is the best")],
2245                    },
2246                ],
2247            )
2248            .await
2249            .unwrap();
2250
2251        // Check chunks from room 1.
2252        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2253        assert_eq!(chunks_room1.len(), 1);
2254
2255        let c = chunks_room1.remove(0);
2256        assert_matches!(c.content, ChunkContent::Items(events) => {
2257            assert_eq!(events.len(), 2);
2258            check_test_event(&events[0], "best cheese is raclette");
2259            check_test_event(&events[1], "obviously");
2260        });
2261
2262        // Check chunks from room 2.
2263        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2264        assert_eq!(chunks_room2.len(), 1);
2265
2266        let c = chunks_room2.remove(0);
2267        assert_matches!(c.content, ChunkContent::Items(events) => {
2268            assert_eq!(events.len(), 1);
2269            check_test_event(&events[0], "beaufort is the best");
2270        });
2271    }
2272
2273    #[async_test]
2274    async fn test_linked_chunk_update_is_a_transaction() {
2275        let store = get_event_cache_store().await.expect("creating cache store failed");
2276
2277        let room_id = *DEFAULT_TEST_ROOM_ID;
2278        let linked_chunk_id = LinkedChunkId::Room(room_id);
2279
2280        // Trigger a violation of the unique constraint on the (room id, chunk id)
2281        // couple.
2282        let err = store
2283            .handle_linked_chunk_updates(
2284                linked_chunk_id,
2285                vec![
2286                    Update::NewItemsChunk {
2287                        previous: None,
2288                        new: ChunkIdentifier::new(42),
2289                        next: None,
2290                    },
2291                    Update::NewItemsChunk {
2292                        previous: None,
2293                        new: ChunkIdentifier::new(42),
2294                        next: None,
2295                    },
2296                ],
2297            )
2298            .await
2299            .unwrap_err();
2300
2301        // The operation fails with a constraint violation error.
2302        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2303            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2304        });
2305
2306        // If the updates have been handled transactionally, then no new chunks should
2307        // have been added; failure of the second update leads to the first one being
2308        // rolled back.
2309        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2310        assert!(chunks.is_empty());
2311    }
2312
2313    #[async_test]
2314    async fn test_filter_duplicate_events_no_events() {
2315        let store = get_event_cache_store().await.expect("creating cache store failed");
2316
2317        let room_id = *DEFAULT_TEST_ROOM_ID;
2318        let linked_chunk_id = LinkedChunkId::Room(room_id);
2319        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2320        assert!(duplicates.is_empty());
2321    }
2322
2323    #[async_test]
2324    async fn test_load_last_chunk() {
2325        let room_id = room_id!("!r0:matrix.org");
2326        let linked_chunk_id = LinkedChunkId::Room(room_id);
2327        let event = |msg: &str| make_test_event(room_id, msg);
2328        let store = get_event_cache_store().await.expect("creating cache store failed");
2329
2330        // Case #1: no last chunk.
2331        {
2332            let (last_chunk, chunk_identifier_generator) =
2333                store.load_last_chunk(linked_chunk_id).await.unwrap();
2334
2335            assert!(last_chunk.is_none());
2336            assert_eq!(chunk_identifier_generator.current(), 0);
2337        }
2338
2339        // Case #2: only one chunk is present.
2340        {
2341            store
2342                .handle_linked_chunk_updates(
2343                    linked_chunk_id,
2344                    vec![
2345                        Update::NewItemsChunk {
2346                            previous: None,
2347                            new: ChunkIdentifier::new(42),
2348                            next: None,
2349                        },
2350                        Update::PushItems {
2351                            at: Position::new(ChunkIdentifier::new(42), 0),
2352                            items: vec![event("saucisse de morteau"), event("comté")],
2353                        },
2354                    ],
2355                )
2356                .await
2357                .unwrap();
2358
2359            let (last_chunk, chunk_identifier_generator) =
2360                store.load_last_chunk(linked_chunk_id).await.unwrap();
2361
2362            assert_matches!(last_chunk, Some(last_chunk) => {
2363                assert_eq!(last_chunk.identifier, 42);
2364                assert!(last_chunk.previous.is_none());
2365                assert!(last_chunk.next.is_none());
2366                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2367                    assert_eq!(items.len(), 2);
2368                    check_test_event(&items[0], "saucisse de morteau");
2369                    check_test_event(&items[1], "comté");
2370                });
2371            });
2372            assert_eq!(chunk_identifier_generator.current(), 42);
2373        }
2374
2375        // Case #3: more chunks are present.
2376        {
2377            store
2378                .handle_linked_chunk_updates(
2379                    linked_chunk_id,
2380                    vec![
2381                        Update::NewItemsChunk {
2382                            previous: Some(ChunkIdentifier::new(42)),
2383                            new: ChunkIdentifier::new(7),
2384                            next: None,
2385                        },
2386                        Update::PushItems {
2387                            at: Position::new(ChunkIdentifier::new(7), 0),
2388                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2389                        },
2390                    ],
2391                )
2392                .await
2393                .unwrap();
2394
2395            let (last_chunk, chunk_identifier_generator) =
2396                store.load_last_chunk(linked_chunk_id).await.unwrap();
2397
2398            assert_matches!(last_chunk, Some(last_chunk) => {
2399                assert_eq!(last_chunk.identifier, 7);
2400                assert_matches!(last_chunk.previous, Some(previous) => {
2401                    assert_eq!(previous, 42);
2402                });
2403                assert!(last_chunk.next.is_none());
2404                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2405                    assert_eq!(items.len(), 3);
2406                    check_test_event(&items[0], "fondue");
2407                    check_test_event(&items[1], "gruyère");
2408                    check_test_event(&items[2], "mont d'or");
2409                });
2410            });
2411            assert_eq!(chunk_identifier_generator.current(), 42);
2412        }
2413    }
2414
2415    #[async_test]
2416    async fn test_load_last_chunk_with_a_cycle() {
2417        let room_id = room_id!("!r0:matrix.org");
2418        let linked_chunk_id = LinkedChunkId::Room(room_id);
2419        let store = get_event_cache_store().await.expect("creating cache store failed");
2420
2421        store
2422            .handle_linked_chunk_updates(
2423                linked_chunk_id,
2424                vec![
2425                    Update::NewItemsChunk {
2426                        previous: None,
2427                        new: ChunkIdentifier::new(0),
2428                        next: None,
2429                    },
2430                    Update::NewItemsChunk {
2431                        // Because `previous` connects to chunk #0, it will create a cycle.
2432                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2433                        // **does not exist**. We have to detect this cycle.
2434                        previous: Some(ChunkIdentifier::new(0)),
2435                        new: ChunkIdentifier::new(1),
2436                        next: Some(ChunkIdentifier::new(0)),
2437                    },
2438                ],
2439            )
2440            .await
2441            .unwrap();
2442
2443        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2444    }
2445
2446    #[async_test]
2447    async fn test_load_previous_chunk() {
2448        let room_id = room_id!("!r0:matrix.org");
2449        let linked_chunk_id = LinkedChunkId::Room(room_id);
2450        let event = |msg: &str| make_test_event(room_id, msg);
2451        let store = get_event_cache_store().await.expect("creating cache store failed");
2452
2453        // Case #1: no chunk at all, equivalent to having a nonexistent
2454        // `before_chunk_identifier`.
2455        {
2456            let previous_chunk = store
2457                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2458                .await
2459                .unwrap();
2460
2461            assert!(previous_chunk.is_none());
2462        }
2463
2464        // Case #2: there is only one chunk; we request the chunk before it,
2465        // which doesn't exist.
2466        {
2467            store
2468                .handle_linked_chunk_updates(
2469                    linked_chunk_id,
2470                    vec![Update::NewItemsChunk {
2471                        previous: None,
2472                        new: ChunkIdentifier::new(42),
2473                        next: None,
2474                    }],
2475                )
2476                .await
2477                .unwrap();
2478
2479            let previous_chunk =
2480                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2481
2482            assert!(previous_chunk.is_none());
2483        }
2484
2485        // Case #3: there are two chunks.
2486        {
2487            store
2488                .handle_linked_chunk_updates(
2489                    linked_chunk_id,
2490                    vec![
2491                        // new chunk before the one that exists.
2492                        Update::NewItemsChunk {
2493                            previous: None,
2494                            new: ChunkIdentifier::new(7),
2495                            next: Some(ChunkIdentifier::new(42)),
2496                        },
2497                        Update::PushItems {
2498                            at: Position::new(ChunkIdentifier::new(7), 0),
2499                            items: vec![event("brigand du jorat"), event("morbier")],
2500                        },
2501                    ],
2502                )
2503                .await
2504                .unwrap();
2505
2506            let previous_chunk =
2507                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2508
2509            assert_matches!(previous_chunk, Some(previous_chunk) => {
2510                assert_eq!(previous_chunk.identifier, 7);
2511                assert!(previous_chunk.previous.is_none());
2512                assert_matches!(previous_chunk.next, Some(next) => {
2513                    assert_eq!(next, 42);
2514                });
2515                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2516                    assert_eq!(items.len(), 2);
2517                    check_test_event(&items[0], "brigand du jorat");
2518                    check_test_event(&items[1], "morbier");
2519                });
2520            });
2521        }
2522    }
2523}
2524
2525#[cfg(test)]
2526mod encrypted_tests {
2527    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2528
2529    use matrix_sdk_base::{
2530        event_cache::store::{EventCacheStore, EventCacheStoreError},
2531        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
2532    };
2533    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2534    use once_cell::sync::Lazy;
2535    use ruma::{
2536        event_id,
2537        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2538        room_id, user_id,
2539    };
2540    use tempfile::{tempdir, TempDir};
2541
2542    use super::SqliteEventCacheStore;
2543
2544    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2545    static NUM: AtomicU32 = AtomicU32::new(0);
2546
2547    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2548        let name = NUM.fetch_add(1, SeqCst).to_string();
2549        let tmpdir_path = TMP_DIR.path().join(name);
2550
2551        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2552
2553        Ok(SqliteEventCacheStore::open(
2554            tmpdir_path.to_str().unwrap(),
2555            Some("default_test_password"),
2556        )
2557        .await
2558        .unwrap())
2559    }
2560
2561    event_cache_store_integration_tests!();
2562    event_cache_store_integration_tests_time!();
2563
2564    #[async_test]
2565    async fn test_no_sqlite_injection_in_find_event_relations() {
2566        let room_id = room_id!("!test:localhost");
2567        let another_room_id = room_id!("!r1:matrix.org");
2568        let sender = user_id!("@alice:localhost");
2569
2570        let store = get_event_cache_store()
2571            .await
2572            .expect("We should be able to create a new, empty, event cache store");
2573
2574        let f = EventFactory::new().room(room_id).sender(sender);
2575
2576        // Create an event for the first room.
2577        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
2578        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
2579
2580        // Create a related event.
2581        let edit_id = event_id!("$find_me:matrix.org");
2582        let edit = f
2583            .text_msg("Find me")
2584            .event_id(edit_id)
2585            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
2586            .into_event();
2587
2588        // Create an event for the second room.
2589        let f = f.room(another_room_id);
2590
2591        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
2592        let another_event =
2593            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
2594
2595        // Save the events in the DB.
2596        store.save_event(room_id, event).await.unwrap();
2597        store.save_event(room_id, edit).await.unwrap();
2598        store.save_event(another_room_id, another_event).await.unwrap();
2599
2600        // Craft a `RelationType` that will inject some SQL to be executed. The
2601        // `OR 1=1` ensures that all the previous parameters (the room ID and the
2602        // event ID) are ignored.
2603        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
2604
2605        // Attempt to find events in the first room.
2606        let results = store
2607            .find_event_relations(room_id, event_id, filter.as_deref())
2608            .await
2609            .expect("We should be able to attempt to find event relations");
2610
2611        // Ensure that we only got the single related event the first room contains.
2612        similar_asserts::assert_eq!(
2613            results.len(),
2614            1,
2615            "We should only have loaded events for the first room {results:#?}"
2616        );
2617
2618        // The event needs to be the edit event, otherwise something is wrong.
2619        let (found_event, _) = &results[0];
2620        assert_eq!(
2621            found_event.event_id().as_deref(),
2622            Some(edit_id),
2623            "The single event we found should be the edit event"
2624        );
2625    }
2626}