matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15        DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16        RoomLoadSettings, SentRequestKey,
17    },
18    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19    StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    canonical_json::{redact, RedactedBecause},
24    events::{
25        presence::PresenceEvent,
26        receipt::{Receipt, ReceiptThread, ReceiptType},
27        room::{
28            create::RoomCreateEventContent,
29            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30        },
31        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33    },
34    serde::Raw,
35    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36    OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44    error::{Error, Result},
45    utils::{
46        repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47        SqliteKeyValueStoreConnExt,
48    },
49    OpenStoreError, SqliteStoreConfig,
50};
51
/// Names of the SQLite tables used by the state store.
///
/// In this file these names are passed as the `table_name` argument of
/// `SqliteStateStore::encode_key`, which forwards them to the store cipher's
/// `hash_key` when a cipher is configured.
mod keys {
    // Tables
    pub const KV_BLOB: &str = "kv_blob";
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
}
66
/// The filename used for the SQLite database file used by the state store.
///
/// The file is created inside the directory given as the store path.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
69
/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`SqliteStateStore::run_migrations`] function.
const DATABASE_VERSION: u8 = 12;
76
/// An SQLite-based state store.
///
/// Cloning is cheap with respect to the cipher, which is shared through an
/// [`Arc`].
#[derive(Clone)]
pub struct SqliteStateStore {
    /// Cipher used to encrypt stored values and to hash keys. `None` when the
    /// store was opened without a passphrase, in which case values and keys
    /// are stored as-is.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool of SQLite connections used to access the database.
    pool: SqlitePool,
}
83
84#[cfg(not(tarpaulin_include))]
85impl fmt::Debug for SqliteStateStore {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
88    }
89}
90
91impl SqliteStateStore {
92    /// Open the SQLite-based state store at the given path using the given
93    /// passphrase to encrypt private data.
94    pub async fn open(
95        path: impl AsRef<Path>,
96        passphrase: Option<&str>,
97    ) -> Result<Self, OpenStoreError> {
98        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
99    }
100
101    /// Open the SQLite-based state store with the config open config.
102    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
103        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
104
105        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
106
107        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
108        config.pool = Some(pool_config);
109
110        let pool = config.create_pool(Runtime::Tokio1)?;
111
112        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
113        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
114
115        Ok(this)
116    }
117
118    /// Create an SQLite-based state store using the given SQLite database pool.
119    /// The given passphrase will be used to encrypt private data.
120    async fn open_with_pool(
121        pool: SqlitePool,
122        passphrase: Option<&str>,
123    ) -> Result<Self, OpenStoreError> {
124        let conn = pool.get().await?;
125
126        let mut version = conn.db_version().await?;
127
128        if version == 0 {
129            init(&conn).await?;
130            version = 1;
131        }
132
133        let store_cipher = match passphrase {
134            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
135            None => None,
136        };
137        let this = Self { store_cipher, pool };
138        this.run_migrations(&conn, version, None).await?;
139
140        Ok(this)
141    }
142
143    /// Run database migrations from the given `from` version to the given `to`
144    /// version
145    ///
146    /// If `to` is `None`, the current database version will be used.
147    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
148        let to = to.unwrap_or(DATABASE_VERSION);
149
150        if from < to {
151            debug!(version = from, new_version = to, "Upgrading database");
152        } else {
153            return Ok(());
154        }
155
156        if from < 2 && to >= 2 {
157            let this = self.clone();
158            conn.with_transaction(move |txn| {
159                // Create new table.
160                txn.execute_batch(include_str!(
161                    "../migrations/state_store/002_a_create_new_room_info.sql"
162                ))?;
163
164                // Migrate data to new table.
165                for data in txn
166                    .prepare("SELECT data FROM room_info")?
167                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
168                {
169                    let data = data?;
170                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
171
172                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
173                    let state = this
174                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
175                    txn.prepare_cached(
176                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
177                         VALUES (?, ?, ?)",
178                    )?
179                    .execute((room_id, state, data))?;
180                }
181
182                // Replace old table.
183                txn.execute_batch(include_str!(
184                    "../migrations/state_store/002_b_replace_room_info.sql"
185                ))?;
186
187                txn.set_db_version(2)?;
188                Result::<_, Error>::Ok(())
189            })
190            .await?;
191        }
192
193        // Migration to v3: RoomInfo format has changed.
194        if from < 3 && to >= 3 {
195            let this = self.clone();
196            conn.with_transaction(move |txn| {
197                // Migrate data .
198                for data in txn
199                    .prepare("SELECT data FROM room_info")?
200                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
201                {
202                    let data = data?;
203                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
204
205                    // Get the `m.room.create` event from the room state.
206                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
207                    let event_type =
208                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
209                    let create_res = txn
210                        .prepare(
211                            "SELECT stripped, data FROM state_event
212                             WHERE room_id = ? AND event_type = ?",
213                        )?
214                        .query_row([room_id, event_type], |row| {
215                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
216                        })
217                        .optional()?;
218
219                    let create = create_res.and_then(|(stripped, data)| {
220                        let create = if stripped {
221                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
222                                this.deserialize_json(&data).ok()?,
223                            )
224                        } else {
225                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
226                        };
227                        Some(create)
228                    });
229
230                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
231
232                    let data = this.serialize_json(&migrated_room_info)?;
233                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
234                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
235                        .execute((data, room_id))?;
236                }
237
238                txn.set_db_version(3)?;
239                Result::<_, Error>::Ok(())
240            })
241            .await?;
242        }
243
244        if from < 4 && to >= 4 {
245            conn.with_transaction(move |txn| {
246                // Create new table.
247                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
248                txn.set_db_version(4)
249            })
250            .await?;
251        }
252
253        if from < 5 && to >= 5 {
254            conn.with_transaction(move |txn| {
255                // Create new table.
256                txn.execute_batch(include_str!(
257                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
258                ))?;
259                txn.set_db_version(4)
260            })
261            .await?;
262        }
263
264        if from < 6 && to >= 6 {
265            conn.with_transaction(move |txn| {
266                // Create new table.
267                txn.execute_batch(include_str!(
268                    "../migrations/state_store/005_send_queue_dependent_events.sql"
269                ))?;
270                txn.set_db_version(6)
271            })
272            .await?;
273        }
274
275        if from < 7 && to >= 7 {
276            conn.with_transaction(move |txn| {
277                // Drop media table.
278                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
279                txn.set_db_version(7)
280            })
281            .await?;
282        }
283
284        if from < 8 && to >= 8 {
285            // Replace all existing wedged events with a generic error.
286            let error = QueueWedgeError::GenericApiError {
287                msg: "local echo failed to send in a previous session".into(),
288            };
289            let default_err = self.serialize_value(&error)?;
290
291            conn.with_transaction(move |txn| {
292                // Update send queue table to persist the wedge reason if any.
293                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
294
295                // Migrate the data, add a generic error for currently wedged events
296
297                for wedged_entries in txn
298                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
299                    .query_map((), |row| {
300                        Ok(
301                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
302                        )
303                    })? {
304
305                    let (room_id, transaction_id) = wedged_entries?;
306
307                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
308                        .execute((default_err.clone(), room_id, transaction_id))?;
309                }
310
311
312                // Clean up the table now that data is migrated
313                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
314
315                txn.set_db_version(8)
316            })
317                .await?;
318        }
319
320        if from < 9 && to >= 9 {
321            conn.with_transaction(move |txn| {
322                // Run the migration.
323                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
324                txn.set_db_version(9)
325            })
326            .await?;
327        }
328
329        if from < 10 && to >= 10 {
330            conn.with_transaction(move |txn| {
331                // Run the migration.
332                txn.execute_batch(include_str!(
333                    "../migrations/state_store/009_send_queue_priority.sql"
334                ))?;
335                txn.set_db_version(10)
336            })
337            .await?;
338        }
339
340        if from < 11 && to >= 11 {
341            conn.with_transaction(move |txn| {
342                // Run the migration.
343                txn.execute_batch(include_str!(
344                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
345                ))?;
346                txn.set_db_version(11)
347            })
348            .await?;
349        }
350
351        if from < 12 && to >= 12 {
352            // Defragment the DB and optimize its size on the filesystem.
353            // This should have been run in the migration for version 7, to reduce the size
354            // of the DB as we removed the media cache.
355            conn.vacuum().await?;
356            conn.set_kv("version", vec![12]).await?;
357        }
358
359        Ok(())
360    }
361
362    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
363        if let Some(key) = &self.store_cipher {
364            let encrypted = key.encrypt_value_data(value)?;
365            Ok(rmp_serde::to_vec_named(&encrypted)?)
366        } else {
367            Ok(value)
368        }
369    }
370
371    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
372        let serialized = rmp_serde::to_vec_named(value)?;
373        self.encode_value(serialized)
374    }
375
376    fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
377        let serialized = serde_json::to_vec(value)?;
378        self.encode_value(serialized)
379    }
380
381    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
382        if let Some(key) = &self.store_cipher {
383            let encrypted = rmp_serde::from_slice(value)?;
384            let decrypted = key.decrypt_value_data(encrypted)?;
385            Ok(Cow::Owned(decrypted))
386        } else {
387            Ok(Cow::Borrowed(value))
388        }
389    }
390
391    fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
392        let decoded = self.decode_value(data)?;
393        Ok(serde_json::from_slice(&decoded)?)
394    }
395
396    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
397        let decoded = self.decode_value(value)?;
398        Ok(rmp_serde::from_slice(&decoded)?)
399    }
400
401    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
402        let bytes = key.as_ref();
403        if let Some(store_cipher) = &self.store_cipher {
404            Key::Hashed(store_cipher.hash_key(table_name, bytes))
405        } else {
406            Key::Plain(bytes.to_owned())
407        }
408    }
409
410    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
411        let key_s = match key {
412            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
413            StateStoreDataKey::ServerCapabilities => {
414                Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
415            }
416            StateStoreDataKey::Filter(f) => {
417                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
418            }
419            StateStoreDataKey::UserAvatarUrl(u) => {
420                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
421            }
422            StateStoreDataKey::RecentlyVisitedRooms(b) => {
423                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
424            }
425            StateStoreDataKey::UtdHookManagerData => {
426                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
427            }
428            StateStoreDataKey::ComposerDraft(room_id) => {
429                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
430            }
431            StateStoreDataKey::SeenKnockRequests(room_id) => {
432                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
433            }
434        };
435
436        self.encode_key(keys::KV_BLOB, &*key_s)
437    }
438
439    fn encode_presence_key(&self, user_id: &UserId) -> Key {
440        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
441    }
442
443    fn encode_custom_key(&self, key: &[u8]) -> Key {
444        let mut full_key = b"custom:".to_vec();
445        full_key.extend(key);
446        self.encode_key(keys::KV_BLOB, full_key)
447    }
448
449    async fn acquire(&self) -> Result<SqliteAsyncConn> {
450        Ok(self.pool.get().await?)
451    }
452
453    fn remove_maybe_stripped_room_data(
454        &self,
455        txn: &Transaction<'_>,
456        room_id: &RoomId,
457        stripped: bool,
458    ) -> rusqlite::Result<()> {
459        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
460        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
461
462        let member_room_id = self.encode_key(keys::MEMBER, room_id);
463        txn.remove_room_members(&member_room_id, Some(stripped))
464    }
465}
466
467/// Initialize the database.
468async fn init(conn: &SqliteAsyncConn) -> Result<()> {
469    // First turn on WAL mode, this can't be done in the transaction, it fails with
470    // the error message: "cannot change into wal mode from within a transaction".
471    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
472    conn.with_transaction(|txn| {
473        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
474        txn.set_db_version(1)?;
475
476        Ok(())
477    })
478    .await
479}
480
/// Synchronous helpers to write and delete state store rows.
///
/// Keys and values are expected to be already encoded; these methods only
/// bind them to SQL statements.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace the value stored under `key` in `kv_blob`.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the global account data of the given event type.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the account data of the given event type for a room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the account data of the given room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the info (and its state column) of the given room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Get the stored info of the given room, if any.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the room info for the given room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a state event of a room.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Get the data of the state event with the given event ID in the given
    /// room, if any.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove state events for the given room.
    ///
    /// If `stripped` is `Some()`, only removes state events for the given
    /// stripped state. Otherwise, state events are removed regardless of the
    /// stripped state.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a member of a room.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove members for the given room.
    ///
    /// If `stripped` is `Some()`, only removes members for the given stripped
    /// state. Otherwise, members are removed regardless of the stripped state.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace the profile of a user in a room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the profiles of the given room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove the profile of the given user in the given room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a receipt. `thread_id` is written to the `thread`
    /// column of the `receipt` table.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the receipts of the given room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for a display name in a room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove the data stored for the given display name in the given room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the display name data of the given room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the `send_queue_events` rows of the given room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the `dependent_send_queue_events` rows of the given room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
549
550impl SqliteConnectionStateStoreExt for rusqlite::Connection {
551    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
552        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
553        Ok(())
554    }
555
556    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
557        self.prepare_cached(
558            "INSERT OR REPLACE INTO global_account_data (event_type, data)
559             VALUES (?, ?)",
560        )?
561        .execute((event_type, data))?;
562        Ok(())
563    }
564
565    fn set_room_account_data(
566        &self,
567        room_id: &[u8],
568        event_type: &[u8],
569        data: &[u8],
570    ) -> rusqlite::Result<()> {
571        self.prepare_cached(
572            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
573             VALUES (?, ?, ?)",
574        )?
575        .execute((room_id, event_type, data))?;
576        Ok(())
577    }
578
579    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
580        self.prepare(
581            "DELETE FROM room_account_data
582             WHERE room_id = ?",
583        )?
584        .execute((room_id,))?;
585        Ok(())
586    }
587
588    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
589        self.prepare_cached(
590            "INSERT OR REPLACE INTO room_info (room_id, state, data)
591             VALUES (?, ?, ?)",
592        )?
593        .execute((room_id, state, data))?;
594        Ok(())
595    }
596
597    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
598        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
599            .optional()
600    }
601
602    /// Remove the room info for the given room.
603    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
604        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
605        Ok(())
606    }
607
608    fn set_state_event(
609        &self,
610        room_id: &[u8],
611        event_type: &[u8],
612        state_key: &[u8],
613        stripped: bool,
614        event_id: Option<&[u8]>,
615        data: &[u8],
616    ) -> rusqlite::Result<()> {
617        self.prepare_cached(
618            "INSERT OR REPLACE
619             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
620             VALUES (?, ?, ?, ?, ?, ?)",
621        )?
622        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
623        Ok(())
624    }
625
626    fn get_state_event_by_id(
627        &self,
628        room_id: &[u8],
629        event_id: &[u8],
630    ) -> rusqlite::Result<Option<Vec<u8>>> {
631        self.query_row(
632            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
633            (room_id, event_id),
634            |row| row.get(0),
635        )
636        .optional()
637    }
638
639    /// Remove state events for the given room.
640    ///
641    /// If `stripped` is `Some()`, only removes state events for the given
642    /// stripped state. Otherwise, state events are removed regardless of the
643    /// stripped state.
644    fn remove_room_state_events(
645        &self,
646        room_id: &[u8],
647        stripped: Option<bool>,
648    ) -> rusqlite::Result<()> {
649        if let Some(stripped) = stripped {
650            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
651                .execute((room_id, stripped))?;
652        } else {
653            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
654                .execute((room_id,))?;
655        }
656        Ok(())
657    }
658
659    fn set_member(
660        &self,
661        room_id: &[u8],
662        user_id: &[u8],
663        membership: &[u8],
664        stripped: bool,
665        data: &[u8],
666    ) -> rusqlite::Result<()> {
667        self.prepare_cached(
668            "INSERT OR REPLACE
669             INTO member (room_id, user_id, membership, stripped, data)
670             VALUES (?, ?, ?, ?, ?)",
671        )?
672        .execute((room_id, user_id, membership, stripped, data))?;
673        Ok(())
674    }
675
676    /// Remove members for the given room.
677    ///
678    /// If `stripped` is `Some()`, only removes members for the given stripped
679    /// state. Otherwise, members are removed regardless of the stripped state.
680    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
681        if let Some(stripped) = stripped {
682            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
683                .execute((room_id, stripped))?;
684        } else {
685            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
686        }
687        Ok(())
688    }
689
690    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
691        self.prepare_cached(
692            "INSERT OR REPLACE
693             INTO profile (room_id, user_id, data)
694             VALUES (?, ?, ?)",
695        )?
696        .execute((room_id, user_id, data))?;
697        Ok(())
698    }
699
700    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
701        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
702        Ok(())
703    }
704
705    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
706        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
707            .execute((room_id, user_id))?;
708        Ok(())
709    }
710
711    fn set_receipt(
712        &self,
713        room_id: &[u8],
714        user_id: &[u8],
715        receipt_type: &[u8],
716        thread: &[u8],
717        event_id: &[u8],
718        data: &[u8],
719    ) -> rusqlite::Result<()> {
720        self.prepare_cached(
721            "INSERT OR REPLACE
722             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
723             VALUES (?, ?, ?, ?, ?, ?)",
724        )?
725        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
726        Ok(())
727    }
728
729    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
730        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
731        Ok(())
732    }
733
734    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
735        self.prepare_cached(
736            "INSERT OR REPLACE
737             INTO display_name (room_id, name, data)
738             VALUES (?, ?, ?)",
739        )?
740        .execute((room_id, name, data))?;
741        Ok(())
742    }
743
744    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
745        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
746            .execute((room_id, name))?;
747        Ok(())
748    }
749
750    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
751        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
752        Ok(())
753    }
754
755    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
756        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
757        Ok(())
758    }
759
760    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
761        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
762            .execute((room_id,))?;
763        Ok(())
764    }
765}
766
767#[async_trait]
768trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
769    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
770        Ok(self
771            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
772            .await
773            .optional()?)
774    }
775
776    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
777        let keys_length = keys.len();
778
779        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
780            let sql_params = repeat_vars(keys.len());
781            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
782
783            let params = rusqlite::params_from_iter(keys);
784
785            Ok(txn
786                .prepare(&sql)?
787                .query(params)?
788                .mapped(|row| row.get(0))
789                .collect::<Result<_, _>>()?)
790        })
791        .await
792    }
793
    /// Store `value` under `key` in the key/value blob storage.
    ///
    /// This is a required method; its implementation is not part of this
    /// trait definition.
    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
795
796    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
797        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
798        Ok(())
799    }
800
801    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
802        Ok(match room_id {
803            None => {
804                self.prepare("SELECT data FROM room_info", move |mut stmt| {
805                    stmt.query_map((), |row| row.get(0))?.collect()
806                })
807                .await?
808            }
809
810            Some(room_id) => {
811                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
812                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
813                })
814                .await?
815            }
816        })
817    }
818
819    async fn get_maybe_stripped_state_events_for_keys(
820        &self,
821        room_id: Key,
822        event_type: Key,
823        state_keys: Vec<Key>,
824    ) -> Result<Vec<(bool, Vec<u8>)>> {
825        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
826            let sql_params = repeat_vars(state_keys.len());
827            let sql = format!(
828                "SELECT stripped, data FROM state_event
829                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
830            );
831
832            let params = rusqlite::params_from_iter(
833                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
834            );
835
836            Ok(txn
837                .prepare(&sql)?
838                .query(params)?
839                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
840                .collect::<Result<_, _>>()?)
841        })
842        .await
843    }
844
845    async fn get_maybe_stripped_state_events(
846        &self,
847        room_id: Key,
848        event_type: Key,
849    ) -> Result<Vec<(bool, Vec<u8>)>> {
850        Ok(self
851            .prepare(
852                "SELECT stripped, data FROM state_event
853                 WHERE room_id = ? AND event_type = ?",
854                |mut stmt| {
855                    stmt.query((room_id, event_type))?
856                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
857                        .collect()
858                },
859            )
860            .await?)
861    }
862
863    async fn get_profiles(
864        &self,
865        room_id: Key,
866        user_ids: Vec<Key>,
867    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
868        let user_ids_length = user_ids.len();
869
870        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
871            let sql_params = repeat_vars(user_ids.len());
872            let sql = format!(
873                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
874            );
875
876            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
877
878            Ok(txn
879                .prepare(&sql)?
880                .query(params)?
881                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
882                .collect::<Result<_, _>>()?)
883        })
884        .await
885    }
886
887    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
888        let res = if memberships.is_empty() {
889            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
890                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
891            })
892            .await?
893        } else {
894            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
895                let sql_params = repeat_vars(memberships.len());
896                let sql = format!(
897                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
898                );
899
900                let params =
901                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
902
903                Ok(txn
904                    .prepare(&sql)?
905                    .query(params)?
906                    .mapped(|row| row.get(0))
907                    .collect::<Result<_, _>>()?)
908            })
909            .await?
910        };
911
912        Ok(res)
913    }
914
915    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
916        Ok(self
917            .query_row(
918                "SELECT data FROM global_account_data WHERE event_type = ?",
919                (event_type,),
920                |row| row.get(0),
921            )
922            .await
923            .optional()?)
924    }
925
926    async fn get_room_account_data(
927        &self,
928        room_id: Key,
929        event_type: Key,
930    ) -> Result<Option<Vec<u8>>> {
931        Ok(self
932            .query_row(
933                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
934                (room_id, event_type),
935                |row| row.get(0),
936            )
937            .await
938            .optional()?)
939    }
940
941    async fn get_display_names(
942        &self,
943        room_id: Key,
944        names: Vec<Key>,
945    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
946        let names_length = names.len();
947
948        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
949            let sql_params = repeat_vars(names.len());
950            let sql = format!(
951                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
952            );
953
954            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
955
956            Ok(txn
957                .prepare(&sql)?
958                .query(params)?
959                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
960                .collect::<Result<_, _>>()?)
961        })
962        .await
963    }
964
965    async fn get_user_receipt(
966        &self,
967        room_id: Key,
968        receipt_type: Key,
969        thread: Key,
970        user_id: Key,
971    ) -> Result<Option<Vec<u8>>> {
972        Ok(self
973            .query_row(
974                "SELECT data FROM receipt
975                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
976                (room_id, receipt_type, thread, user_id),
977                |row| row.get(0),
978            )
979            .await
980            .optional()?)
981    }
982
983    async fn get_event_receipts(
984        &self,
985        room_id: Key,
986        receipt_type: Key,
987        thread: Key,
988        event_id: Key,
989    ) -> Result<Vec<Vec<u8>>> {
990        Ok(self
991            .prepare(
992                "SELECT data FROM receipt
993                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
994                |mut stmt| {
995                    stmt.query((room_id, receipt_type, thread, event_id))?
996                        .mapped(|row| row.get(0))
997                        .collect()
998                },
999            )
1000            .await?)
1001    }
1002}
1003
1004#[async_trait]
1005impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1006    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1007        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1008    }
1009}
1010
1011#[async_trait]
1012impl StateStore for SqliteStateStore {
1013    type Error = Error;
1014
1015    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1016        self.acquire()
1017            .await?
1018            .get_kv_blob(self.encode_state_store_data_key(key))
1019            .await?
1020            .map(|data| {
1021                Ok(match key {
1022                    StateStoreDataKey::SyncToken => {
1023                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1024                    }
1025                    StateStoreDataKey::ServerCapabilities => {
1026                        StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1027                    }
1028                    StateStoreDataKey::Filter(_) => {
1029                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1030                    }
1031                    StateStoreDataKey::UserAvatarUrl(_) => {
1032                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1033                    }
1034                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1035                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1036                    }
1037                    StateStoreDataKey::UtdHookManagerData => {
1038                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1039                    }
1040                    StateStoreDataKey::ComposerDraft(_) => {
1041                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1042                    }
1043                    StateStoreDataKey::SeenKnockRequests(_) => {
1044                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1045                    }
1046                })
1047            })
1048            .transpose()
1049    }
1050
1051    async fn set_kv_data(
1052        &self,
1053        key: StateStoreDataKey<'_>,
1054        value: StateStoreDataValue,
1055    ) -> Result<()> {
1056        let serialized_value = match key {
1057            StateStoreDataKey::SyncToken => self.serialize_value(
1058                &value.into_sync_token().expect("Session data not a sync token"),
1059            )?,
1060            StateStoreDataKey::ServerCapabilities => self.serialize_value(
1061                &value
1062                    .into_server_capabilities()
1063                    .expect("Session data not containing server capabilities"),
1064            )?,
1065            StateStoreDataKey::Filter(_) => {
1066                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1067            }
1068            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1069                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1070            )?,
1071            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1072                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1073            )?,
1074            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1075                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1076            )?,
1077            StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1078                &value.into_composer_draft().expect("Session data not a composer draft"),
1079            )?,
1080            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1081                &value
1082                    .into_seen_knock_requests()
1083                    .expect("Session data is not a set of seen knock request ids"),
1084            )?,
1085        };
1086
1087        self.acquire()
1088            .await?
1089            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1090            .await
1091    }
1092
1093    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1094        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1095    }
1096
    /// Persist every part of `changes` through a single `with_transaction`
    /// call on one acquired connection.
    ///
    /// `changes` and `self` are cloned up front so that the `move` closure
    /// handed to `with_transaction` owns everything it needs. Inside the
    /// closure the fields of `StateChanges` are written in this order: sync
    /// token, global account data, room account data, presence, room infos,
    /// profile deletions, state events (plus members and profiles), stripped
    /// state events (plus members), receipts, redactions, and finally the
    /// display-name ambiguity maps.
    ///
    /// Any serialization or database error aborts the closure via `?` and is
    /// returned to the caller.
    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
        let changes = changes.to_owned();
        let this = self.clone();
        self.acquire()
            .await?
            .with_transaction(move |txn| {
                // Exhaustive destructuring: adding a field to `StateChanges`
                // makes this fail to compile until it is handled here.
                let StateChanges {
                    sync_token,
                    account_data,
                    presence,
                    profiles,
                    profiles_to_delete,
                    state,
                    room_account_data,
                    room_infos,
                    receipts,
                    redactions,
                    stripped_state,
                    ambiguity_maps,
                } = changes;

                // Sync token: stored as a kv blob under the `SyncToken` key.
                if let Some(sync_token) = sync_token {
                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
                    let value = this.serialize_value(&sync_token)?;
                    txn.set_kv_blob(&key, &value)?;
                }

                // Global account data, keyed by event type.
                for (event_type, event) in account_data {
                    let event_type =
                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
                    let data = this.serialize_json(&event)?;
                    txn.set_global_account_data(&event_type, &data)?;
                }

                // Room account data, keyed by room and event type.
                for (room_id, events) in room_account_data {
                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
                    for (event_type, event) in events {
                        let event_type =
                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
                        let data = this.serialize_json(&event)?;
                        txn.set_room_account_data(&room_id, &event_type, &data)?;
                    }
                }

                // Presence events are stored as kv blobs under a per-user key.
                for (user_id, event) in presence {
                    let key = this.encode_presence_key(&user_id);
                    let value = this.serialize_json(&event)?;
                    txn.set_kv_blob(&key, &value)?;
                }

                // Room infos. A room counts as "stripped" when it is in the
                // `Invited` state.
                for (room_id, room_info) in room_infos {
                    let stripped = room_info.state() == RoomState::Invited;
                    // Remove non-stripped data for stripped rooms and vice-versa.
                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;

                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
                    let state = this
                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
                    let data = this.serialize_json(&room_info)?;
                    txn.set_room_info(&room_id, &state, &data)?;
                }

                // Profile deletions run before the state loop below, which
                // may write profiles again.
                for (room_id, user_ids) in profiles_to_delete {
                    let room_id = this.encode_key(keys::PROFILE, room_id);
                    for user_id in user_ids {
                        let user_id = this.encode_key(keys::PROFILE, user_id);
                        txn.remove_room_profile(&room_id, &user_id)?;
                    }
                }

                // Full (non-stripped) state events.
                for (room_id, state_event_types) in state {
                    let profiles = profiles.get(&room_id);
                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                    for (event_type, state_events) in state_event_types {
                        let encoded_event_type =
                            this.encode_key(keys::STATE_EVENT, event_type.to_string());

                        for (state_key, raw_state_event) in state_events {
                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                            let data = this.serialize_json(&raw_state_event)?;

                            // A missing or unreadable `event_id` field is
                            // tolerated and stored as `None`.
                            let event_id: Option<String> =
                                raw_state_event.get_field("event_id").ok().flatten();
                            let encoded_event_id =
                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));

                            txn.set_state_event(
                                &encoded_room_id,
                                &encoded_event_type,
                                &encoded_state_key,
                                false,
                                encoded_event_id.as_deref(),
                                &data,
                            )?;

                            // Member events additionally update the member
                            // table and, when available, the profile table.
                            if event_type == StateEventType::RoomMember {
                                let member_event = match raw_state_event
                                    .deserialize_as::<SyncRoomMemberEvent>()
                                {
                                    Ok(ev) => ev,
                                    Err(e) => {
                                        // The state event row above has already
                                        // been written; only the member and
                                        // profile bookkeeping is skipped.
                                        debug!(event_id, "Failed to deserialize member event: {e}");
                                        continue;
                                    }
                                };

                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
                                let user_id = this.encode_key(keys::MEMBER, &state_key);
                                let membership = this
                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
                                // The member row's payload is the serialized
                                // state key.
                                let data = this.serialize_value(&state_key)?;

                                txn.set_member(
                                    &encoded_room_id,
                                    &user_id,
                                    &membership,
                                    false,
                                    &data,
                                )?;

                                if let Some(profile) =
                                    profiles.and_then(|p| p.get(member_event.state_key()))
                                {
                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
                                    let data = this.serialize_json(&profile)?;
                                    txn.set_profile(&room_id, &user_id, &data)?;
                                }
                            }
                        }
                    }
                }

                // Stripped state events: same tables as above, but flagged as
                // stripped and stored without an event id.
                for (room_id, stripped_state_event_types) in stripped_state {
                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                    for (event_type, stripped_state_events) in stripped_state_event_types {
                        let encoded_event_type =
                            this.encode_key(keys::STATE_EVENT, event_type.to_string());

                        for (state_key, raw_stripped_state_event) in stripped_state_events {
                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                            let data = this.serialize_json(&raw_stripped_state_event)?;
                            txn.set_state_event(
                                &encoded_room_id,
                                &encoded_event_type,
                                &encoded_state_key,
                                true,
                                None,
                                &data,
                            )?;

                            if event_type == StateEventType::RoomMember {
                                let member_event = match raw_stripped_state_event
                                    .deserialize_as::<StrippedRoomMemberEvent>(
                                ) {
                                    Ok(ev) => ev,
                                    Err(e) => {
                                        // As above: the state event row stays,
                                        // the member row is skipped.
                                        debug!("Failed to deserialize stripped member event: {e}");
                                        continue;
                                    }
                                };

                                let room_id = this.encode_key(keys::MEMBER, &room_id);
                                let user_id = this.encode_key(keys::MEMBER, &state_key);
                                let membership = this.encode_key(
                                    keys::MEMBER,
                                    member_event.content.membership.as_str(),
                                );
                                let data = this.serialize_value(&state_key)?;

                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
                            }
                        }
                    }
                }

                // Receipts: one row per (room, user, receipt type, thread),
                // carrying the event id both as a column and in the payload.
                for (room_id, receipt_event) in receipts {
                    let room_id = this.encode_key(keys::RECEIPT, room_id);

                    for (event_id, receipt_types) in receipt_event {
                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);

                        for (receipt_type, receipt_users) in receipt_types {
                            let receipt_type =
                                this.encode_key(keys::RECEIPT, receipt_type.as_str());

                            for (user_id, receipt) in receipt_users {
                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
                                // We cannot have a NULL primary key so we rely on serialization
                                // instead of the string representation.
                                let thread = this.encode_key(
                                    keys::RECEIPT,
                                    rmp_serde::to_vec_named(&receipt.thread)?,
                                );
                                let data = this.serialize_json(&ReceiptData {
                                    receipt,
                                    event_id: event_id.clone(),
                                    user_id,
                                })?;

                                txn.set_receipt(
                                    &room_id,
                                    &encoded_user_id,
                                    &receipt_type,
                                    &thread,
                                    &encoded_event_id,
                                    &data,
                                )?;
                            }
                        }
                    }
                }

                // Redactions: rewrite stored state events in their redacted
                // form.
                for (room_id, redactions) in redactions {
                    // Looks up the room version from the stored room info.
                    // Every failure on the way (query error, missing row,
                    // undecodable info, no version) falls back to V9 with a
                    // warning.
                    let make_room_version = || {
                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
                        txn.get_room_info(&encoded_room_id)
                            .ok()
                            .flatten()
                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
                            .and_then(|info| info.room_version().cloned())
                            .unwrap_or_else(|| {
                                warn!(
                                    ?room_id,
                                    "Unable to find the room version, assume version 9"
                                );
                                RoomVersionId::V9
                            })
                    };

                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
                    // Filled lazily, so the lookup above runs at most once per
                    // room and only if at least one event is actually redacted.
                    let mut room_version = None;

                    for (event_id, redaction) in redactions {
                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);

                        // Events that are not stored, or whose stored JSON does
                        // not deserialize, are skipped silently.
                        if let Some(Ok(raw_event)) = txn
                            .get_state_event_by_id(&encoded_room_id, &event_id)?
                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
                        {
                            let event = raw_event.deserialize()?;
                            let redacted = redact(
                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
                                room_version.get_or_insert_with(make_room_version),
                                Some(RedactedBecause::from_raw_event(&redaction)?),
                            )
                            .map_err(Error::Redaction)?;
                            let data = this.serialize_json(&redacted)?;

                            let event_type =
                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());

                            // Overwrite the stored event with its redacted form.
                            txn.set_state_event(
                                &encoded_room_id,
                                &event_type,
                                &state_key,
                                false,
                                Some(&event_id),
                                &data,
                            )?;
                        }
                    }
                }

                // Display-name ambiguity maps.
                for (room_id, display_names) in ambiguity_maps {
                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);

                    for (name, user_ids) in display_names {
                        // Prefer the normalized form of the name; fall back to
                        // the raw string when there is none.
                        let encoded_name = this.encode_key(
                            keys::DISPLAY_NAME,
                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
                        );
                        // NOTE(review): serialized before the emptiness check,
                        // so it is computed even when only removals follow.
                        let data = this.serialize_json(&user_ids)?;

                        if user_ids.is_empty() {
                            txn.remove_display_name(&room_id, &encoded_name)?;

                            // We can't do a migration to merge the previously distinct buckets of
                            // user IDs since the display names themselves are hashed before they
                            // are persisted in the store. So the store will always retain two
                            // buckets: one for raw display names and one for normalised ones.
                            //
                            // We therefore do the next best thing, which is a sort of a soft
                            // migration: we fetch both the raw and normalised buckets, then merge
                            // the user IDs contained in them into a separate, temporary merged
                            // bucket. The SDK then operates on the merged buckets exclusively. See
                            // the comment in `get_users_with_display_names` for details.
                            //
                            // If the merged bucket is empty, that must mean that both the raw and
                            // normalised buckets were also empty, so we can remove both from the
                            // store.
                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
                            txn.remove_display_name(&room_id, &raw_name)?;
                        } else {
                            // We only create new buckets with the normalized display name.
                            txn.set_display_name(&room_id, &encoded_name, &data)?;
                        }
                    }
                }

                Ok::<_, Error>(())
            })
            .await?;

        Ok(())
    }
1406
1407    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1408        self.acquire()
1409            .await?
1410            .get_kv_blob(self.encode_presence_key(user_id))
1411            .await?
1412            .map(|data| self.deserialize_json(&data))
1413            .transpose()
1414    }
1415
1416    async fn get_presence_events(
1417        &self,
1418        user_ids: &[OwnedUserId],
1419    ) -> Result<Vec<Raw<PresenceEvent>>> {
1420        if user_ids.is_empty() {
1421            return Ok(Vec::new());
1422        }
1423
1424        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1425        self.acquire()
1426            .await?
1427            .get_kv_blobs(user_ids)
1428            .await?
1429            .into_iter()
1430            .map(|data| self.deserialize_json(&data))
1431            .collect()
1432    }
1433
1434    async fn get_state_event(
1435        &self,
1436        room_id: &RoomId,
1437        event_type: StateEventType,
1438        state_key: &str,
1439    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1440        Ok(self
1441            .get_state_events_for_keys(room_id, event_type, &[state_key])
1442            .await?
1443            .into_iter()
1444            .next())
1445    }
1446
1447    async fn get_state_events(
1448        &self,
1449        room_id: &RoomId,
1450        event_type: StateEventType,
1451    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1452        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1453        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1454        self.acquire()
1455            .await?
1456            .get_maybe_stripped_state_events(room_id, event_type)
1457            .await?
1458            .into_iter()
1459            .map(|(stripped, data)| {
1460                let ev = if stripped {
1461                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1462                } else {
1463                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1464                };
1465
1466                Ok(ev)
1467            })
1468            .collect()
1469    }
1470
1471    async fn get_state_events_for_keys(
1472        &self,
1473        room_id: &RoomId,
1474        event_type: StateEventType,
1475        state_keys: &[&str],
1476    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1477        if state_keys.is_empty() {
1478            return Ok(Vec::new());
1479        }
1480
1481        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1482        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1483        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1484        self.acquire()
1485            .await?
1486            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1487            .await?
1488            .into_iter()
1489            .map(|(stripped, data)| {
1490                let ev = if stripped {
1491                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1492                } else {
1493                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1494                };
1495
1496                Ok(ev)
1497            })
1498            .collect()
1499    }
1500
1501    async fn get_profile(
1502        &self,
1503        room_id: &RoomId,
1504        user_id: &UserId,
1505    ) -> Result<Option<MinimalRoomMemberEvent>> {
1506        let room_id = self.encode_key(keys::PROFILE, room_id);
1507        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1508
1509        self.acquire()
1510            .await?
1511            .get_profiles(room_id, user_ids)
1512            .await?
1513            .into_iter()
1514            .next()
1515            .map(|(_, data)| self.deserialize_json(&data))
1516            .transpose()
1517    }
1518
1519    async fn get_profiles<'a>(
1520        &self,
1521        room_id: &RoomId,
1522        user_ids: &'a [OwnedUserId],
1523    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1524        if user_ids.is_empty() {
1525            return Ok(BTreeMap::new());
1526        }
1527
1528        let room_id = self.encode_key(keys::PROFILE, room_id);
1529        let mut user_ids_map = user_ids
1530            .iter()
1531            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1532            .collect::<BTreeMap<_, _>>();
1533        let user_ids = user_ids_map.keys().cloned().collect();
1534
1535        self.acquire()
1536            .await?
1537            .get_profiles(room_id, user_ids)
1538            .await?
1539            .into_iter()
1540            .map(|(user_id, data)| {
1541                Ok((
1542                    user_ids_map
1543                        .remove(user_id.as_slice())
1544                        .expect("returned user IDs were requested"),
1545                    self.deserialize_json(&data)?,
1546                ))
1547            })
1548            .collect()
1549    }
1550
1551    async fn get_user_ids(
1552        &self,
1553        room_id: &RoomId,
1554        membership: RoomMemberships,
1555    ) -> Result<Vec<OwnedUserId>> {
1556        let room_id = self.encode_key(keys::MEMBER, room_id);
1557        let memberships = membership
1558            .as_vec()
1559            .into_iter()
1560            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1561            .collect();
1562        self.acquire()
1563            .await?
1564            .get_user_ids(room_id, memberships)
1565            .await?
1566            .iter()
1567            .map(|data| self.deserialize_value(data))
1568            .collect()
1569    }
1570
1571    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1572        self.acquire()
1573            .await?
1574            .get_room_infos(match room_load_settings {
1575                RoomLoadSettings::All => None,
1576                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1577            })
1578            .await?
1579            .into_iter()
1580            .map(|data| self.deserialize_json(&data))
1581            .collect()
1582    }
1583
1584    async fn get_users_with_display_name(
1585        &self,
1586        room_id: &RoomId,
1587        display_name: &DisplayName,
1588    ) -> Result<BTreeSet<OwnedUserId>> {
1589        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1590        let names = vec![self.encode_key(
1591            keys::DISPLAY_NAME,
1592            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1593        )];
1594
1595        Ok(self
1596            .acquire()
1597            .await?
1598            .get_display_names(room_id, names)
1599            .await?
1600            .into_iter()
1601            .next()
1602            .map(|(_, data)| self.deserialize_json(&data))
1603            .transpose()?
1604            .unwrap_or_default())
1605    }
1606
1607    async fn get_users_with_display_names<'a>(
1608        &self,
1609        room_id: &RoomId,
1610        display_names: &'a [DisplayName],
1611    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1612        let mut result = HashMap::new();
1613
1614        if display_names.is_empty() {
1615            return Ok(result);
1616        }
1617
1618        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1619        let mut names_map = display_names
1620            .iter()
1621            .flat_map(|display_name| {
1622                // We encode the display name as the `raw_str()` and the normalized string.
1623                //
1624                // This is for compatibility reasons since:
1625                //  1. Previously "Alice" and "alice" were considered to be distinct display
1626                //     names, while we now consider them to be the same so we need to merge the
1627                //     previously distinct buckets of user IDs.
1628                //  2. We can't do a migration to merge the previously distinct buckets of user
1629                //     IDs since the display names itself are hashed before they are persisted
1630                //     in the store.
1631                let raw =
1632                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1633                let normalized = display_name.as_normalized_str().map(|normalized| {
1634                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1635                });
1636
1637                iter::once(raw).chain(normalized.into_iter())
1638            })
1639            .collect::<BTreeMap<_, _>>();
1640        let names = names_map.keys().cloned().collect();
1641
1642        for (name, data) in
1643            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1644        {
1645            let display_name =
1646                names_map.remove(name.as_slice()).expect("returned display names were requested");
1647            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1648
1649            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1650        }
1651
1652        Ok(result)
1653    }
1654
1655    async fn get_account_data_event(
1656        &self,
1657        event_type: GlobalAccountDataEventType,
1658    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1659        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1660        self.acquire()
1661            .await?
1662            .get_global_account_data(event_type)
1663            .await?
1664            .map(|value| self.deserialize_json(&value))
1665            .transpose()
1666    }
1667
1668    async fn get_room_account_data_event(
1669        &self,
1670        room_id: &RoomId,
1671        event_type: RoomAccountDataEventType,
1672    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1673        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1674        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1675        self.acquire()
1676            .await?
1677            .get_room_account_data(room_id, event_type)
1678            .await?
1679            .map(|value| self.deserialize_json(&value))
1680            .transpose()
1681    }
1682
1683    async fn get_user_room_receipt_event(
1684        &self,
1685        room_id: &RoomId,
1686        receipt_type: ReceiptType,
1687        thread: ReceiptThread,
1688        user_id: &UserId,
1689    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1690        let room_id = self.encode_key(keys::RECEIPT, room_id);
1691        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1692        // We cannot have a NULL primary key so we rely on serialization instead of the
1693        // string representation.
1694        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1695        let user_id = self.encode_key(keys::RECEIPT, user_id);
1696
1697        self.acquire()
1698            .await?
1699            .get_user_receipt(room_id, receipt_type, thread, user_id)
1700            .await?
1701            .map(|value| {
1702                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1703            })
1704            .transpose()
1705    }
1706
1707    async fn get_event_room_receipt_events(
1708        &self,
1709        room_id: &RoomId,
1710        receipt_type: ReceiptType,
1711        thread: ReceiptThread,
1712        event_id: &EventId,
1713    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1714        let room_id = self.encode_key(keys::RECEIPT, room_id);
1715        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1716        // We cannot have a NULL primary key so we rely on serialization instead of the
1717        // string representation.
1718        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1719        let event_id = self.encode_key(keys::RECEIPT, event_id);
1720
1721        self.acquire()
1722            .await?
1723            .get_event_receipts(room_id, receipt_type, thread, event_id)
1724            .await?
1725            .iter()
1726            .map(|value| {
1727                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1728            })
1729            .collect()
1730    }
1731
1732    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1733        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1734    }
1735
1736    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1737        let conn = self.acquire().await?;
1738        let key = self.encode_custom_key(key);
1739        conn.set_kv_blob(key, value).await?;
1740        Ok(())
1741    }
1742
1743    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1744        let conn = self.acquire().await?;
1745        let key = self.encode_custom_key(key);
1746        let previous = conn.get_kv_blob(key.clone()).await?;
1747        conn.set_kv_blob(key, value).await?;
1748        Ok(previous)
1749    }
1750
1751    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1752        let conn = self.acquire().await?;
1753        let key = self.encode_custom_key(key);
1754        let previous = conn.get_kv_blob(key.clone()).await?;
1755        if previous.is_some() {
1756            conn.delete_kv_blob(key).await?;
1757        }
1758        Ok(previous)
1759    }
1760
1761    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1762        let this = self.clone();
1763        let room_id = room_id.to_owned();
1764
1765        let conn = self.acquire().await?;
1766
1767        conn.with_transaction(move |txn| -> Result<()> {
1768            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1769            txn.remove_room_info(&room_info_room_id)?;
1770
1771            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1772            txn.remove_room_state_events(&state_event_room_id, None)?;
1773
1774            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1775            txn.remove_room_members(&member_room_id, None)?;
1776
1777            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1778            txn.remove_room_profiles(&profile_room_id)?;
1779
1780            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1781            txn.remove_room_account_data(&room_account_data_room_id)?;
1782
1783            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1784            txn.remove_room_receipts(&receipt_room_id)?;
1785
1786            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1787            txn.remove_room_display_names(&display_name_room_id)?;
1788
1789            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1790            txn.remove_room_send_queue(&send_queue_room_id)?;
1791
1792            let dependent_send_queue_room_id =
1793                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1794            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1795
1796            Ok(())
1797        })
1798        .await?;
1799
1800        conn.vacuum().await
1801    }
1802
1803    async fn save_send_queue_request(
1804        &self,
1805        room_id: &RoomId,
1806        transaction_id: OwnedTransactionId,
1807        created_at: MilliSecondsSinceUnixEpoch,
1808        content: QueuedRequestKind,
1809        priority: usize,
1810    ) -> Result<(), Self::Error> {
1811        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1812        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1813
1814        let content = self.serialize_json(&content)?;
1815        // The transaction id is used both as a key (in remove/update) and a value (as
1816        // it's useful for the callers), so we keep it as is, and neither hash
1817        // it (with encode_key) or encrypt it (through serialize_value). After
1818        // all, it carries no personal information, so this is considered fine.
1819
1820        let created_at_ts: u64 = created_at.0.into();
1821        self.acquire()
1822            .await?
1823            .with_transaction(move |txn| {
1824                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1825                Ok(())
1826            })
1827            .await
1828    }
1829
1830    async fn update_send_queue_request(
1831        &self,
1832        room_id: &RoomId,
1833        transaction_id: &TransactionId,
1834        content: QueuedRequestKind,
1835    ) -> Result<bool, Self::Error> {
1836        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1837
1838        let content = self.serialize_json(&content)?;
1839        // See comment in [`Self::save_send_queue_event`] to understand why the
1840        // transaction id is neither encrypted or hashed.
1841        let transaction_id = transaction_id.to_string();
1842
1843        let num_updated = self.acquire()
1844            .await?
1845            .with_transaction(move |txn| {
1846                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1847            })
1848            .await?;
1849
1850        Ok(num_updated > 0)
1851    }
1852
1853    async fn remove_send_queue_request(
1854        &self,
1855        room_id: &RoomId,
1856        transaction_id: &TransactionId,
1857    ) -> Result<bool, Self::Error> {
1858        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1859
1860        // See comment in `save_send_queue_event`.
1861        let transaction_id = transaction_id.to_string();
1862
1863        let num_deleted = self
1864            .acquire()
1865            .await?
1866            .with_transaction(move |txn| {
1867                txn.prepare_cached(
1868                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1869                )?
1870                .execute((room_id, &transaction_id))
1871            })
1872            .await?;
1873
1874        Ok(num_deleted > 0)
1875    }
1876
1877    async fn load_send_queue_requests(
1878        &self,
1879        room_id: &RoomId,
1880    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1881        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1882
1883        // Note: ROWID is always present and is an auto-incremented integer counter. We
1884        // want to maintain the insertion order, so we can sort using it.
1885        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1886        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1887            .acquire()
1888            .await?
1889            .prepare(
1890                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1891                |mut stmt| {
1892                    stmt.query((room_id,))?
1893                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1894                        .collect()
1895                },
1896            )
1897            .await?;
1898
1899        let mut requests = Vec::with_capacity(res.len());
1900        for entry in res {
1901            let created_at = entry
1902                .4
1903                .and_then(UInt::new)
1904                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1905            requests.push(QueuedRequest {
1906                transaction_id: entry.0.into(),
1907                kind: self.deserialize_json(&entry.1)?,
1908                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1909                priority: entry.3,
1910                created_at,
1911            });
1912        }
1913
1914        Ok(requests)
1915    }
1916
1917    async fn update_send_queue_request_status(
1918        &self,
1919        room_id: &RoomId,
1920        transaction_id: &TransactionId,
1921        error: Option<QueueWedgeError>,
1922    ) -> Result<(), Self::Error> {
1923        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1924
1925        // See comment in `save_send_queue_event`.
1926        let transaction_id = transaction_id.to_string();
1927
1928        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1929        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1930
1931        self.acquire()
1932            .await?
1933            .with_transaction(move |txn| {
1934                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1935                Ok(())
1936            })
1937            .await
1938    }
1939
1940    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1941        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1942        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1943        // != encrypted(X), since we use a nonce in the encryption process.
1944
1945        let res: Vec<Vec<u8>> = self
1946            .acquire()
1947            .await?
1948            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1949                stmt.query(())?.mapped(|row| row.get(0)).collect()
1950            })
1951            .await?;
1952
1953        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1954        // then rejigger that into a vector.
1955        Ok(res
1956            .into_iter()
1957            .map(|entry| self.deserialize_value(&entry))
1958            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1959            .into_iter()
1960            .collect())
1961    }
1962
1963    async fn save_dependent_queued_request(
1964        &self,
1965        room_id: &RoomId,
1966        parent_txn_id: &TransactionId,
1967        own_txn_id: ChildTransactionId,
1968        created_at: MilliSecondsSinceUnixEpoch,
1969        content: DependentQueuedRequestKind,
1970    ) -> Result<()> {
1971        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1972        let content = self.serialize_json(&content)?;
1973
1974        // See comment in `save_send_queue_event`.
1975        let parent_txn_id = parent_txn_id.to_string();
1976        let own_txn_id = own_txn_id.to_string();
1977
1978        let created_at_ts: u64 = created_at.0.into();
1979        self.acquire()
1980            .await?
1981            .with_transaction(move |txn| {
1982                txn.prepare_cached(
1983                    r#"INSERT INTO dependent_send_queue_events
1984                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1985                       VALUES (?, ?, ?, ?, ?)"#,
1986                )?
1987                .execute((
1988                    room_id,
1989                    parent_txn_id,
1990                    own_txn_id,
1991                    content,
1992                    created_at_ts,
1993                ))?;
1994                Ok(())
1995            })
1996            .await
1997    }
1998
1999    async fn update_dependent_queued_request(
2000        &self,
2001        room_id: &RoomId,
2002        own_transaction_id: &ChildTransactionId,
2003        new_content: DependentQueuedRequestKind,
2004    ) -> Result<bool> {
2005        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2006        let content = self.serialize_json(&new_content)?;
2007
2008        // See comment in `save_send_queue_event`.
2009        let own_txn_id = own_transaction_id.to_string();
2010
2011        let num_updated = self
2012            .acquire()
2013            .await?
2014            .with_transaction(move |txn| {
2015                txn.prepare_cached(
2016                    r#"UPDATE dependent_send_queue_events
2017                       SET content = ?
2018                       WHERE own_transaction_id = ?
2019                       AND room_id = ?"#,
2020                )?
2021                .execute((content, own_txn_id, room_id))
2022            })
2023            .await?;
2024
2025        if num_updated > 1 {
2026            return Err(Error::InconsistentUpdate);
2027        }
2028
2029        Ok(num_updated == 1)
2030    }
2031
2032    async fn mark_dependent_queued_requests_as_ready(
2033        &self,
2034        room_id: &RoomId,
2035        parent_txn_id: &TransactionId,
2036        parent_key: SentRequestKey,
2037    ) -> Result<usize> {
2038        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2039        let parent_key = self.serialize_value(&parent_key)?;
2040
2041        // See comment in `save_send_queue_event`.
2042        let parent_txn_id = parent_txn_id.to_string();
2043
2044        self.acquire()
2045            .await?
2046            .with_transaction(move |txn| {
2047                Ok(txn.prepare_cached(
2048                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2049                )?
2050                .execute((parent_key, parent_txn_id, room_id))?)
2051            })
2052            .await
2053    }
2054
2055    async fn remove_dependent_queued_request(
2056        &self,
2057        room_id: &RoomId,
2058        txn_id: &ChildTransactionId,
2059    ) -> Result<bool> {
2060        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2061
2062        // See comment in `save_send_queue_event`.
2063        let txn_id = txn_id.to_string();
2064
2065        let num_deleted = self
2066            .acquire()
2067            .await?
2068            .with_transaction(move |txn| {
2069                txn.prepare_cached(
2070                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2071                )?
2072                .execute((txn_id, room_id))
2073            })
2074            .await?;
2075
2076        Ok(num_deleted > 0)
2077    }
2078
2079    async fn load_dependent_queued_requests(
2080        &self,
2081        room_id: &RoomId,
2082    ) -> Result<Vec<DependentQueuedRequest>> {
2083        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2084
2085        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2086        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2087            .acquire()
2088            .await?
2089            .prepare(
2090                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2091                |mut stmt| {
2092                    stmt.query((room_id,))?
2093                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2094                        .collect()
2095                },
2096            )
2097            .await?;
2098
2099        let mut dependent_events = Vec::with_capacity(res.len());
2100        for entry in res {
2101            let created_at = entry
2102                .4
2103                .and_then(UInt::new)
2104                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2105            dependent_events.push(DependentQueuedRequest {
2106                own_transaction_id: entry.0.into(),
2107                parent_transaction_id: entry.1.into(),
2108                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2109                kind: self.deserialize_json(&entry.3)?,
2110                created_at,
2111            });
2112        }
2113
2114        Ok(dependent_events)
2115    }
2116}
2117
/// Payload deserialized from a stored receipt row.
///
/// `get_user_room_receipt_event` reads `event_id` and `receipt` from it, while
/// `get_event_room_receipt_events` reads `user_id` and `receipt`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReceiptData {
    /// The receipt itself.
    receipt: Receipt,
    /// ID of the event the receipt is attached to.
    event_id: OwnedEventId,
    /// ID of the user the receipt belongs to.
    user_id: OwnedUserId,
}
2124
/// Runs the shared state store integration tests against a store opened
/// without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    // One temporary directory for the whole module; each store lives in its
    // own numbered sub-directory.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store in a new sub-directory of `TMP_DIR`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(index.to_string());
        let store_dir_str = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_dir_str);

        let store = SqliteStateStore::open(store_dir_str, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2149
/// Runs the shared state store integration tests against a store opened with
/// a passphrase, plus a few checks of the `SqliteStoreConfig` options.
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;
    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};

    // One temporary directory for the whole module; each store lives in its
    // own numbered sub-directory.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a path, unique within this module, to open a store at.
    fn new_state_store_workspace() -> PathBuf {
        let index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(index.to_string())
    }

    /// Opens a fresh store protected by a fixed test passphrase.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let workspace_str = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", workspace_str);

        let store =
            SqliteStateStore::open(workspace_str, Some("default_test_password")).await.unwrap();
        Ok(store)
    }

    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let reported_cache_size =
            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();

        // `SqliteStoreConfig` takes the size in bytes; it must have been
        // converted to kibibytes. The value is negative because that is how
        // SQLite denotes a size in kibibytes rather than in pages.
        assert_eq!(reported_cache_size, -(1500 / 1024));
    }

    #[async_test]
    async fn test_journal_size_limit() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let reported_limit = conn
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        // `SqliteStoreConfig` takes the limit in bytes, and SQLite keeps it in
        // bytes as well.
        assert_eq!(reported_limit, 1500);
    }

    statestore_integration_tests!();
}
2230
2231#[cfg(test)]
2232mod migration_tests {
2233    use std::{
2234        path::{Path, PathBuf},
2235        sync::{
2236            atomic::{AtomicU32, Ordering::SeqCst},
2237            Arc,
2238        },
2239    };
2240
2241    use as_variant::as_variant;
2242    use deadpool_sqlite::Runtime;
2243    use matrix_sdk_base::{
2244        media::{MediaFormat, MediaRequestParameters},
2245        store::{
2246            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2247            SerializableEventContent,
2248        },
2249        sync::UnreadNotificationsCount,
2250        RoomState, StateStore,
2251    };
2252    use matrix_sdk_test::async_test;
2253    use once_cell::sync::Lazy;
2254    use ruma::{
2255        events::{
2256            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2257            StateEventType,
2258        },
2259        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2260        RoomId, TransactionId, UserId,
2261    };
2262    use rusqlite::Transaction;
2263    use serde::{Deserialize, Serialize};
2264    use serde_json::json;
2265    use tempfile::{tempdir, TempDir};
2266    use tokio::fs;
2267
2268    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2269    use crate::{
2270        error::{Error, Result},
2271        utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2272        OpenStoreError,
2273    };
2274
2275    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2276    static NUM: AtomicU32 = AtomicU32::new(0);
2277    const SECRET: &str = "secret";
2278
2279    fn new_path() -> PathBuf {
2280        let name = NUM.fetch_add(1, SeqCst).to_string();
2281        TMP_DIR.path().join(name)
2282    }
2283
2284    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2285        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2286
2287        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2288        // use default pool config
2289
2290        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2291        let conn = pool.get().await?;
2292
2293        init(&conn).await?;
2294
2295        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2296        let this = SqliteStateStore { store_cipher, pool };
2297        this.run_migrations(&conn, 1, Some(version)).await?;
2298
2299        Ok(this)
2300    }
2301
2302    fn room_info_v1_json(
2303        room_id: &RoomId,
2304        state: RoomState,
2305        name: Option<&str>,
2306        creator: Option<&UserId>,
2307    ) -> serde_json::Value {
2308        // Test with name set or not.
2309        let name_content = match name {
2310            Some(name) => json!({ "name": name }),
2311            None => json!({ "name": null }),
2312        };
2313        // Test with creator set or not.
2314        let create_content = match creator {
2315            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2316            None => RoomCreateEventContent::new_v11(),
2317        };
2318
2319        json!({
2320            "room_id": room_id,
2321            "room_type": state,
2322            "notification_counts": UnreadNotificationsCount::default(),
2323            "summary": {
2324                "heroes": [],
2325                "joined_member_count": 0,
2326                "invited_member_count": 0,
2327            },
2328            "members_synced": false,
2329            "base_info": {
2330                "dm_targets": [],
2331                "max_power_level": 100,
2332                "name": {
2333                    "Original": {
2334                        "content": name_content,
2335                    },
2336                },
2337                "create": {
2338                    "Original": {
2339                        "content": create_content,
2340                    }
2341                }
2342            },
2343        })
2344    }
2345
2346    #[async_test]
2347    pub async fn test_migrating_v1_to_v2() {
2348        let path = new_path();
2349        // Create and populate db.
2350        {
2351            let db = create_fake_db(&path, 1).await.unwrap();
2352            let conn = db.pool.get().await.unwrap();
2353
2354            let this = db.clone();
2355            conn.with_transaction(move |txn| {
2356                for i in 0..5 {
2357                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2358                    let (state, stripped) =
2359                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2360                    let info = room_info_v1_json(&room_id, state, None, None);
2361
2362                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2363                    let data = this.serialize_json(&info)?;
2364
2365                    txn.prepare_cached(
2366                        "INSERT INTO room_info (room_id, stripped, data)
2367                         VALUES (?, ?, ?)",
2368                    )?
2369                    .execute((room_id, stripped, data))?;
2370                }
2371
2372                Result::<_, Error>::Ok(())
2373            })
2374            .await
2375            .unwrap();
2376        }
2377
2378        // This transparently migrates to the latest version.
2379        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2380
2381        // Check all room infos are there.
2382        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2383    }
2384
2385    // Add a room in version 2 format of the state store.
2386    fn add_room_v2(
2387        this: &SqliteStateStore,
2388        txn: &Transaction<'_>,
2389        room_id: &RoomId,
2390        name: Option<&str>,
2391        create_creator: Option<&UserId>,
2392        create_sender: Option<&UserId>,
2393    ) -> Result<(), Error> {
2394        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2395
2396        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2397        let encoded_state =
2398            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2399        let data = this.serialize_json(&room_info_json)?;
2400
2401        txn.prepare_cached(
2402            "INSERT INTO room_info (room_id, state, data)
2403             VALUES (?, ?, ?)",
2404        )?
2405        .execute((encoded_room_id, encoded_state, data))?;
2406
2407        // Test with or without `m.room.create` event in the room state.
2408        let Some(create_sender) = create_sender else {
2409            return Ok(());
2410        };
2411
2412        let create_content = match create_creator {
2413            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2414            None => RoomCreateEventContent::new_v11(),
2415        };
2416
2417        let event_id = EventId::new(server_name!("dummy.local"));
2418        let create_event = json!({
2419            "content": create_content,
2420            "event_id": event_id,
2421            "sender": create_sender.to_owned(),
2422            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2423            "state_key": "",
2424            "type": "m.room.create",
2425            "unsigned": {},
2426        });
2427
2428        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2429        let encoded_event_type =
2430            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2431        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2432        let stripped = false;
2433        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2434        let data = this.serialize_json(&create_event)?;
2435
2436        txn.prepare_cached(
2437            "INSERT
2438             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2439             VALUES (?, ?, ?, ?, ?, ?)",
2440        )?
2441        .execute((
2442            encoded_room_id,
2443            encoded_event_type,
2444            encoded_state_key,
2445            stripped,
2446            encoded_event_id,
2447            data,
2448        ))?;
2449
2450        Ok(())
2451    }
2452
2453    #[async_test]
2454    pub async fn test_migrating_v2_to_v3() {
2455        let path = new_path();
2456
2457        // Room A: with name, creator and sender.
2458        let room_a_id = room_id!("!room_a:dummy.local");
2459        let room_a_name = "Room A";
2460        let room_a_creator = user_id!("@creator:dummy.local");
2461        // Use a different sender to check that sender is used over creator in
2462        // migration.
2463        let room_a_create_sender = user_id!("@sender:dummy.local");
2464
2465        // Room B: without name, creator and sender.
2466        let room_b_id = room_id!("!room_b:dummy.local");
2467
2468        // Room C: only with sender.
2469        let room_c_id = room_id!("!room_c:dummy.local");
2470        let room_c_create_sender = user_id!("@creator:dummy.local");
2471
2472        // Create and populate db.
2473        {
2474            let db = create_fake_db(&path, 2).await.unwrap();
2475            let conn = db.pool.get().await.unwrap();
2476
2477            let this = db.clone();
2478            conn.with_transaction(move |txn| {
2479                add_room_v2(
2480                    &this,
2481                    txn,
2482                    room_a_id,
2483                    Some(room_a_name),
2484                    Some(room_a_creator),
2485                    Some(room_a_create_sender),
2486                )?;
2487                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2488                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2489
2490                Result::<_, Error>::Ok(())
2491            })
2492            .await
2493            .unwrap();
2494        }
2495
2496        // This transparently migrates to the latest version.
2497        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2498
2499        // Check all room infos are there.
2500        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2501        assert_eq!(room_infos.len(), 3);
2502
2503        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2504        assert_eq!(room_a.name(), Some(room_a_name));
2505        assert_eq!(room_a.creator(), Some(room_a_create_sender));
2506
2507        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2508        assert_eq!(room_b.name(), None);
2509        assert_eq!(room_b.creator(), None);
2510
2511        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2512        assert_eq!(room_c.name(), None);
2513        assert_eq!(room_c.creator(), Some(room_c_create_sender));
2514    }
2515
2516    #[async_test]
2517    pub async fn test_migrating_v7_to_v9() {
2518        let path = new_path();
2519
2520        let room_id = room_id!("!room_a:dummy.local");
2521        let wedged_event_transaction_id = TransactionId::new();
2522        let local_event_transaction_id = TransactionId::new();
2523
2524        // Create and populate db.
2525        {
2526            let db = create_fake_db(&path, 7).await.unwrap();
2527            let conn = db.pool.get().await.unwrap();
2528
2529            let wedge_tx = wedged_event_transaction_id.clone();
2530            let local_tx = local_event_transaction_id.clone();
2531
2532            conn.with_transaction(move |txn| {
2533                add_dependent_send_queue_event_v7(
2534                    &db,
2535                    txn,
2536                    room_id,
2537                    &local_tx,
2538                    ChildTransactionId::new(),
2539                    DependentQueuedRequestKind::RedactEvent,
2540                )?;
2541                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2542                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2543                Result::<_, Error>::Ok(())
2544            })
2545            .await
2546            .unwrap();
2547        }
2548
2549        // This transparently migrates to the latest version, which clears up all
2550        // requests and dependent requests.
2551        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2552
2553        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2554        assert!(requests.is_empty());
2555
2556        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2557        assert!(dependent_requests.is_empty());
2558    }
2559
2560    fn add_send_queue_event_v7(
2561        this: &SqliteStateStore,
2562        txn: &Transaction<'_>,
2563        transaction_id: &TransactionId,
2564        room_id: &RoomId,
2565        is_wedged: bool,
2566    ) -> Result<(), Error> {
2567        let content =
2568            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2569
2570        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2571        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2572
2573        let content = this.serialize_json(&content)?;
2574
2575        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2576            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2577
2578        Ok(())
2579    }
2580
2581    fn add_dependent_send_queue_event_v7(
2582        this: &SqliteStateStore,
2583        txn: &Transaction<'_>,
2584        room_id: &RoomId,
2585        parent_txn_id: &TransactionId,
2586        own_txn_id: ChildTransactionId,
2587        content: DependentQueuedRequestKind,
2588    ) -> Result<(), Error> {
2589        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2590
2591        let parent_txn_id = parent_txn_id.to_string();
2592        let own_txn_id = own_txn_id.to_string();
2593        let content = this.serialize_json(&content)?;
2594
2595        txn.prepare_cached(
2596            "INSERT INTO dependent_send_queue_events
2597                         (room_id, parent_transaction_id, own_transaction_id, content)
2598                       VALUES (?, ?, ?, ?)",
2599        )?
2600        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2601
2602        Ok(())
2603    }
2604
2605    #[derive(Clone, Debug, Serialize, Deserialize)]
2606    pub enum LegacyDependentQueuedRequestKind {
2607        UploadFileWithThumbnail {
2608            content_type: String,
2609            cache_key: MediaRequestParameters,
2610            related_to: OwnedTransactionId,
2611        },
2612    }
2613
2614    #[async_test]
2615    pub async fn test_dependent_queued_request_variant_renaming() {
2616        let path = new_path();
2617        let db = create_fake_db(&path, 7).await.unwrap();
2618
2619        let cache_key = MediaRequestParameters {
2620            format: MediaFormat::File,
2621            source: MediaSource::Plain("https://server.local/foobar".into()),
2622        };
2623        let related_to = TransactionId::new();
2624        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2625            content_type: "image/png".to_owned(),
2626            cache_key,
2627            related_to: related_to.clone(),
2628        };
2629
2630        let data = db
2631            .serialize_json(&request)
2632            .expect("should be able to serialize legacy dependent request");
2633        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2634            "should be able to deserialize dependent request from legacy dependent request",
2635        );
2636
2637        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2638            assert_eq!(de_related_to, related_to);
2639        });
2640    }
2641}