matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    str::FromStr as _,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
15        DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
16        QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
17        ThreadSubscriptionStatus,
18    },
19    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
20    StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
21};
22use matrix_sdk_store_encryption::StoreCipher;
23use ruma::{
24    canonical_json::{redact, RedactedBecause},
25    events::{
26        presence::PresenceEvent,
27        receipt::{Receipt, ReceiptThread, ReceiptType},
28        room::{
29            create::RoomCreateEventContent,
30            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
31        },
32        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
33        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
34    },
35    serde::Raw,
36    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
37    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
38};
39use rusqlite::{OptionalExtension, Transaction};
40use serde::{Deserialize, Serialize};
41use tokio::{
42    fs,
43    sync::{Mutex, OwnedMutexGuard},
44};
45use tracing::{debug, instrument, trace, warn};
46
47use crate::{
48    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49    error::{Error, Result},
50    utils::{
51        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52        SqliteKeyValueStoreConnExt,
53    },
54    OpenStoreError, Secret, SqliteStoreConfig,
55};
56
57mod keys {
58    // Tables
59    pub const KV_BLOB: &str = "kv_blob";
60    pub const ROOM_INFO: &str = "room_info";
61    pub const STATE_EVENT: &str = "state_event";
62    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
63    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
64    pub const MEMBER: &str = "member";
65    pub const PROFILE: &str = "profile";
66    pub const RECEIPT: &str = "receipt";
67    pub const DISPLAY_NAME: &str = "display_name";
68    pub const SEND_QUEUE: &str = "send_queue_events";
69    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
70    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
71}
72
73/// The filename used for the SQLITE database file used by the state store.
74pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
75
76/// Identifier of the latest database version.
77///
78/// This is used to figure whether the SQLite database requires a migration.
79/// Every new SQL migration should imply a bump of this number, and changes in
80/// the [`SqliteStateStore::run_migrations`] function.
81const DATABASE_VERSION: u8 = 14;
82
83/// An SQLite-based state store.
84#[derive(Clone)]
85pub struct SqliteStateStore {
86    store_cipher: Option<Arc<StoreCipher>>,
87
88    /// The pool of connections.
89    pool: SqlitePool,
90
91    /// We make the difference between connections for read operations, and for
92    /// write operations. We keep a single connection apart from write
93    /// operations. All other connections are used for read operations. The
94    /// lock is used to ensure there is one owner at a time.
95    write_connection: Arc<Mutex<SqliteAsyncConn>>,
96}
97
98#[cfg(not(tarpaulin_include))]
99impl fmt::Debug for SqliteStateStore {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
102    }
103}
104
105impl SqliteStateStore {
106    /// Open the SQLite-based state store at the given path using the given
107    /// given passphrase to encrypt private data.
108    pub async fn open(
109        path: impl AsRef<Path>,
110        passphrase: Option<&str>,
111    ) -> Result<Self, OpenStoreError> {
112        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
113    }
114
115    /// Open the SQLite-based state store at the given path using the given
116    /// key to encrypt private data.
117    pub async fn open_with_key(
118        path: impl AsRef<Path>,
119        key: Option<&[u8; 32]>,
120    ) -> Result<Self, OpenStoreError> {
121        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
122    }
123
124    /// Open the SQLite-based state store with the config open config.
125    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
126        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
127
128        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
129
130        let this = Self::open_with_pool(pool, config.secret).await?;
131        this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
132
133        Ok(this)
134    }
135
136    /// Create an SQLite-based state store using the given SQLite database pool.
137    /// The given secret will be used to encrypt private data.
138    pub async fn open_with_pool(
139        pool: SqlitePool,
140        secret: Option<Secret>,
141    ) -> Result<Self, OpenStoreError> {
142        let conn = pool.get().await?;
143
144        let mut version = conn.db_version().await?;
145
146        if version == 0 {
147            init(&conn).await?;
148            version = 1;
149        }
150
151        let store_cipher = match secret {
152            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
153            None => None,
154        };
155        let this = Self {
156            store_cipher,
157            pool,
158            // Use `conn` as our selected write connections.
159            write_connection: Arc::new(Mutex::new(conn)),
160        };
161        this.run_migrations(version, None).await?;
162
163        Ok(this)
164    }
165
166    /// Run database migrations from the given `from` version to the given `to`
167    /// version
168    ///
169    /// If `to` is `None`, the current database version will be used.
170    async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
171        let to = to.unwrap_or(DATABASE_VERSION);
172
173        if from < to {
174            debug!(version = from, new_version = to, "Upgrading database");
175        } else {
176            return Ok(());
177        }
178
179        let conn = self.write().await;
180
181        if from < 2 && to >= 2 {
182            let this = self.clone();
183            conn.with_transaction(move |txn| {
184                // Create new table.
185                txn.execute_batch(include_str!(
186                    "../migrations/state_store/002_a_create_new_room_info.sql"
187                ))?;
188
189                // Migrate data to new table.
190                for data in txn
191                    .prepare("SELECT data FROM room_info")?
192                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
193                {
194                    let data = data?;
195                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
196
197                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
198                    let state = this
199                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
200                    txn.prepare_cached(
201                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
202                         VALUES (?, ?, ?)",
203                    )?
204                    .execute((room_id, state, data))?;
205                }
206
207                // Replace old table.
208                txn.execute_batch(include_str!(
209                    "../migrations/state_store/002_b_replace_room_info.sql"
210                ))?;
211
212                txn.set_db_version(2)?;
213                Result::<_, Error>::Ok(())
214            })
215            .await?;
216        }
217
218        // Migration to v3: RoomInfo format has changed.
219        if from < 3 && to >= 3 {
220            let this = self.clone();
221            conn.with_transaction(move |txn| {
222                // Migrate data .
223                for data in txn
224                    .prepare("SELECT data FROM room_info")?
225                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
226                {
227                    let data = data?;
228                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
229
230                    // Get the `m.room.create` event from the room state.
231                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
232                    let event_type =
233                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
234                    let create_res = txn
235                        .prepare(
236                            "SELECT stripped, data FROM state_event
237                             WHERE room_id = ? AND event_type = ?",
238                        )?
239                        .query_row([room_id, event_type], |row| {
240                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
241                        })
242                        .optional()?;
243
244                    let create = create_res.and_then(|(stripped, data)| {
245                        let create = if stripped {
246                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
247                                this.deserialize_json(&data).ok()?,
248                            )
249                        } else {
250                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
251                        };
252                        Some(create)
253                    });
254
255                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
256
257                    let data = this.serialize_json(&migrated_room_info)?;
258                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
259                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
260                        .execute((data, room_id))?;
261                }
262
263                txn.set_db_version(3)?;
264                Result::<_, Error>::Ok(())
265            })
266            .await?;
267        }
268
269        if from < 4 && to >= 4 {
270            conn.with_transaction(move |txn| {
271                // Create new table.
272                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
273                txn.set_db_version(4)
274            })
275            .await?;
276        }
277
278        if from < 5 && to >= 5 {
279            conn.with_transaction(move |txn| {
280                // Create new table.
281                txn.execute_batch(include_str!(
282                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
283                ))?;
284                txn.set_db_version(4)
285            })
286            .await?;
287        }
288
289        if from < 6 && to >= 6 {
290            conn.with_transaction(move |txn| {
291                // Create new table.
292                txn.execute_batch(include_str!(
293                    "../migrations/state_store/005_send_queue_dependent_events.sql"
294                ))?;
295                txn.set_db_version(6)
296            })
297            .await?;
298        }
299
300        if from < 7 && to >= 7 {
301            conn.with_transaction(move |txn| {
302                // Drop media table.
303                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
304                txn.set_db_version(7)
305            })
306            .await?;
307        }
308
309        if from < 8 && to >= 8 {
310            // Replace all existing wedged events with a generic error.
311            let error = QueueWedgeError::GenericApiError {
312                msg: "local echo failed to send in a previous session".into(),
313            };
314            let default_err = self.serialize_value(&error)?;
315
316            conn.with_transaction(move |txn| {
317                // Update send queue table to persist the wedge reason if any.
318                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
319
320                // Migrate the data, add a generic error for currently wedged events
321
322                for wedged_entries in txn
323                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
324                    .query_map((), |row| {
325                        Ok(
326                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
327                        )
328                    })? {
329
330                    let (room_id, transaction_id) = wedged_entries?;
331
332                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
333                        .execute((default_err.clone(), room_id, transaction_id))?;
334                }
335
336
337                // Clean up the table now that data is migrated
338                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
339
340                txn.set_db_version(8)
341            })
342                .await?;
343        }
344
345        if from < 9 && to >= 9 {
346            conn.with_transaction(move |txn| {
347                // Run the migration.
348                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
349                txn.set_db_version(9)
350            })
351            .await?;
352        }
353
354        if from < 10 && to >= 10 {
355            conn.with_transaction(move |txn| {
356                // Run the migration.
357                txn.execute_batch(include_str!(
358                    "../migrations/state_store/009_send_queue_priority.sql"
359                ))?;
360                txn.set_db_version(10)
361            })
362            .await?;
363        }
364
365        if from < 11 && to >= 11 {
366            conn.with_transaction(move |txn| {
367                // Run the migration.
368                txn.execute_batch(include_str!(
369                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
370                ))?;
371                txn.set_db_version(11)
372            })
373            .await?;
374        }
375
376        if from < 12 && to >= 12 {
377            // Defragment the DB and optimize its size on the filesystem.
378            // This should have been run in the migration for version 7, to reduce the size
379            // of the DB as we removed the media cache.
380            conn.vacuum().await?;
381            conn.set_kv("version", vec![12]).await?;
382        }
383
384        if from < 13 && to >= 13 {
385            conn.with_transaction(move |txn| {
386                // Run the migration.
387                txn.execute_batch(include_str!(
388                    "../migrations/state_store/011_thread_subscriptions.sql"
389                ))?;
390                txn.set_db_version(13)
391            })
392            .await?;
393        }
394
395        if from < 14 && to >= 14 {
396            conn.with_transaction(move |txn| {
397                // Run the migration.
398                txn.execute_batch(include_str!(
399                    "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
400                ))?;
401                txn.set_db_version(14)
402            })
403            .await?;
404        }
405
406        if from < 15 && to >= 15 {
407            conn.with_transaction(move |txn| {
408                // Run the migration.
409                txn.execute_batch(include_str!(
410                    "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
411                ))?;
412                txn.set_db_version(15)
413            })
414            .await?;
415        }
416
417        Ok(())
418    }
419
420    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
421        let key_s = match key {
422            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
423            StateStoreDataKey::SupportedVersions => {
424                Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
425            }
426            StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
427            StateStoreDataKey::Filter(f) => {
428                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
429            }
430            StateStoreDataKey::UserAvatarUrl(u) => {
431                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
432            }
433            StateStoreDataKey::RecentlyVisitedRooms(b) => {
434                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
435            }
436            StateStoreDataKey::UtdHookManagerData => {
437                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
438            }
439            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
440                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
441            }
442            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
443                if let Some(thread_root) = thread_root {
444                    Cow::Owned(format!(
445                        "{}:{room_id}:{thread_root}",
446                        StateStoreDataKey::COMPOSER_DRAFT
447                    ))
448                } else {
449                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
450                }
451            }
452            StateStoreDataKey::SeenKnockRequests(room_id) => {
453                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
454            }
455            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
456                Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
457            }
458        };
459
460        self.encode_key(keys::KV_BLOB, &*key_s)
461    }
462
463    fn encode_presence_key(&self, user_id: &UserId) -> Key {
464        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
465    }
466
467    fn encode_custom_key(&self, key: &[u8]) -> Key {
468        let mut full_key = b"custom:".to_vec();
469        full_key.extend(key);
470        self.encode_key(keys::KV_BLOB, full_key)
471    }
472
473    /// Acquire a connection for executing read operations.
474    #[instrument(skip_all)]
475    async fn read(&self) -> Result<SqliteAsyncConn> {
476        Ok(self.pool.get().await?)
477    }
478
479    /// Acquire a connection for executing write operations.
480    #[instrument(skip_all)]
481    async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
482        self.write_connection.clone().lock_owned().await
483    }
484
485    fn remove_maybe_stripped_room_data(
486        &self,
487        txn: &Transaction<'_>,
488        room_id: &RoomId,
489        stripped: bool,
490    ) -> rusqlite::Result<()> {
491        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
492        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
493
494        let member_room_id = self.encode_key(keys::MEMBER, room_id);
495        txn.remove_room_members(&member_room_id, Some(stripped))
496    }
497
498    pub async fn vacuum(&self) -> Result<()> {
499        self.write_connection.lock().await.vacuum().await
500    }
501
502    pub async fn get_db_size(&self) -> Result<Option<usize>> {
503        let read_conn = self.pool.get().await?;
504        Ok(Some(read_conn.get_db_size().await?))
505    }
506}
507
508impl EncryptableStore for SqliteStateStore {
509    fn get_cypher(&self) -> Option<&StoreCipher> {
510        self.store_cipher.as_deref()
511    }
512}
513
514/// Initialize the database.
515async fn init(conn: &SqliteAsyncConn) -> Result<()> {
516    // First turn on WAL mode, this can't be done in the transaction, it fails with
517    // the error message: "cannot change into wal mode from within a transaction".
518    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
519    conn.with_transaction(|txn| {
520        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
521        txn.set_db_version(1)?;
522
523        Ok(())
524    })
525    .await
526}
527
528trait SqliteConnectionStateStoreExt {
529    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
530
531    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
532
533    fn set_room_account_data(
534        &self,
535        room_id: &[u8],
536        event_type: &[u8],
537        data: &[u8],
538    ) -> rusqlite::Result<()>;
539    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
540
541    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
542    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
543    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
544
545    fn set_state_event(
546        &self,
547        room_id: &[u8],
548        event_type: &[u8],
549        state_key: &[u8],
550        stripped: bool,
551        event_id: Option<&[u8]>,
552        data: &[u8],
553    ) -> rusqlite::Result<()>;
554    fn get_state_event_by_id(
555        &self,
556        room_id: &[u8],
557        event_id: &[u8],
558    ) -> rusqlite::Result<Option<Vec<u8>>>;
559    fn remove_room_state_events(
560        &self,
561        room_id: &[u8],
562        stripped: Option<bool>,
563    ) -> rusqlite::Result<()>;
564
565    fn set_member(
566        &self,
567        room_id: &[u8],
568        user_id: &[u8],
569        membership: &[u8],
570        stripped: bool,
571        data: &[u8],
572    ) -> rusqlite::Result<()>;
573    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
574
575    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
576    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
577    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
578
579    fn set_receipt(
580        &self,
581        room_id: &[u8],
582        user_id: &[u8],
583        receipt_type: &[u8],
584        thread_id: &[u8],
585        event_id: &[u8],
586        data: &[u8],
587    ) -> rusqlite::Result<()>;
588    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
589
590    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
591    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
592    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
593    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
594    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
595}
596
597impl SqliteConnectionStateStoreExt for rusqlite::Connection {
598    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
599        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
600        Ok(())
601    }
602
603    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
604        self.prepare_cached(
605            "INSERT OR REPLACE INTO global_account_data (event_type, data)
606             VALUES (?, ?)",
607        )?
608        .execute((event_type, data))?;
609        Ok(())
610    }
611
612    fn set_room_account_data(
613        &self,
614        room_id: &[u8],
615        event_type: &[u8],
616        data: &[u8],
617    ) -> rusqlite::Result<()> {
618        self.prepare_cached(
619            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
620             VALUES (?, ?, ?)",
621        )?
622        .execute((room_id, event_type, data))?;
623        Ok(())
624    }
625
626    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
627        self.prepare(
628            "DELETE FROM room_account_data
629             WHERE room_id = ?",
630        )?
631        .execute((room_id,))?;
632        Ok(())
633    }
634
635    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
636        self.prepare_cached(
637            "INSERT OR REPLACE INTO room_info (room_id, state, data)
638             VALUES (?, ?, ?)",
639        )?
640        .execute((room_id, state, data))?;
641        Ok(())
642    }
643
644    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
645        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
646            .optional()
647    }
648
649    /// Remove the room info for the given room.
650    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
651        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
652        Ok(())
653    }
654
655    fn set_state_event(
656        &self,
657        room_id: &[u8],
658        event_type: &[u8],
659        state_key: &[u8],
660        stripped: bool,
661        event_id: Option<&[u8]>,
662        data: &[u8],
663    ) -> rusqlite::Result<()> {
664        self.prepare_cached(
665            "INSERT OR REPLACE
666             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
667             VALUES (?, ?, ?, ?, ?, ?)",
668        )?
669        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
670        Ok(())
671    }
672
673    fn get_state_event_by_id(
674        &self,
675        room_id: &[u8],
676        event_id: &[u8],
677    ) -> rusqlite::Result<Option<Vec<u8>>> {
678        self.query_row(
679            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
680            (room_id, event_id),
681            |row| row.get(0),
682        )
683        .optional()
684    }
685
686    /// Remove state events for the given room.
687    ///
688    /// If `stripped` is `Some()`, only removes state events for the given
689    /// stripped state. Otherwise, state events are removed regardless of the
690    /// stripped state.
691    fn remove_room_state_events(
692        &self,
693        room_id: &[u8],
694        stripped: Option<bool>,
695    ) -> rusqlite::Result<()> {
696        if let Some(stripped) = stripped {
697            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
698                .execute((room_id, stripped))?;
699        } else {
700            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
701                .execute((room_id,))?;
702        }
703        Ok(())
704    }
705
706    fn set_member(
707        &self,
708        room_id: &[u8],
709        user_id: &[u8],
710        membership: &[u8],
711        stripped: bool,
712        data: &[u8],
713    ) -> rusqlite::Result<()> {
714        self.prepare_cached(
715            "INSERT OR REPLACE
716             INTO member (room_id, user_id, membership, stripped, data)
717             VALUES (?, ?, ?, ?, ?)",
718        )?
719        .execute((room_id, user_id, membership, stripped, data))?;
720        Ok(())
721    }
722
723    /// Remove members for the given room.
724    ///
725    /// If `stripped` is `Some()`, only removes members for the given stripped
726    /// state. Otherwise, members are removed regardless of the stripped state.
727    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
728        if let Some(stripped) = stripped {
729            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
730                .execute((room_id, stripped))?;
731        } else {
732            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
733        }
734        Ok(())
735    }
736
737    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
738        self.prepare_cached(
739            "INSERT OR REPLACE
740             INTO profile (room_id, user_id, data)
741             VALUES (?, ?, ?)",
742        )?
743        .execute((room_id, user_id, data))?;
744        Ok(())
745    }
746
747    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
748        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
749        Ok(())
750    }
751
752    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
753        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
754            .execute((room_id, user_id))?;
755        Ok(())
756    }
757
758    fn set_receipt(
759        &self,
760        room_id: &[u8],
761        user_id: &[u8],
762        receipt_type: &[u8],
763        thread: &[u8],
764        event_id: &[u8],
765        data: &[u8],
766    ) -> rusqlite::Result<()> {
767        self.prepare_cached(
768            "INSERT OR REPLACE
769             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
770             VALUES (?, ?, ?, ?, ?, ?)",
771        )?
772        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
773        Ok(())
774    }
775
776    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
777        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
778        Ok(())
779    }
780
781    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
782        self.prepare_cached(
783            "INSERT OR REPLACE
784             INTO display_name (room_id, name, data)
785             VALUES (?, ?, ?)",
786        )?
787        .execute((room_id, name, data))?;
788        Ok(())
789    }
790
791    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
792        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
793            .execute((room_id, name))?;
794        Ok(())
795    }
796
797    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
798        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
799        Ok(())
800    }
801
802    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
803        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
804        Ok(())
805    }
806
807    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
808        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
809            .execute((room_id,))?;
810        Ok(())
811    }
812}
813
814#[async_trait]
815trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
816    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
817        Ok(self
818            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
819            .await
820            .optional()?)
821    }
822
823    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
824        let keys_length = keys.len();
825
826        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
827            let sql_params = repeat_vars(keys.len());
828            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
829
830            let params = rusqlite::params_from_iter(keys);
831
832            Ok(txn
833                .prepare(&sql)?
834                .query(params)?
835                .mapped(|row| row.get(0))
836                .collect::<Result<_, _>>()?)
837        })
838        .await
839    }
840
841    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
842
843    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
844        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
845        Ok(())
846    }
847
848    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
849        Ok(match room_id {
850            None => {
851                self.prepare("SELECT data FROM room_info", move |mut stmt| {
852                    stmt.query_map((), |row| row.get(0))?.collect()
853                })
854                .await?
855            }
856
857            Some(room_id) => {
858                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
859                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
860                })
861                .await?
862            }
863        })
864    }
865
866    async fn get_maybe_stripped_state_events_for_keys(
867        &self,
868        room_id: Key,
869        event_type: Key,
870        state_keys: Vec<Key>,
871    ) -> Result<Vec<(bool, Vec<u8>)>> {
872        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
873            let sql_params = repeat_vars(state_keys.len());
874            let sql = format!(
875                "SELECT stripped, data FROM state_event
876                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
877            );
878
879            let params = rusqlite::params_from_iter(
880                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
881            );
882
883            Ok(txn
884                .prepare(&sql)?
885                .query(params)?
886                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
887                .collect::<Result<_, _>>()?)
888        })
889        .await
890    }
891
892    async fn get_maybe_stripped_state_events(
893        &self,
894        room_id: Key,
895        event_type: Key,
896    ) -> Result<Vec<(bool, Vec<u8>)>> {
897        Ok(self
898            .prepare(
899                "SELECT stripped, data FROM state_event
900                 WHERE room_id = ? AND event_type = ?",
901                |mut stmt| {
902                    stmt.query((room_id, event_type))?
903                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
904                        .collect()
905                },
906            )
907            .await?)
908    }
909
910    async fn get_profiles(
911        &self,
912        room_id: Key,
913        user_ids: Vec<Key>,
914    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
915        let user_ids_length = user_ids.len();
916
917        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
918            let sql_params = repeat_vars(user_ids.len());
919            let sql = format!(
920                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
921            );
922
923            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
924
925            Ok(txn
926                .prepare(&sql)?
927                .query(params)?
928                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
929                .collect::<Result<_, _>>()?)
930        })
931        .await
932    }
933
934    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
935        let res = if memberships.is_empty() {
936            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
937                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
938            })
939            .await?
940        } else {
941            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
942                let sql_params = repeat_vars(memberships.len());
943                let sql = format!(
944                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
945                );
946
947                let params =
948                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
949
950                Ok(txn
951                    .prepare(&sql)?
952                    .query(params)?
953                    .mapped(|row| row.get(0))
954                    .collect::<Result<_, _>>()?)
955            })
956            .await?
957        };
958
959        Ok(res)
960    }
961
962    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
963        Ok(self
964            .query_row(
965                "SELECT data FROM global_account_data WHERE event_type = ?",
966                (event_type,),
967                |row| row.get(0),
968            )
969            .await
970            .optional()?)
971    }
972
973    async fn get_room_account_data(
974        &self,
975        room_id: Key,
976        event_type: Key,
977    ) -> Result<Option<Vec<u8>>> {
978        Ok(self
979            .query_row(
980                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
981                (room_id, event_type),
982                |row| row.get(0),
983            )
984            .await
985            .optional()?)
986    }
987
988    async fn get_display_names(
989        &self,
990        room_id: Key,
991        names: Vec<Key>,
992    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
993        let names_length = names.len();
994
995        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
996            let sql_params = repeat_vars(names.len());
997            let sql = format!(
998                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
999            );
1000
1001            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
1002
1003            Ok(txn
1004                .prepare(&sql)?
1005                .query(params)?
1006                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
1007                .collect::<Result<_, _>>()?)
1008        })
1009        .await
1010    }
1011
1012    async fn get_user_receipt(
1013        &self,
1014        room_id: Key,
1015        receipt_type: Key,
1016        thread: Key,
1017        user_id: Key,
1018    ) -> Result<Option<Vec<u8>>> {
1019        Ok(self
1020            .query_row(
1021                "SELECT data FROM receipt
1022                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1023                (room_id, receipt_type, thread, user_id),
1024                |row| row.get(0),
1025            )
1026            .await
1027            .optional()?)
1028    }
1029
1030    async fn get_event_receipts(
1031        &self,
1032        room_id: Key,
1033        receipt_type: Key,
1034        thread: Key,
1035        event_id: Key,
1036    ) -> Result<Vec<Vec<u8>>> {
1037        Ok(self
1038            .prepare(
1039                "SELECT data FROM receipt
1040                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1041                |mut stmt| {
1042                    stmt.query((room_id, receipt_type, thread, event_id))?
1043                        .mapped(|row| row.get(0))
1044                        .collect()
1045                },
1046            )
1047            .await?)
1048    }
1049}
1050
1051#[async_trait]
1052impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1053    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1054        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1055    }
1056}
1057
1058#[async_trait]
1059impl StateStore for SqliteStateStore {
1060    type Error = Error;
1061
1062    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1063        self.read()
1064            .await?
1065            .get_kv_blob(self.encode_state_store_data_key(key))
1066            .await?
1067            .map(|data| {
1068                Ok(match key {
1069                    StateStoreDataKey::SyncToken => {
1070                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1071                    }
1072                    StateStoreDataKey::SupportedVersions => {
1073                        StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1074                    }
1075                    StateStoreDataKey::WellKnown => {
1076                        StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1077                    }
1078                    StateStoreDataKey::Filter(_) => {
1079                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1080                    }
1081                    StateStoreDataKey::UserAvatarUrl(_) => {
1082                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1083                    }
1084                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1085                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1086                    }
1087                    StateStoreDataKey::UtdHookManagerData => {
1088                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1089                    }
1090                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1091                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1092                    }
1093                    StateStoreDataKey::ComposerDraft(_, _) => {
1094                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1095                    }
1096                    StateStoreDataKey::SeenKnockRequests(_) => {
1097                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1098                    }
1099                    StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1100                        StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1101                            self.deserialize_value(&data)?,
1102                        )
1103                    }
1104                })
1105            })
1106            .transpose()
1107    }
1108
1109    async fn set_kv_data(
1110        &self,
1111        key: StateStoreDataKey<'_>,
1112        value: StateStoreDataValue,
1113    ) -> Result<()> {
1114        let serialized_value = match key {
1115            StateStoreDataKey::SyncToken => self.serialize_value(
1116                &value.into_sync_token().expect("Session data not a sync token"),
1117            )?,
1118            StateStoreDataKey::SupportedVersions => self.serialize_value(
1119                &value
1120                    .into_supported_versions()
1121                    .expect("Session data not containing supported versions"),
1122            )?,
1123            StateStoreDataKey::WellKnown => self.serialize_value(
1124                &value.into_well_known().expect("Session data not containing well-known"),
1125            )?,
1126            StateStoreDataKey::Filter(_) => {
1127                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1128            }
1129            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1130                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1131            )?,
1132            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1133                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1134            )?,
1135            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1136                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1137            )?,
1138            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1139                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1140            }
1141            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1142                &value.into_composer_draft().expect("Session data not a composer draft"),
1143            )?,
1144            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1145                &value
1146                    .into_seen_knock_requests()
1147                    .expect("Session data is not a set of seen knock request ids"),
1148            )?,
1149            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1150                &value
1151                    .into_thread_subscriptions_catchup_tokens()
1152                    .expect("Session data is not a list of thread subscription catchup tokens"),
1153            )?,
1154        };
1155
1156        self.write()
1157            .await
1158            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1159            .await
1160    }
1161
1162    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1163        self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1164    }
1165
1166    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1167        let changes = changes.to_owned();
1168        let this = self.clone();
1169        self.write()
1170            .await
1171            .with_transaction(move |txn| {
1172                let StateChanges {
1173                    sync_token,
1174                    account_data,
1175                    presence,
1176                    profiles,
1177                    profiles_to_delete,
1178                    state,
1179                    room_account_data,
1180                    room_infos,
1181                    receipts,
1182                    redactions,
1183                    stripped_state,
1184                    ambiguity_maps,
1185                } = changes;
1186
1187                if let Some(sync_token) = sync_token {
1188                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1189                    let value = this.serialize_value(&sync_token)?;
1190                    txn.set_kv_blob(&key, &value)?;
1191                }
1192
1193                for (event_type, event) in account_data {
1194                    let event_type =
1195                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1196                    let data = this.serialize_json(&event)?;
1197                    txn.set_global_account_data(&event_type, &data)?;
1198                }
1199
1200                for (room_id, events) in room_account_data {
1201                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1202                    for (event_type, event) in events {
1203                        let event_type =
1204                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1205                        let data = this.serialize_json(&event)?;
1206                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1207                    }
1208                }
1209
1210                for (user_id, event) in presence {
1211                    let key = this.encode_presence_key(&user_id);
1212                    let value = this.serialize_json(&event)?;
1213                    txn.set_kv_blob(&key, &value)?;
1214                }
1215
1216                for (room_id, room_info) in room_infos {
1217                    let stripped = room_info.state() == RoomState::Invited;
1218                    // Remove non-stripped data for stripped rooms and vice-versa.
1219                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1220
1221                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1222                    let state = this
1223                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1224                    let data = this.serialize_json(&room_info)?;
1225                    txn.set_room_info(&room_id, &state, &data)?;
1226                }
1227
1228                for (room_id, user_ids) in profiles_to_delete {
1229                    let room_id = this.encode_key(keys::PROFILE, room_id);
1230                    for user_id in user_ids {
1231                        let user_id = this.encode_key(keys::PROFILE, user_id);
1232                        txn.remove_room_profile(&room_id, &user_id)?;
1233                    }
1234                }
1235
1236                for (room_id, state_event_types) in state {
1237                    let profiles = profiles.get(&room_id);
1238                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1239
1240                    for (event_type, state_events) in state_event_types {
1241                        let encoded_event_type =
1242                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1243
1244                        for (state_key, raw_state_event) in state_events {
1245                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1246                            let data = this.serialize_json(&raw_state_event)?;
1247
1248                            let event_id: Option<String> =
1249                                raw_state_event.get_field("event_id").ok().flatten();
1250                            let encoded_event_id =
1251                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1252
1253                            txn.set_state_event(
1254                                &encoded_room_id,
1255                                &encoded_event_type,
1256                                &encoded_state_key,
1257                                false,
1258                                encoded_event_id.as_deref(),
1259                                &data,
1260                            )?;
1261
1262                            if event_type == StateEventType::RoomMember {
1263                                let member_event = match raw_state_event
1264                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1265                                {
1266                                    Ok(ev) => ev,
1267                                    Err(e) => {
1268                                        debug!(event_id, "Failed to deserialize member event: {e}");
1269                                        continue;
1270                                    }
1271                                };
1272
1273                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1274                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1275                                let membership = this
1276                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1277                                let data = this.serialize_value(&state_key)?;
1278
1279                                txn.set_member(
1280                                    &encoded_room_id,
1281                                    &user_id,
1282                                    &membership,
1283                                    false,
1284                                    &data,
1285                                )?;
1286
1287                                if let Some(profile) =
1288                                    profiles.and_then(|p| p.get(member_event.state_key()))
1289                                {
1290                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1291                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1292                                    let data = this.serialize_json(&profile)?;
1293                                    txn.set_profile(&room_id, &user_id, &data)?;
1294                                }
1295                            }
1296                        }
1297                    }
1298                }
1299
1300                for (room_id, stripped_state_event_types) in stripped_state {
1301                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1302
1303                    for (event_type, stripped_state_events) in stripped_state_event_types {
1304                        let encoded_event_type =
1305                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1306
1307                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1308                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1309                            let data = this.serialize_json(&raw_stripped_state_event)?;
1310                            txn.set_state_event(
1311                                &encoded_room_id,
1312                                &encoded_event_type,
1313                                &encoded_state_key,
1314                                true,
1315                                None,
1316                                &data,
1317                            )?;
1318
1319                            if event_type == StateEventType::RoomMember {
1320                                let member_event = match raw_stripped_state_event
1321                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1322                                ) {
1323                                    Ok(ev) => ev,
1324                                    Err(e) => {
1325                                        debug!("Failed to deserialize stripped member event: {e}");
1326                                        continue;
1327                                    }
1328                                };
1329
1330                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1331                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1332                                let membership = this.encode_key(
1333                                    keys::MEMBER,
1334                                    member_event.content.membership.as_str(),
1335                                );
1336                                let data = this.serialize_value(&state_key)?;
1337
1338                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1339                            }
1340                        }
1341                    }
1342                }
1343
1344                for (room_id, receipt_event) in receipts {
1345                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1346
1347                    for (event_id, receipt_types) in receipt_event {
1348                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1349
1350                        for (receipt_type, receipt_users) in receipt_types {
1351                            let receipt_type =
1352                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1353
1354                            for (user_id, receipt) in receipt_users {
1355                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1356                                // We cannot have a NULL primary key so we rely on serialization
1357                                // instead of the string representation.
1358                                let thread = this.encode_key(
1359                                    keys::RECEIPT,
1360                                    rmp_serde::to_vec_named(&receipt.thread)?,
1361                                );
1362                                let data = this.serialize_json(&ReceiptData {
1363                                    receipt,
1364                                    event_id: event_id.clone(),
1365                                    user_id,
1366                                })?;
1367
1368                                txn.set_receipt(
1369                                    &room_id,
1370                                    &encoded_user_id,
1371                                    &receipt_type,
1372                                    &thread,
1373                                    &encoded_event_id,
1374                                    &data,
1375                                )?;
1376                            }
1377                        }
1378                    }
1379                }
1380
1381                for (room_id, redactions) in redactions {
1382                    let make_redaction_rules = || {
1383                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1384                        txn.get_room_info(&encoded_room_id)
1385                            .ok()
1386                            .flatten()
1387                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1388                            .map(|info| info.room_version_rules_or_default())
1389                            .unwrap_or_else(|| {
1390                                warn!(
1391                                    ?room_id,
1392                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1393                                );
1394                                ROOM_VERSION_RULES_FALLBACK
1395                            }).redaction
1396                    };
1397
1398                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1399                    let mut redaction_rules = None;
1400
1401                    for (event_id, redaction) in redactions {
1402                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1403
1404                        if let Some(Ok(raw_event)) = txn
1405                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1406                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1407                        {
1408                            let event = raw_event.deserialize()?;
1409                            let redacted = redact(
1410                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1411                                redaction_rules.get_or_insert_with(make_redaction_rules),
1412                                Some(RedactedBecause::from_raw_event(&redaction)?),
1413                            )
1414                            .map_err(Error::Redaction)?;
1415                            let data = this.serialize_json(&redacted)?;
1416
1417                            let event_type =
1418                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1419                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1420
1421                            txn.set_state_event(
1422                                &encoded_room_id,
1423                                &event_type,
1424                                &state_key,
1425                                false,
1426                                Some(&event_id),
1427                                &data,
1428                            )?;
1429                        }
1430                    }
1431                }
1432
1433                for (room_id, display_names) in ambiguity_maps {
1434                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1435
1436                    for (name, user_ids) in display_names {
1437                        let encoded_name = this.encode_key(
1438                            keys::DISPLAY_NAME,
1439                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1440                        );
1441                        let data = this.serialize_json(&user_ids)?;
1442
1443                        if user_ids.is_empty() {
1444                            txn.remove_display_name(&room_id, &encoded_name)?;
1445
1446                            // We can't do a migration to merge the previously distinct buckets of
1447                            // user IDs since the display names themselves are hashed before they
1448                            // are persisted in the store. So the store will always retain two
1449                            // buckets: one for raw display names and one for normalised ones.
1450                            //
1451                            // We therefore do the next best thing, which is a sort of a soft
1452                            // migration: we fetch both the raw and normalised buckets, then merge
1453                            // the user IDs contained in them into a separate, temporary merged
1454                            // bucket. The SDK then operates on the merged buckets exclusively. See
1455                            // the comment in `get_users_with_display_names` for details.
1456                            //
1457                            // If the merged bucket is empty, that must mean that both the raw and
1458                            // normalised buckets were also empty, so we can remove both from the
1459                            // store.
1460                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1461                            txn.remove_display_name(&room_id, &raw_name)?;
1462                        } else {
1463                            // We only create new buckets with the normalized display name.
1464                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1465                        }
1466                    }
1467                }
1468
1469                Ok::<_, Error>(())
1470            })
1471            .await?;
1472
1473        Ok(())
1474    }
1475
1476    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1477        self.read()
1478            .await?
1479            .get_kv_blob(self.encode_presence_key(user_id))
1480            .await?
1481            .map(|data| self.deserialize_json(&data))
1482            .transpose()
1483    }
1484
1485    async fn get_presence_events(
1486        &self,
1487        user_ids: &[OwnedUserId],
1488    ) -> Result<Vec<Raw<PresenceEvent>>> {
1489        if user_ids.is_empty() {
1490            return Ok(Vec::new());
1491        }
1492
1493        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1494        self.read()
1495            .await?
1496            .get_kv_blobs(user_ids)
1497            .await?
1498            .into_iter()
1499            .map(|data| self.deserialize_json(&data))
1500            .collect()
1501    }
1502
1503    async fn get_state_event(
1504        &self,
1505        room_id: &RoomId,
1506        event_type: StateEventType,
1507        state_key: &str,
1508    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1509        Ok(self
1510            .get_state_events_for_keys(room_id, event_type, &[state_key])
1511            .await?
1512            .into_iter()
1513            .next())
1514    }
1515
1516    async fn get_state_events(
1517        &self,
1518        room_id: &RoomId,
1519        event_type: StateEventType,
1520    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1521        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1522        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1523        self.read()
1524            .await?
1525            .get_maybe_stripped_state_events(room_id, event_type)
1526            .await?
1527            .into_iter()
1528            .map(|(stripped, data)| {
1529                let ev = if stripped {
1530                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1531                } else {
1532                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1533                };
1534
1535                Ok(ev)
1536            })
1537            .collect()
1538    }
1539
1540    async fn get_state_events_for_keys(
1541        &self,
1542        room_id: &RoomId,
1543        event_type: StateEventType,
1544        state_keys: &[&str],
1545    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1546        if state_keys.is_empty() {
1547            return Ok(Vec::new());
1548        }
1549
1550        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1551        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1552        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1553        self.read()
1554            .await?
1555            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1556            .await?
1557            .into_iter()
1558            .map(|(stripped, data)| {
1559                let ev = if stripped {
1560                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1561                } else {
1562                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1563                };
1564
1565                Ok(ev)
1566            })
1567            .collect()
1568    }
1569
1570    async fn get_profile(
1571        &self,
1572        room_id: &RoomId,
1573        user_id: &UserId,
1574    ) -> Result<Option<MinimalRoomMemberEvent>> {
1575        let room_id = self.encode_key(keys::PROFILE, room_id);
1576        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1577
1578        self.read()
1579            .await?
1580            .get_profiles(room_id, user_ids)
1581            .await?
1582            .into_iter()
1583            .next()
1584            .map(|(_, data)| self.deserialize_json(&data))
1585            .transpose()
1586    }
1587
1588    async fn get_profiles<'a>(
1589        &self,
1590        room_id: &RoomId,
1591        user_ids: &'a [OwnedUserId],
1592    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1593        if user_ids.is_empty() {
1594            return Ok(BTreeMap::new());
1595        }
1596
1597        let room_id = self.encode_key(keys::PROFILE, room_id);
1598        let mut user_ids_map = user_ids
1599            .iter()
1600            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1601            .collect::<BTreeMap<_, _>>();
1602        let user_ids = user_ids_map.keys().cloned().collect();
1603
1604        self.read()
1605            .await?
1606            .get_profiles(room_id, user_ids)
1607            .await?
1608            .into_iter()
1609            .map(|(user_id, data)| {
1610                Ok((
1611                    user_ids_map
1612                        .remove(user_id.as_slice())
1613                        .expect("returned user IDs were requested"),
1614                    self.deserialize_json(&data)?,
1615                ))
1616            })
1617            .collect()
1618    }
1619
1620    async fn get_user_ids(
1621        &self,
1622        room_id: &RoomId,
1623        membership: RoomMemberships,
1624    ) -> Result<Vec<OwnedUserId>> {
1625        let room_id = self.encode_key(keys::MEMBER, room_id);
1626        let memberships = membership
1627            .as_vec()
1628            .into_iter()
1629            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1630            .collect();
1631        self.read()
1632            .await?
1633            .get_user_ids(room_id, memberships)
1634            .await?
1635            .iter()
1636            .map(|data| self.deserialize_value(data))
1637            .collect()
1638    }
1639
1640    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1641        self.read()
1642            .await?
1643            .get_room_infos(match room_load_settings {
1644                RoomLoadSettings::All => None,
1645                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1646            })
1647            .await?
1648            .into_iter()
1649            .map(|data| self.deserialize_json(&data))
1650            .collect()
1651    }
1652
1653    async fn get_users_with_display_name(
1654        &self,
1655        room_id: &RoomId,
1656        display_name: &DisplayName,
1657    ) -> Result<BTreeSet<OwnedUserId>> {
1658        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1659        let names = vec![self.encode_key(
1660            keys::DISPLAY_NAME,
1661            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1662        )];
1663
1664        Ok(self
1665            .read()
1666            .await?
1667            .get_display_names(room_id, names)
1668            .await?
1669            .into_iter()
1670            .next()
1671            .map(|(_, data)| self.deserialize_json(&data))
1672            .transpose()?
1673            .unwrap_or_default())
1674    }
1675
1676    async fn get_users_with_display_names<'a>(
1677        &self,
1678        room_id: &RoomId,
1679        display_names: &'a [DisplayName],
1680    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1681        let mut result = HashMap::new();
1682
1683        if display_names.is_empty() {
1684            return Ok(result);
1685        }
1686
1687        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1688        let mut names_map = display_names
1689            .iter()
1690            .flat_map(|display_name| {
1691                // We encode the display name as the `raw_str()` and the normalized string.
1692                //
1693                // This is for compatibility reasons since:
1694                //  1. Previously "Alice" and "alice" were considered to be distinct display
1695                //     names, while we now consider them to be the same so we need to merge the
1696                //     previously distinct buckets of user IDs.
1697                //  2. We can't do a migration to merge the previously distinct buckets of user
1698                //     IDs since the display names itself are hashed before they are persisted
1699                //     in the store.
1700                let raw =
1701                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1702                let normalized = display_name.as_normalized_str().map(|normalized| {
1703                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1704                });
1705
1706                iter::once(raw).chain(normalized)
1707            })
1708            .collect::<BTreeMap<_, _>>();
1709        let names = names_map.keys().cloned().collect();
1710
1711        for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1712        {
1713            let display_name =
1714                names_map.remove(name.as_slice()).expect("returned display names were requested");
1715            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1716
1717            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1718        }
1719
1720        Ok(result)
1721    }
1722
1723    async fn get_account_data_event(
1724        &self,
1725        event_type: GlobalAccountDataEventType,
1726    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1727        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1728        self.read()
1729            .await?
1730            .get_global_account_data(event_type)
1731            .await?
1732            .map(|value| self.deserialize_json(&value))
1733            .transpose()
1734    }
1735
1736    async fn get_room_account_data_event(
1737        &self,
1738        room_id: &RoomId,
1739        event_type: RoomAccountDataEventType,
1740    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1741        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1742        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1743        self.read()
1744            .await?
1745            .get_room_account_data(room_id, event_type)
1746            .await?
1747            .map(|value| self.deserialize_json(&value))
1748            .transpose()
1749    }
1750
1751    async fn get_user_room_receipt_event(
1752        &self,
1753        room_id: &RoomId,
1754        receipt_type: ReceiptType,
1755        thread: ReceiptThread,
1756        user_id: &UserId,
1757    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1758        let room_id = self.encode_key(keys::RECEIPT, room_id);
1759        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1760        // We cannot have a NULL primary key so we rely on serialization instead of the
1761        // string representation.
1762        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1763        let user_id = self.encode_key(keys::RECEIPT, user_id);
1764
1765        self.read()
1766            .await?
1767            .get_user_receipt(room_id, receipt_type, thread, user_id)
1768            .await?
1769            .map(|value| {
1770                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1771            })
1772            .transpose()
1773    }
1774
1775    async fn get_event_room_receipt_events(
1776        &self,
1777        room_id: &RoomId,
1778        receipt_type: ReceiptType,
1779        thread: ReceiptThread,
1780        event_id: &EventId,
1781    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1782        let room_id = self.encode_key(keys::RECEIPT, room_id);
1783        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1784        // We cannot have a NULL primary key so we rely on serialization instead of the
1785        // string representation.
1786        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1787        let event_id = self.encode_key(keys::RECEIPT, event_id);
1788
1789        self.read()
1790            .await?
1791            .get_event_receipts(room_id, receipt_type, thread, event_id)
1792            .await?
1793            .iter()
1794            .map(|value| {
1795                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1796            })
1797            .collect()
1798    }
1799
1800    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1801        self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1802    }
1803
1804    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1805        let conn = self.write().await;
1806        let key = self.encode_custom_key(key);
1807        conn.set_kv_blob(key, value).await?;
1808        Ok(())
1809    }
1810
1811    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1812        let conn = self.write().await;
1813        let key = self.encode_custom_key(key);
1814        let previous = conn.get_kv_blob(key.clone()).await?;
1815        conn.set_kv_blob(key, value).await?;
1816        Ok(previous)
1817    }
1818
1819    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1820        let conn = self.write().await;
1821        let key = self.encode_custom_key(key);
1822        let previous = conn.get_kv_blob(key.clone()).await?;
1823        if previous.is_some() {
1824            conn.delete_kv_blob(key).await?;
1825        }
1826        Ok(previous)
1827    }
1828
1829    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1830        let this = self.clone();
1831        let room_id = room_id.to_owned();
1832
1833        let conn = self.write().await;
1834
1835        conn.with_transaction(move |txn| -> Result<()> {
1836            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1837            txn.remove_room_info(&room_info_room_id)?;
1838
1839            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1840            txn.remove_room_state_events(&state_event_room_id, None)?;
1841
1842            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1843            txn.remove_room_members(&member_room_id, None)?;
1844
1845            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1846            txn.remove_room_profiles(&profile_room_id)?;
1847
1848            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1849            txn.remove_room_account_data(&room_account_data_room_id)?;
1850
1851            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1852            txn.remove_room_receipts(&receipt_room_id)?;
1853
1854            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1855            txn.remove_room_display_names(&display_name_room_id)?;
1856
1857            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1858            txn.remove_room_send_queue(&send_queue_room_id)?;
1859
1860            let dependent_send_queue_room_id =
1861                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1862            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1863
1864            let thread_subscriptions_room_id =
1865                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1866            txn.execute(
1867                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1868                (thread_subscriptions_room_id,),
1869            )?;
1870
1871            Ok(())
1872        })
1873        .await?;
1874
1875        conn.vacuum().await
1876    }
1877
1878    async fn save_send_queue_request(
1879        &self,
1880        room_id: &RoomId,
1881        transaction_id: OwnedTransactionId,
1882        created_at: MilliSecondsSinceUnixEpoch,
1883        content: QueuedRequestKind,
1884        priority: usize,
1885    ) -> Result<(), Self::Error> {
1886        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1887        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1888
1889        let content = self.serialize_json(&content)?;
1890        // The transaction id is used both as a key (in remove/update) and a value (as
1891        // it's useful for the callers), so we keep it as is, and neither hash
1892        // it (with encode_key) or encrypt it (through serialize_value). After
1893        // all, it carries no personal information, so this is considered fine.
1894
1895        let created_at_ts: u64 = created_at.0.into();
1896        self.write()
1897            .await
1898            .with_transaction(move |txn| {
1899                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1900                Ok(())
1901            })
1902            .await
1903    }
1904
1905    async fn update_send_queue_request(
1906        &self,
1907        room_id: &RoomId,
1908        transaction_id: &TransactionId,
1909        content: QueuedRequestKind,
1910    ) -> Result<bool, Self::Error> {
1911        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1912
1913        let content = self.serialize_json(&content)?;
1914        // See comment in [`Self::save_send_queue_request`] to understand why the
1915        // transaction id is neither encrypted or hashed.
1916        let transaction_id = transaction_id.to_string();
1917
1918        let num_updated = self.write()
1919            .await
1920            .with_transaction(move |txn| {
1921                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1922            })
1923            .await?;
1924
1925        Ok(num_updated > 0)
1926    }
1927
1928    async fn remove_send_queue_request(
1929        &self,
1930        room_id: &RoomId,
1931        transaction_id: &TransactionId,
1932    ) -> Result<bool, Self::Error> {
1933        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1934
1935        // See comment in `save_send_queue_request`.
1936        let transaction_id = transaction_id.to_string();
1937
1938        let num_deleted = self
1939            .write()
1940            .await
1941            .with_transaction(move |txn| {
1942                txn.prepare_cached(
1943                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1944                )?
1945                .execute((room_id, &transaction_id))
1946            })
1947            .await?;
1948
1949        Ok(num_deleted > 0)
1950    }
1951
1952    async fn load_send_queue_requests(
1953        &self,
1954        room_id: &RoomId,
1955    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1956        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1957
1958        // Note: ROWID is always present and is an auto-incremented integer counter. We
1959        // want to maintain the insertion order, so we can sort using it.
1960        // Note 2: transaction_id is not encoded, see why in `save_send_queue_request`.
1961        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1962            .read()
1963            .await?
1964            .prepare(
1965                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1966                |mut stmt| {
1967                    stmt.query((room_id,))?
1968                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1969                        .collect()
1970                },
1971            )
1972            .await?;
1973
1974        let mut requests = Vec::with_capacity(res.len());
1975
1976        for entry in res {
1977            let created_at = entry
1978                .4
1979                .and_then(UInt::new)
1980                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1981
1982            requests.push(QueuedRequest {
1983                transaction_id: entry.0.into(),
1984                kind: self.deserialize_json(&entry.1)?,
1985                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1986                priority: entry.3,
1987                created_at,
1988            });
1989        }
1990
1991        Ok(requests)
1992    }
1993
1994    async fn update_send_queue_request_status(
1995        &self,
1996        room_id: &RoomId,
1997        transaction_id: &TransactionId,
1998        error: Option<QueueWedgeError>,
1999    ) -> Result<(), Self::Error> {
2000        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2001
2002        // See comment in `save_send_queue_request`.
2003        let transaction_id = transaction_id.to_string();
2004
2005        // Serialize the error to json bytes (encrypted if option is enabled) if set.
2006        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2007
2008        self.write()
2009            .await
2010            .with_transaction(move |txn| {
2011                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2012                Ok(())
2013            })
2014            .await
2015    }
2016
2017    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2018        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
2019        // have to manually do the deduplication: indeed, for all X, encrypt(X)
2020        // != encrypted(X), since we use a nonce in the encryption process.
2021
2022        let res: Vec<Vec<u8>> = self
2023            .read()
2024            .await?
2025            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2026                stmt.query(())?.mapped(|row| row.get(0)).collect()
2027            })
2028            .await?;
2029
2030        // So we collect the results into a `BTreeSet` to perform the deduplication, and
2031        // then rejigger that into a vector.
2032        Ok(res
2033            .into_iter()
2034            .map(|entry| self.deserialize_value(&entry))
2035            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2036            .into_iter()
2037            .collect())
2038    }
2039
2040    async fn save_dependent_queued_request(
2041        &self,
2042        room_id: &RoomId,
2043        parent_txn_id: &TransactionId,
2044        own_txn_id: ChildTransactionId,
2045        created_at: MilliSecondsSinceUnixEpoch,
2046        content: DependentQueuedRequestKind,
2047    ) -> Result<()> {
2048        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2049        let content = self.serialize_json(&content)?;
2050
2051        // See comment in `save_send_queue_request`.
2052        let parent_txn_id = parent_txn_id.to_string();
2053        let own_txn_id = own_txn_id.to_string();
2054
2055        let created_at_ts: u64 = created_at.0.into();
2056        self.write()
2057            .await
2058            .with_transaction(move |txn| {
2059                txn.prepare_cached(
2060                    r#"INSERT INTO dependent_send_queue_events
2061                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2062                       VALUES (?, ?, ?, ?, ?)"#,
2063                )?
2064                .execute((
2065                    room_id,
2066                    parent_txn_id,
2067                    own_txn_id,
2068                    content,
2069                    created_at_ts,
2070                ))?;
2071                Ok(())
2072            })
2073            .await
2074    }
2075
2076    async fn update_dependent_queued_request(
2077        &self,
2078        room_id: &RoomId,
2079        own_transaction_id: &ChildTransactionId,
2080        new_content: DependentQueuedRequestKind,
2081    ) -> Result<bool> {
2082        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2083        let content = self.serialize_json(&new_content)?;
2084
2085        // See comment in `save_send_queue_request`.
2086        let own_txn_id = own_transaction_id.to_string();
2087
2088        let num_updated = self
2089            .write()
2090            .await
2091            .with_transaction(move |txn| {
2092                txn.prepare_cached(
2093                    r#"UPDATE dependent_send_queue_events
2094                       SET content = ?
2095                       WHERE own_transaction_id = ?
2096                       AND room_id = ?"#,
2097                )?
2098                .execute((content, own_txn_id, room_id))
2099            })
2100            .await?;
2101
2102        if num_updated > 1 {
2103            return Err(Error::InconsistentUpdate);
2104        }
2105
2106        Ok(num_updated == 1)
2107    }
2108
2109    async fn mark_dependent_queued_requests_as_ready(
2110        &self,
2111        room_id: &RoomId,
2112        parent_txn_id: &TransactionId,
2113        parent_key: SentRequestKey,
2114    ) -> Result<usize> {
2115        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2116        let parent_key = self.serialize_json(&parent_key)?;
2117
2118        // See comment in `save_send_queue_request`.
2119        let parent_txn_id = parent_txn_id.to_string();
2120
2121        self.write()
2122            .await
2123            .with_transaction(move |txn| {
2124                Ok(txn.prepare_cached(
2125                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2126                )?
2127                .execute((parent_key, parent_txn_id, room_id))?)
2128            })
2129            .await
2130    }
2131
2132    async fn remove_dependent_queued_request(
2133        &self,
2134        room_id: &RoomId,
2135        txn_id: &ChildTransactionId,
2136    ) -> Result<bool> {
2137        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2138
2139        // See comment in `save_send_queue_request`.
2140        let txn_id = txn_id.to_string();
2141
2142        let num_deleted = self
2143            .write()
2144            .await
2145            .with_transaction(move |txn| {
2146                txn.prepare_cached(
2147                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2148                )?
2149                .execute((txn_id, room_id))
2150            })
2151            .await?;
2152
2153        Ok(num_deleted > 0)
2154    }
2155
2156    async fn load_dependent_queued_requests(
2157        &self,
2158        room_id: &RoomId,
2159    ) -> Result<Vec<DependentQueuedRequest>> {
2160        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2161
2162        // Note: transaction_id is not encoded, see why in `save_send_queue_request`.
2163        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2164            .read()
2165            .await?
2166            .prepare(
2167                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2168                |mut stmt| {
2169                    stmt.query((room_id,))?
2170                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2171                        .collect()
2172                },
2173            )
2174            .await?;
2175
2176        let mut dependent_events = Vec::with_capacity(res.len());
2177
2178        for entry in res {
2179            let created_at = entry
2180                .4
2181                .and_then(UInt::new)
2182                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2183
2184            dependent_events.push(DependentQueuedRequest {
2185                own_transaction_id: entry.0.into(),
2186                parent_transaction_id: entry.1.into(),
2187                parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2188                kind: self.deserialize_json(&entry.3)?,
2189                created_at,
2190            });
2191        }
2192
2193        Ok(dependent_events)
2194    }
2195
2196    async fn upsert_thread_subscription(
2197        &self,
2198        room_id: &RoomId,
2199        thread_id: &EventId,
2200        mut new: StoredThreadSubscription,
2201    ) -> Result<(), Self::Error> {
2202        if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2203            if previous == new {
2204                // No need to update anything.
2205                trace!("not saving thread subscription because the subscription is the same");
2206                return Ok(());
2207            }
2208
2209            if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2210                trace!("not saving thread subscription because we have a newer bump stamp");
2211                return Ok(());
2212            }
2213        }
2214
2215        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2216        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2217        let status = new.status.as_str();
2218
2219        self.write()
2220            .await
2221            .with_transaction(move |txn| {
2222                // Try to find a previous value.
2223                txn.prepare_cached(
2224                    "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2225                         VALUES (?, ?, ?, ?)",
2226                )?
2227                .execute((room_id, thread_id, status, new.bump_stamp))
2228            })
2229            .await?;
2230        Ok(())
2231    }
2232
2233    async fn load_thread_subscription(
2234        &self,
2235        room_id: &RoomId,
2236        thread_id: &EventId,
2237    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2238        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2239        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2240
2241        Ok(self
2242            .read()
2243            .await?
2244            .query_row(
2245                "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2246                (room_id, thread_id),
2247                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2248            )
2249            .await
2250            .optional()?
2251            .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2252                let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2253                    Error::InvalidData { details: format!("Invalid thread status: {status}") }
2254                })?;
2255                Ok(StoredThreadSubscription { status, bump_stamp })
2256            })
2257            .transpose()?)
2258    }
2259
2260    async fn remove_thread_subscription(
2261        &self,
2262        room_id: &RoomId,
2263        thread_id: &EventId,
2264    ) -> Result<(), Self::Error> {
2265        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2266        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2267
2268        self.write()
2269            .await
2270            .execute(
2271                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2272                (room_id, thread_id),
2273            )
2274            .await?;
2275
2276        Ok(())
2277    }
2278
2279    async fn optimize(&self) -> Result<(), Self::Error> {
2280        Ok(self.vacuum().await?)
2281    }
2282
2283    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2284        self.get_db_size().await
2285    }
2286}
2287
2288#[derive(Debug, Clone, Serialize, Deserialize)]
2289struct ReceiptData {
2290    receipt: Receipt,
2291    event_id: OwnedEventId,
2292    user_id: OwnedUserId,
2293}
2294
2295#[cfg(test)]
2296mod tests {
2297    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2298
2299    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2300    use once_cell::sync::Lazy;
2301    use tempfile::{tempdir, TempDir};
2302
2303    use super::SqliteStateStore;
2304
2305    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2306    static NUM: AtomicU32 = AtomicU32::new(0);
2307
2308    async fn get_store() -> Result<impl StateStore, StoreError> {
2309        let name = NUM.fetch_add(1, SeqCst).to_string();
2310        let tmpdir_path = TMP_DIR.path().join(name);
2311
2312        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2313
2314        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2315    }
2316
2317    statestore_integration_tests!();
2318}
2319
2320#[cfg(test)]
2321mod encrypted_tests {
2322    use std::{
2323        path::PathBuf,
2324        sync::atomic::{AtomicU32, Ordering::SeqCst},
2325    };
2326
2327    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2328    use matrix_sdk_test::async_test;
2329    use once_cell::sync::Lazy;
2330    use tempfile::{tempdir, TempDir};
2331
2332    use super::SqliteStateStore;
2333    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2334
2335    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2336    static NUM: AtomicU32 = AtomicU32::new(0);
2337
2338    fn new_state_store_workspace() -> PathBuf {
2339        let name = NUM.fetch_add(1, SeqCst).to_string();
2340        TMP_DIR.path().join(name)
2341    }
2342
2343    async fn get_store() -> Result<impl StateStore, StoreError> {
2344        let tmpdir_path = new_state_store_workspace();
2345
2346        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2347
2348        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2349            .await
2350            .unwrap())
2351    }
2352
2353    #[async_test]
2354    async fn test_pool_size() {
2355        let tmpdir_path = new_state_store_workspace();
2356        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2357
2358        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2359
2360        assert_eq!(store.pool.status().max_size, 42);
2361    }
2362
2363    #[async_test]
2364    async fn test_cache_size() {
2365        let tmpdir_path = new_state_store_workspace();
2366        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2367
2368        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2369
2370        let conn = store.pool.get().await.unwrap();
2371        let cache_size =
2372            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2373
2374        // The value passed to `SqliteStoreConfig` is in bytes. Check it is
2375        // converted to kibibytes. Also, it must be a negative value because it
2376        // _is_ the size in kibibytes, not in page size.
2377        assert_eq!(cache_size, -(1500 / 1024));
2378    }
2379
2380    #[async_test]
2381    async fn test_journal_size_limit() {
2382        let tmpdir_path = new_state_store_workspace();
2383        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2384
2385        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2386
2387        let conn = store.pool.get().await.unwrap();
2388        let journal_size_limit = conn
2389            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2390            .await
2391            .unwrap();
2392
2393        // The value passed to `SqliteStoreConfig` is in bytes. It stays in
2394        // bytes in SQLite.
2395        assert_eq!(journal_size_limit, 1500);
2396    }
2397
2398    statestore_integration_tests!();
2399}
2400
2401#[cfg(test)]
2402mod migration_tests {
2403    use std::{
2404        path::{Path, PathBuf},
2405        sync::{
2406            atomic::{AtomicU32, Ordering::SeqCst},
2407            Arc,
2408        },
2409    };
2410
2411    use as_variant::as_variant;
2412    use matrix_sdk_base::{
2413        media::{MediaFormat, MediaRequestParameters},
2414        store::{
2415            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2416            SerializableEventContent,
2417        },
2418        sync::UnreadNotificationsCount,
2419        RoomState, StateStore,
2420    };
2421    use matrix_sdk_test::async_test;
2422    use once_cell::sync::Lazy;
2423    use ruma::{
2424        events::{
2425            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2426            StateEventType,
2427        },
2428        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2429        RoomId, TransactionId, UserId,
2430    };
2431    use rusqlite::Transaction;
2432    use serde::{Deserialize, Serialize};
2433    use serde_json::json;
2434    use tempfile::{tempdir, TempDir};
2435    use tokio::{fs, sync::Mutex};
2436    use zeroize::Zeroizing;
2437
2438    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2439    use crate::{
2440        error::{Error, Result},
2441        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2442        OpenStoreError, Secret, SqliteStoreConfig,
2443    };
2444
2445    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2446    static NUM: AtomicU32 = AtomicU32::new(0);
2447    const SECRET: &str = "secret";
2448
2449    fn new_path() -> PathBuf {
2450        let name = NUM.fetch_add(1, SeqCst).to_string();
2451        TMP_DIR.path().join(name)
2452    }
2453
2454    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2455        let config = SqliteStoreConfig::new(path);
2456
2457        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2458
2459        let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2460        let conn = pool.get().await?;
2461
2462        init(&conn).await?;
2463
2464        let store_cipher = Some(Arc::new(
2465            conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2466                .await
2467                .unwrap(),
2468        ));
2469        let this = SqliteStateStore {
2470            store_cipher,
2471            pool,
2472            // Use `conn` as our selected write connections.
2473            write_connection: Arc::new(Mutex::new(conn)),
2474        };
2475        this.run_migrations(1, Some(version)).await?;
2476
2477        Ok(this)
2478    }
2479
2480    fn room_info_v1_json(
2481        room_id: &RoomId,
2482        state: RoomState,
2483        name: Option<&str>,
2484        creator: Option<&UserId>,
2485    ) -> serde_json::Value {
2486        // Test with name set or not.
2487        let name_content = match name {
2488            Some(name) => json!({ "name": name }),
2489            None => json!({ "name": null }),
2490        };
2491        // Test with creator set or not.
2492        let create_content = match creator {
2493            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2494            None => RoomCreateEventContent::new_v11(),
2495        };
2496
2497        json!({
2498            "room_id": room_id,
2499            "room_type": state,
2500            "notification_counts": UnreadNotificationsCount::default(),
2501            "summary": {
2502                "heroes": [],
2503                "joined_member_count": 0,
2504                "invited_member_count": 0,
2505            },
2506            "members_synced": false,
2507            "base_info": {
2508                "dm_targets": [],
2509                "max_power_level": 100,
2510                "name": {
2511                    "Original": {
2512                        "content": name_content,
2513                    },
2514                },
2515                "create": {
2516                    "Original": {
2517                        "content": create_content,
2518                    }
2519                }
2520            },
2521        })
2522    }
2523
2524    #[async_test]
2525    pub async fn test_migrating_v1_to_v2() {
2526        let path = new_path();
2527        // Create and populate db.
2528        {
2529            let db = create_fake_db(&path, 1).await.unwrap();
2530            let conn = db.pool.get().await.unwrap();
2531
2532            let this = db.clone();
2533            conn.with_transaction(move |txn| {
2534                for i in 0..5 {
2535                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2536                    let (state, stripped) =
2537                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2538                    let info = room_info_v1_json(&room_id, state, None, None);
2539
2540                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2541                    let data = this.serialize_json(&info)?;
2542
2543                    txn.prepare_cached(
2544                        "INSERT INTO room_info (room_id, stripped, data)
2545                         VALUES (?, ?, ?)",
2546                    )?
2547                    .execute((room_id, stripped, data))?;
2548                }
2549
2550                Result::<_, Error>::Ok(())
2551            })
2552            .await
2553            .unwrap();
2554        }
2555
2556        // This transparently migrates to the latest version.
2557        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2558
2559        // Check all room infos are there.
2560        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2561    }
2562
2563    // Add a room in version 2 format of the state store.
2564    fn add_room_v2(
2565        this: &SqliteStateStore,
2566        txn: &Transaction<'_>,
2567        room_id: &RoomId,
2568        name: Option<&str>,
2569        create_creator: Option<&UserId>,
2570        create_sender: Option<&UserId>,
2571    ) -> Result<(), Error> {
2572        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2573
2574        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2575        let encoded_state =
2576            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2577        let data = this.serialize_json(&room_info_json)?;
2578
2579        txn.prepare_cached(
2580            "INSERT INTO room_info (room_id, state, data)
2581             VALUES (?, ?, ?)",
2582        )?
2583        .execute((encoded_room_id, encoded_state, data))?;
2584
2585        // Test with or without `m.room.create` event in the room state.
2586        let Some(create_sender) = create_sender else {
2587            return Ok(());
2588        };
2589
2590        let create_content = match create_creator {
2591            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2592            None => RoomCreateEventContent::new_v11(),
2593        };
2594
2595        let event_id = EventId::new(server_name!("dummy.local"));
2596        let create_event = json!({
2597            "content": create_content,
2598            "event_id": event_id,
2599            "sender": create_sender.to_owned(),
2600            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2601            "state_key": "",
2602            "type": "m.room.create",
2603            "unsigned": {},
2604        });
2605
2606        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2607        let encoded_event_type =
2608            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2609        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2610        let stripped = false;
2611        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2612        let data = this.serialize_json(&create_event)?;
2613
2614        txn.prepare_cached(
2615            "INSERT
2616             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2617             VALUES (?, ?, ?, ?, ?, ?)",
2618        )?
2619        .execute((
2620            encoded_room_id,
2621            encoded_event_type,
2622            encoded_state_key,
2623            stripped,
2624            encoded_event_id,
2625            data,
2626        ))?;
2627
2628        Ok(())
2629    }
2630
2631    #[async_test]
2632    pub async fn test_migrating_v2_to_v3() {
2633        let path = new_path();
2634
2635        // Room A: with name, creator and sender.
2636        let room_a_id = room_id!("!room_a:dummy.local");
2637        let room_a_name = "Room A";
2638        let room_a_creator = user_id!("@creator:dummy.local");
2639        // Use a different sender to check that sender is used over creator in
2640        // migration.
2641        let room_a_create_sender = user_id!("@sender:dummy.local");
2642
2643        // Room B: without name, creator and sender.
2644        let room_b_id = room_id!("!room_b:dummy.local");
2645
2646        // Room C: only with sender.
2647        let room_c_id = room_id!("!room_c:dummy.local");
2648        let room_c_create_sender = user_id!("@creator:dummy.local");
2649
2650        // Create and populate db.
2651        {
2652            let db = create_fake_db(&path, 2).await.unwrap();
2653            let conn = db.pool.get().await.unwrap();
2654
2655            let this = db.clone();
2656            conn.with_transaction(move |txn| {
2657                add_room_v2(
2658                    &this,
2659                    txn,
2660                    room_a_id,
2661                    Some(room_a_name),
2662                    Some(room_a_creator),
2663                    Some(room_a_create_sender),
2664                )?;
2665                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2666                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2667
2668                Result::<_, Error>::Ok(())
2669            })
2670            .await
2671            .unwrap();
2672        }
2673
2674        // This transparently migrates to the latest version.
2675        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2676
2677        // Check all room infos are there.
2678        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2679        assert_eq!(room_infos.len(), 3);
2680
2681        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2682        assert_eq!(room_a.name(), Some(room_a_name));
2683        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2684
2685        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2686        assert_eq!(room_b.name(), None);
2687        assert_eq!(room_b.creators(), None);
2688
2689        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2690        assert_eq!(room_c.name(), None);
2691        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2692    }
2693
2694    #[async_test]
2695    pub async fn test_migrating_v7_to_v9() {
2696        let path = new_path();
2697
2698        let room_id = room_id!("!room_a:dummy.local");
2699        let wedged_event_transaction_id = TransactionId::new();
2700        let local_event_transaction_id = TransactionId::new();
2701
2702        // Create and populate db.
2703        {
2704            let db = create_fake_db(&path, 7).await.unwrap();
2705            let conn = db.pool.get().await.unwrap();
2706
2707            let wedge_tx = wedged_event_transaction_id.clone();
2708            let local_tx = local_event_transaction_id.clone();
2709
2710            conn.with_transaction(move |txn| {
2711                add_dependent_send_queue_event_v7(
2712                    &db,
2713                    txn,
2714                    room_id,
2715                    &local_tx,
2716                    ChildTransactionId::new(),
2717                    DependentQueuedRequestKind::RedactEvent,
2718                )?;
2719                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2720                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2721                Result::<_, Error>::Ok(())
2722            })
2723            .await
2724            .unwrap();
2725        }
2726
2727        // This transparently migrates to the latest version, which clears up all
2728        // requests and dependent requests.
2729        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2730
2731        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2732        assert!(requests.is_empty());
2733
2734        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2735        assert!(dependent_requests.is_empty());
2736    }
2737
2738    fn add_send_queue_event_v7(
2739        this: &SqliteStateStore,
2740        txn: &Transaction<'_>,
2741        transaction_id: &TransactionId,
2742        room_id: &RoomId,
2743        is_wedged: bool,
2744    ) -> Result<(), Error> {
2745        let content =
2746            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2747
2748        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2749        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2750
2751        let content = this.serialize_json(&content)?;
2752
2753        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2754            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2755
2756        Ok(())
2757    }
2758
2759    fn add_dependent_send_queue_event_v7(
2760        this: &SqliteStateStore,
2761        txn: &Transaction<'_>,
2762        room_id: &RoomId,
2763        parent_txn_id: &TransactionId,
2764        own_txn_id: ChildTransactionId,
2765        content: DependentQueuedRequestKind,
2766    ) -> Result<(), Error> {
2767        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2768
2769        let parent_txn_id = parent_txn_id.to_string();
2770        let own_txn_id = own_txn_id.to_string();
2771        let content = this.serialize_json(&content)?;
2772
2773        txn.prepare_cached(
2774            "INSERT INTO dependent_send_queue_events
2775                         (room_id, parent_transaction_id, own_transaction_id, content)
2776                       VALUES (?, ?, ?, ?)",
2777        )?
2778        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2779
2780        Ok(())
2781    }
2782
2783    #[derive(Clone, Debug, Serialize, Deserialize)]
2784    pub enum LegacyDependentQueuedRequestKind {
2785        UploadFileWithThumbnail {
2786            content_type: String,
2787            cache_key: MediaRequestParameters,
2788            related_to: OwnedTransactionId,
2789        },
2790    }
2791
2792    #[async_test]
2793    pub async fn test_dependent_queued_request_variant_renaming() {
2794        let path = new_path();
2795        let db = create_fake_db(&path, 7).await.unwrap();
2796
2797        let cache_key = MediaRequestParameters {
2798            format: MediaFormat::File,
2799            source: MediaSource::Plain("https://server.local/foobar".into()),
2800        };
2801        let related_to = TransactionId::new();
2802        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2803            content_type: "image/png".to_owned(),
2804            cache_key,
2805            related_to: related_to.clone(),
2806        };
2807
2808        let data = db
2809            .serialize_json(&request)
2810            .expect("should be able to serialize legacy dependent request");
2811        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2812            "should be able to deserialize dependent request from legacy dependent request",
2813        );
2814
2815        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2816            assert_eq!(de_related_to, related_to);
2817        });
2818    }
2819}