matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    str::FromStr as _,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
12use matrix_sdk_base::{
13    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
14    store::{
15        compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
16        DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
17        QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
18        ThreadSubscriptionStatus,
19    },
20    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
21    StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
22};
23use matrix_sdk_store_encryption::StoreCipher;
24use ruma::{
25    canonical_json::{redact, RedactedBecause},
26    events::{
27        presence::PresenceEvent,
28        receipt::{Receipt, ReceiptThread, ReceiptType},
29        room::{
30            create::RoomCreateEventContent,
31            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
32        },
33        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
34        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
35    },
36    serde::Raw,
37    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
38    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
39};
40use rusqlite::{OptionalExtension, Transaction};
41use serde::{Deserialize, Serialize};
42use tokio::fs;
43use tracing::{debug, trace, warn};
44
45use crate::{
46    error::{Error, Result},
47    utils::{
48        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
49        SqliteKeyValueStoreConnExt,
50    },
51    OpenStoreError, SqliteStoreConfig,
52};
53
/// Names of the tables used by the state store.
///
/// In this file they are passed as the first argument of `encode_key`,
/// alongside the value that is encoded for that table.
mod keys {
    // Tables
    pub const KV_BLOB: &str = "kv_blob";
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
69
/// The filename used for the SQLite database file used by the state store.
///
/// It is joined to the directory path of the store configuration when the
/// connection pool is created.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
72
/// Identifier of the latest database version.
///
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`SqliteStateStore::run_migrations`] function.
///
/// `run_migrations` uses this value as the target version when it is not given
/// an explicit one.
const DATABASE_VERSION: u8 = 14;
79
/// An SQLite-based state store.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// Cipher used to encrypt private data; `None` when the store was opened
    /// without a passphrase.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool of connections to the SQLite database.
    pool: SqlitePool,
}
86
87#[cfg(not(tarpaulin_include))]
88impl fmt::Debug for SqliteStateStore {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
91    }
92}
93
94impl SqliteStateStore {
95    /// Open the SQLite-based state store at the given path using the given
96    /// passphrase to encrypt private data.
97    pub async fn open(
98        path: impl AsRef<Path>,
99        passphrase: Option<&str>,
100    ) -> Result<Self, OpenStoreError> {
101        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
102    }
103
104    /// Open the SQLite-based state store with the config open config.
105    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
106        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
107
108        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
109
110        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
111        config.pool = Some(pool_config);
112
113        let pool = config.create_pool(Runtime::Tokio1)?;
114
115        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
116        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
117
118        Ok(this)
119    }
120
121    /// Create an SQLite-based state store using the given SQLite database pool.
122    /// The given passphrase will be used to encrypt private data.
123    pub async fn open_with_pool(
124        pool: SqlitePool,
125        passphrase: Option<&str>,
126    ) -> Result<Self, OpenStoreError> {
127        let conn = pool.get().await?;
128
129        let mut version = conn.db_version().await?;
130
131        if version == 0 {
132            init(&conn).await?;
133            version = 1;
134        }
135
136        let store_cipher = match passphrase {
137            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
138            None => None,
139        };
140        let this = Self { store_cipher, pool };
141        this.run_migrations(&conn, version, None).await?;
142
143        Ok(this)
144    }
145
146    /// Run database migrations from the given `from` version to the given `to`
147    /// version
148    ///
149    /// If `to` is `None`, the current database version will be used.
150    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
151        let to = to.unwrap_or(DATABASE_VERSION);
152
153        if from < to {
154            debug!(version = from, new_version = to, "Upgrading database");
155        } else {
156            return Ok(());
157        }
158
159        if from < 2 && to >= 2 {
160            let this = self.clone();
161            conn.with_transaction(move |txn| {
162                // Create new table.
163                txn.execute_batch(include_str!(
164                    "../migrations/state_store/002_a_create_new_room_info.sql"
165                ))?;
166
167                // Migrate data to new table.
168                for data in txn
169                    .prepare("SELECT data FROM room_info")?
170                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
171                {
172                    let data = data?;
173                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
174
175                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
176                    let state = this
177                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
178                    txn.prepare_cached(
179                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
180                         VALUES (?, ?, ?)",
181                    )?
182                    .execute((room_id, state, data))?;
183                }
184
185                // Replace old table.
186                txn.execute_batch(include_str!(
187                    "../migrations/state_store/002_b_replace_room_info.sql"
188                ))?;
189
190                txn.set_db_version(2)?;
191                Result::<_, Error>::Ok(())
192            })
193            .await?;
194        }
195
196        // Migration to v3: RoomInfo format has changed.
197        if from < 3 && to >= 3 {
198            let this = self.clone();
199            conn.with_transaction(move |txn| {
200                // Migrate data .
201                for data in txn
202                    .prepare("SELECT data FROM room_info")?
203                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
204                {
205                    let data = data?;
206                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
207
208                    // Get the `m.room.create` event from the room state.
209                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
210                    let event_type =
211                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
212                    let create_res = txn
213                        .prepare(
214                            "SELECT stripped, data FROM state_event
215                             WHERE room_id = ? AND event_type = ?",
216                        )?
217                        .query_row([room_id, event_type], |row| {
218                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
219                        })
220                        .optional()?;
221
222                    let create = create_res.and_then(|(stripped, data)| {
223                        let create = if stripped {
224                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
225                                this.deserialize_json(&data).ok()?,
226                            )
227                        } else {
228                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
229                        };
230                        Some(create)
231                    });
232
233                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
234
235                    let data = this.serialize_json(&migrated_room_info)?;
236                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
237                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
238                        .execute((data, room_id))?;
239                }
240
241                txn.set_db_version(3)?;
242                Result::<_, Error>::Ok(())
243            })
244            .await?;
245        }
246
247        if from < 4 && to >= 4 {
248            conn.with_transaction(move |txn| {
249                // Create new table.
250                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
251                txn.set_db_version(4)
252            })
253            .await?;
254        }
255
256        if from < 5 && to >= 5 {
257            conn.with_transaction(move |txn| {
258                // Create new table.
259                txn.execute_batch(include_str!(
260                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
261                ))?;
262                txn.set_db_version(4)
263            })
264            .await?;
265        }
266
267        if from < 6 && to >= 6 {
268            conn.with_transaction(move |txn| {
269                // Create new table.
270                txn.execute_batch(include_str!(
271                    "../migrations/state_store/005_send_queue_dependent_events.sql"
272                ))?;
273                txn.set_db_version(6)
274            })
275            .await?;
276        }
277
278        if from < 7 && to >= 7 {
279            conn.with_transaction(move |txn| {
280                // Drop media table.
281                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
282                txn.set_db_version(7)
283            })
284            .await?;
285        }
286
287        if from < 8 && to >= 8 {
288            // Replace all existing wedged events with a generic error.
289            let error = QueueWedgeError::GenericApiError {
290                msg: "local echo failed to send in a previous session".into(),
291            };
292            let default_err = self.serialize_value(&error)?;
293
294            conn.with_transaction(move |txn| {
295                // Update send queue table to persist the wedge reason if any.
296                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
297
298                // Migrate the data, add a generic error for currently wedged events
299
300                for wedged_entries in txn
301                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
302                    .query_map((), |row| {
303                        Ok(
304                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
305                        )
306                    })? {
307
308                    let (room_id, transaction_id) = wedged_entries?;
309
310                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
311                        .execute((default_err.clone(), room_id, transaction_id))?;
312                }
313
314
315                // Clean up the table now that data is migrated
316                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
317
318                txn.set_db_version(8)
319            })
320                .await?;
321        }
322
323        if from < 9 && to >= 9 {
324            conn.with_transaction(move |txn| {
325                // Run the migration.
326                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
327                txn.set_db_version(9)
328            })
329            .await?;
330        }
331
332        if from < 10 && to >= 10 {
333            conn.with_transaction(move |txn| {
334                // Run the migration.
335                txn.execute_batch(include_str!(
336                    "../migrations/state_store/009_send_queue_priority.sql"
337                ))?;
338                txn.set_db_version(10)
339            })
340            .await?;
341        }
342
343        if from < 11 && to >= 11 {
344            conn.with_transaction(move |txn| {
345                // Run the migration.
346                txn.execute_batch(include_str!(
347                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
348                ))?;
349                txn.set_db_version(11)
350            })
351            .await?;
352        }
353
354        if from < 12 && to >= 12 {
355            // Defragment the DB and optimize its size on the filesystem.
356            // This should have been run in the migration for version 7, to reduce the size
357            // of the DB as we removed the media cache.
358            conn.vacuum().await?;
359            conn.set_kv("version", vec![12]).await?;
360        }
361
362        if from < 13 && to >= 13 {
363            conn.with_transaction(move |txn| {
364                // Run the migration.
365                txn.execute_batch(include_str!(
366                    "../migrations/state_store/011_thread_subscriptions.sql"
367                ))?;
368                txn.set_db_version(13)
369            })
370            .await?;
371        }
372
373        if from < 14 && to >= 14 {
374            conn.with_transaction(move |txn| {
375                // Run the migration.
376                txn.execute_batch(include_str!(
377                    "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
378                ))?;
379                txn.set_db_version(14)
380            })
381            .await?;
382        }
383
384        Ok(())
385    }
386
387    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
388        let key_s = match key {
389            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
390            StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
391            StateStoreDataKey::Filter(f) => {
392                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
393            }
394            StateStoreDataKey::UserAvatarUrl(u) => {
395                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
396            }
397            StateStoreDataKey::RecentlyVisitedRooms(b) => {
398                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
399            }
400            StateStoreDataKey::UtdHookManagerData => {
401                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
402            }
403            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
404                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
405            }
406            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
407                if let Some(thread_root) = thread_root {
408                    Cow::Owned(format!(
409                        "{}:{room_id}:{thread_root}",
410                        StateStoreDataKey::COMPOSER_DRAFT
411                    ))
412                } else {
413                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
414                }
415            }
416            StateStoreDataKey::SeenKnockRequests(room_id) => {
417                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
418            }
419            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
420                Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
421            }
422        };
423
424        self.encode_key(keys::KV_BLOB, &*key_s)
425    }
426
427    fn encode_presence_key(&self, user_id: &UserId) -> Key {
428        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
429    }
430
431    fn encode_custom_key(&self, key: &[u8]) -> Key {
432        let mut full_key = b"custom:".to_vec();
433        full_key.extend(key);
434        self.encode_key(keys::KV_BLOB, full_key)
435    }
436
437    async fn acquire(&self) -> Result<SqliteAsyncConn> {
438        Ok(self.pool.get().await?)
439    }
440
441    fn remove_maybe_stripped_room_data(
442        &self,
443        txn: &Transaction<'_>,
444        room_id: &RoomId,
445        stripped: bool,
446    ) -> rusqlite::Result<()> {
447        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
448        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
449
450        let member_room_id = self.encode_key(keys::MEMBER, room_id);
451        txn.remove_room_members(&member_room_id, Some(stripped))
452    }
453}
454
455impl EncryptableStore for SqliteStateStore {
456    fn get_cypher(&self) -> Option<&StoreCipher> {
457        self.store_cipher.as_deref()
458    }
459}
460
461/// Initialize the database.
462async fn init(conn: &SqliteAsyncConn) -> Result<()> {
463    // First turn on WAL mode, this can't be done in the transaction, it fails with
464    // the error message: "cannot change into wal mode from within a transaction".
465    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
466    conn.with_transaction(|txn| {
467        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
468        txn.set_db_version(1)?;
469
470        Ok(())
471    })
472    .await
473}
474
/// Synchronous helpers to write to, read from and clean up the tables of the
/// state store.
///
/// All arguments are raw bytes; for instance
/// `remove_maybe_stripped_room_data` passes keys produced by `encode_key`.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace the `(key, value)` row of the `kv_blob` table.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the global account data stored for `event_type`.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the account data stored for `event_type` in the
    /// given room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the account data of the given room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the room info (`state` and `data`) of the given room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Get the `data` of the room info of the given room, if any.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the room info of the given room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a row of the `state_event` table.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Get the `data` of the state event with the given event ID in the given
    /// room, if any.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the state events of the given room; when `stripped` is
    /// `Some(_)`, only those whose `stripped` column has that value.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a row of the `member` table.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove the members of the given room; when `stripped` is `Some(_)`,
    /// only those whose `stripped` column has that value.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace the profile of a user in the given room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the profiles of the given room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove the profile of a single user in the given room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a row of the `receipt` table; `thread_id` is written
    /// to its `thread` column.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the receipts of the given room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for the display name `name` in the
    /// given room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove the data stored for the display name `name` in the given room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the display names of the given room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the rows of `send_queue_events` for the given room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the rows of `dependent_send_queue_events` for the given
    /// room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
543
544impl SqliteConnectionStateStoreExt for rusqlite::Connection {
545    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
546        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
547        Ok(())
548    }
549
550    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
551        self.prepare_cached(
552            "INSERT OR REPLACE INTO global_account_data (event_type, data)
553             VALUES (?, ?)",
554        )?
555        .execute((event_type, data))?;
556        Ok(())
557    }
558
559    fn set_room_account_data(
560        &self,
561        room_id: &[u8],
562        event_type: &[u8],
563        data: &[u8],
564    ) -> rusqlite::Result<()> {
565        self.prepare_cached(
566            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
567             VALUES (?, ?, ?)",
568        )?
569        .execute((room_id, event_type, data))?;
570        Ok(())
571    }
572
573    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
574        self.prepare(
575            "DELETE FROM room_account_data
576             WHERE room_id = ?",
577        )?
578        .execute((room_id,))?;
579        Ok(())
580    }
581
582    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
583        self.prepare_cached(
584            "INSERT OR REPLACE INTO room_info (room_id, state, data)
585             VALUES (?, ?, ?)",
586        )?
587        .execute((room_id, state, data))?;
588        Ok(())
589    }
590
591    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
592        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
593            .optional()
594    }
595
596    /// Remove the room info for the given room.
597    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
598        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
599        Ok(())
600    }
601
602    fn set_state_event(
603        &self,
604        room_id: &[u8],
605        event_type: &[u8],
606        state_key: &[u8],
607        stripped: bool,
608        event_id: Option<&[u8]>,
609        data: &[u8],
610    ) -> rusqlite::Result<()> {
611        self.prepare_cached(
612            "INSERT OR REPLACE
613             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
614             VALUES (?, ?, ?, ?, ?, ?)",
615        )?
616        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
617        Ok(())
618    }
619
620    fn get_state_event_by_id(
621        &self,
622        room_id: &[u8],
623        event_id: &[u8],
624    ) -> rusqlite::Result<Option<Vec<u8>>> {
625        self.query_row(
626            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
627            (room_id, event_id),
628            |row| row.get(0),
629        )
630        .optional()
631    }
632
633    /// Remove state events for the given room.
634    ///
635    /// If `stripped` is `Some()`, only removes state events for the given
636    /// stripped state. Otherwise, state events are removed regardless of the
637    /// stripped state.
638    fn remove_room_state_events(
639        &self,
640        room_id: &[u8],
641        stripped: Option<bool>,
642    ) -> rusqlite::Result<()> {
643        if let Some(stripped) = stripped {
644            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
645                .execute((room_id, stripped))?;
646        } else {
647            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
648                .execute((room_id,))?;
649        }
650        Ok(())
651    }
652
653    fn set_member(
654        &self,
655        room_id: &[u8],
656        user_id: &[u8],
657        membership: &[u8],
658        stripped: bool,
659        data: &[u8],
660    ) -> rusqlite::Result<()> {
661        self.prepare_cached(
662            "INSERT OR REPLACE
663             INTO member (room_id, user_id, membership, stripped, data)
664             VALUES (?, ?, ?, ?, ?)",
665        )?
666        .execute((room_id, user_id, membership, stripped, data))?;
667        Ok(())
668    }
669
670    /// Remove members for the given room.
671    ///
672    /// If `stripped` is `Some()`, only removes members for the given stripped
673    /// state. Otherwise, members are removed regardless of the stripped state.
674    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
675        if let Some(stripped) = stripped {
676            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
677                .execute((room_id, stripped))?;
678        } else {
679            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
680        }
681        Ok(())
682    }
683
684    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
685        self.prepare_cached(
686            "INSERT OR REPLACE
687             INTO profile (room_id, user_id, data)
688             VALUES (?, ?, ?)",
689        )?
690        .execute((room_id, user_id, data))?;
691        Ok(())
692    }
693
694    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
695        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
696        Ok(())
697    }
698
699    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
700        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
701            .execute((room_id, user_id))?;
702        Ok(())
703    }
704
705    fn set_receipt(
706        &self,
707        room_id: &[u8],
708        user_id: &[u8],
709        receipt_type: &[u8],
710        thread: &[u8],
711        event_id: &[u8],
712        data: &[u8],
713    ) -> rusqlite::Result<()> {
714        self.prepare_cached(
715            "INSERT OR REPLACE
716             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
717             VALUES (?, ?, ?, ?, ?, ?)",
718        )?
719        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
720        Ok(())
721    }
722
723    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
724        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
725        Ok(())
726    }
727
728    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
729        self.prepare_cached(
730            "INSERT OR REPLACE
731             INTO display_name (room_id, name, data)
732             VALUES (?, ?, ?)",
733        )?
734        .execute((room_id, name, data))?;
735        Ok(())
736    }
737
738    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
739        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
740            .execute((room_id, name))?;
741        Ok(())
742    }
743
744    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
745        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
746        Ok(())
747    }
748
749    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
750        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
751        Ok(())
752    }
753
754    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
755        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
756            .execute((room_id,))?;
757        Ok(())
758    }
759}
760
761#[async_trait]
762trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
763    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
764        Ok(self
765            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
766            .await
767            .optional()?)
768    }
769
770    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
771        let keys_length = keys.len();
772
773        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
774            let sql_params = repeat_vars(keys.len());
775            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
776
777            let params = rusqlite::params_from_iter(keys);
778
779            Ok(txn
780                .prepare(&sql)?
781                .query(params)?
782                .mapped(|row| row.get(0))
783                .collect::<Result<_, _>>()?)
784        })
785        .await
786    }
787
    /// Store `value` under `key` (presumably in the `kv_blob` table, like the
    /// sibling methods; to be confirmed against the implementation).
    ///
    /// No default implementation is provided here.
    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
789
790    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
791        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
792        Ok(())
793    }
794
795    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
796        Ok(match room_id {
797            None => {
798                self.prepare("SELECT data FROM room_info", move |mut stmt| {
799                    stmt.query_map((), |row| row.get(0))?.collect()
800                })
801                .await?
802            }
803
804            Some(room_id) => {
805                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
806                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
807                })
808                .await?
809            }
810        })
811    }
812
813    async fn get_maybe_stripped_state_events_for_keys(
814        &self,
815        room_id: Key,
816        event_type: Key,
817        state_keys: Vec<Key>,
818    ) -> Result<Vec<(bool, Vec<u8>)>> {
819        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
820            let sql_params = repeat_vars(state_keys.len());
821            let sql = format!(
822                "SELECT stripped, data FROM state_event
823                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
824            );
825
826            let params = rusqlite::params_from_iter(
827                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
828            );
829
830            Ok(txn
831                .prepare(&sql)?
832                .query(params)?
833                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
834                .collect::<Result<_, _>>()?)
835        })
836        .await
837    }
838
839    async fn get_maybe_stripped_state_events(
840        &self,
841        room_id: Key,
842        event_type: Key,
843    ) -> Result<Vec<(bool, Vec<u8>)>> {
844        Ok(self
845            .prepare(
846                "SELECT stripped, data FROM state_event
847                 WHERE room_id = ? AND event_type = ?",
848                |mut stmt| {
849                    stmt.query((room_id, event_type))?
850                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
851                        .collect()
852                },
853            )
854            .await?)
855    }
856
857    async fn get_profiles(
858        &self,
859        room_id: Key,
860        user_ids: Vec<Key>,
861    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
862        let user_ids_length = user_ids.len();
863
864        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
865            let sql_params = repeat_vars(user_ids.len());
866            let sql = format!(
867                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
868            );
869
870            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
871
872            Ok(txn
873                .prepare(&sql)?
874                .query(params)?
875                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
876                .collect::<Result<_, _>>()?)
877        })
878        .await
879    }
880
881    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
882        let res = if memberships.is_empty() {
883            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
884                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
885            })
886            .await?
887        } else {
888            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
889                let sql_params = repeat_vars(memberships.len());
890                let sql = format!(
891                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
892                );
893
894                let params =
895                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
896
897                Ok(txn
898                    .prepare(&sql)?
899                    .query(params)?
900                    .mapped(|row| row.get(0))
901                    .collect::<Result<_, _>>()?)
902            })
903            .await?
904        };
905
906        Ok(res)
907    }
908
909    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
910        Ok(self
911            .query_row(
912                "SELECT data FROM global_account_data WHERE event_type = ?",
913                (event_type,),
914                |row| row.get(0),
915            )
916            .await
917            .optional()?)
918    }
919
920    async fn get_room_account_data(
921        &self,
922        room_id: Key,
923        event_type: Key,
924    ) -> Result<Option<Vec<u8>>> {
925        Ok(self
926            .query_row(
927                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
928                (room_id, event_type),
929                |row| row.get(0),
930            )
931            .await
932            .optional()?)
933    }
934
935    async fn get_display_names(
936        &self,
937        room_id: Key,
938        names: Vec<Key>,
939    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
940        let names_length = names.len();
941
942        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
943            let sql_params = repeat_vars(names.len());
944            let sql = format!(
945                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
946            );
947
948            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
949
950            Ok(txn
951                .prepare(&sql)?
952                .query(params)?
953                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
954                .collect::<Result<_, _>>()?)
955        })
956        .await
957    }
958
959    async fn get_user_receipt(
960        &self,
961        room_id: Key,
962        receipt_type: Key,
963        thread: Key,
964        user_id: Key,
965    ) -> Result<Option<Vec<u8>>> {
966        Ok(self
967            .query_row(
968                "SELECT data FROM receipt
969                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
970                (room_id, receipt_type, thread, user_id),
971                |row| row.get(0),
972            )
973            .await
974            .optional()?)
975    }
976
977    async fn get_event_receipts(
978        &self,
979        room_id: Key,
980        receipt_type: Key,
981        thread: Key,
982        event_id: Key,
983    ) -> Result<Vec<Vec<u8>>> {
984        Ok(self
985            .prepare(
986                "SELECT data FROM receipt
987                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
988                |mut stmt| {
989                    stmt.query((room_id, receipt_type, thread, event_id))?
990                        .mapped(|row| row.get(0))
991                        .collect()
992                },
993            )
994            .await?)
995    }
996}
997
998#[async_trait]
999impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1000    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1001        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1002    }
1003}
1004
1005#[async_trait]
1006impl StateStore for SqliteStateStore {
1007    type Error = Error;
1008
1009    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1010        self.acquire()
1011            .await?
1012            .get_kv_blob(self.encode_state_store_data_key(key))
1013            .await?
1014            .map(|data| {
1015                Ok(match key {
1016                    StateStoreDataKey::SyncToken => {
1017                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1018                    }
1019                    StateStoreDataKey::ServerInfo => {
1020                        StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1021                    }
1022                    StateStoreDataKey::Filter(_) => {
1023                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1024                    }
1025                    StateStoreDataKey::UserAvatarUrl(_) => {
1026                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1027                    }
1028                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1029                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1030                    }
1031                    StateStoreDataKey::UtdHookManagerData => {
1032                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1033                    }
1034                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1035                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1036                    }
1037                    StateStoreDataKey::ComposerDraft(_, _) => {
1038                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1039                    }
1040                    StateStoreDataKey::SeenKnockRequests(_) => {
1041                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1042                    }
1043                    StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1044                        StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1045                            self.deserialize_value(&data)?,
1046                        )
1047                    }
1048                })
1049            })
1050            .transpose()
1051    }
1052
1053    async fn set_kv_data(
1054        &self,
1055        key: StateStoreDataKey<'_>,
1056        value: StateStoreDataValue,
1057    ) -> Result<()> {
1058        let serialized_value = match key {
1059            StateStoreDataKey::SyncToken => self.serialize_value(
1060                &value.into_sync_token().expect("Session data not a sync token"),
1061            )?,
1062            StateStoreDataKey::ServerInfo => self.serialize_value(
1063                &value.into_server_info().expect("Session data not containing server info"),
1064            )?,
1065            StateStoreDataKey::Filter(_) => {
1066                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1067            }
1068            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1069                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1070            )?,
1071            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1072                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1073            )?,
1074            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1075                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1076            )?,
1077            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1078                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1079            }
1080            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1081                &value.into_composer_draft().expect("Session data not a composer draft"),
1082            )?,
1083            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1084                &value
1085                    .into_seen_knock_requests()
1086                    .expect("Session data is not a set of seen knock request ids"),
1087            )?,
1088            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1089                &value
1090                    .into_thread_subscriptions_catchup_tokens()
1091                    .expect("Session data is not a list of thread subscription catchup tokens"),
1092            )?,
1093        };
1094
1095        self.acquire()
1096            .await?
1097            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1098            .await
1099    }
1100
1101    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1102        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1103    }
1104
1105    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1106        let changes = changes.to_owned();
1107        let this = self.clone();
1108        self.acquire()
1109            .await?
1110            .with_transaction(move |txn| {
1111                let StateChanges {
1112                    sync_token,
1113                    account_data,
1114                    presence,
1115                    profiles,
1116                    profiles_to_delete,
1117                    state,
1118                    room_account_data,
1119                    room_infos,
1120                    receipts,
1121                    redactions,
1122                    stripped_state,
1123                    ambiguity_maps,
1124                } = changes;
1125
1126                if let Some(sync_token) = sync_token {
1127                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1128                    let value = this.serialize_value(&sync_token)?;
1129                    txn.set_kv_blob(&key, &value)?;
1130                }
1131
1132                for (event_type, event) in account_data {
1133                    let event_type =
1134                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1135                    let data = this.serialize_json(&event)?;
1136                    txn.set_global_account_data(&event_type, &data)?;
1137                }
1138
1139                for (room_id, events) in room_account_data {
1140                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1141                    for (event_type, event) in events {
1142                        let event_type =
1143                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1144                        let data = this.serialize_json(&event)?;
1145                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1146                    }
1147                }
1148
1149                for (user_id, event) in presence {
1150                    let key = this.encode_presence_key(&user_id);
1151                    let value = this.serialize_json(&event)?;
1152                    txn.set_kv_blob(&key, &value)?;
1153                }
1154
1155                for (room_id, room_info) in room_infos {
1156                    let stripped = room_info.state() == RoomState::Invited;
1157                    // Remove non-stripped data for stripped rooms and vice-versa.
1158                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1159
1160                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1161                    let state = this
1162                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1163                    let data = this.serialize_json(&room_info)?;
1164                    txn.set_room_info(&room_id, &state, &data)?;
1165                }
1166
1167                for (room_id, user_ids) in profiles_to_delete {
1168                    let room_id = this.encode_key(keys::PROFILE, room_id);
1169                    for user_id in user_ids {
1170                        let user_id = this.encode_key(keys::PROFILE, user_id);
1171                        txn.remove_room_profile(&room_id, &user_id)?;
1172                    }
1173                }
1174
1175                for (room_id, state_event_types) in state {
1176                    let profiles = profiles.get(&room_id);
1177                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1178
1179                    for (event_type, state_events) in state_event_types {
1180                        let encoded_event_type =
1181                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1182
1183                        for (state_key, raw_state_event) in state_events {
1184                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1185                            let data = this.serialize_json(&raw_state_event)?;
1186
1187                            let event_id: Option<String> =
1188                                raw_state_event.get_field("event_id").ok().flatten();
1189                            let encoded_event_id =
1190                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1191
1192                            txn.set_state_event(
1193                                &encoded_room_id,
1194                                &encoded_event_type,
1195                                &encoded_state_key,
1196                                false,
1197                                encoded_event_id.as_deref(),
1198                                &data,
1199                            )?;
1200
1201                            if event_type == StateEventType::RoomMember {
1202                                let member_event = match raw_state_event
1203                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1204                                {
1205                                    Ok(ev) => ev,
1206                                    Err(e) => {
1207                                        debug!(event_id, "Failed to deserialize member event: {e}");
1208                                        continue;
1209                                    }
1210                                };
1211
1212                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1213                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1214                                let membership = this
1215                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1216                                let data = this.serialize_value(&state_key)?;
1217
1218                                txn.set_member(
1219                                    &encoded_room_id,
1220                                    &user_id,
1221                                    &membership,
1222                                    false,
1223                                    &data,
1224                                )?;
1225
1226                                if let Some(profile) =
1227                                    profiles.and_then(|p| p.get(member_event.state_key()))
1228                                {
1229                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1230                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1231                                    let data = this.serialize_json(&profile)?;
1232                                    txn.set_profile(&room_id, &user_id, &data)?;
1233                                }
1234                            }
1235                        }
1236                    }
1237                }
1238
1239                for (room_id, stripped_state_event_types) in stripped_state {
1240                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1241
1242                    for (event_type, stripped_state_events) in stripped_state_event_types {
1243                        let encoded_event_type =
1244                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1245
1246                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1247                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1248                            let data = this.serialize_json(&raw_stripped_state_event)?;
1249                            txn.set_state_event(
1250                                &encoded_room_id,
1251                                &encoded_event_type,
1252                                &encoded_state_key,
1253                                true,
1254                                None,
1255                                &data,
1256                            )?;
1257
1258                            if event_type == StateEventType::RoomMember {
1259                                let member_event = match raw_stripped_state_event
1260                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1261                                ) {
1262                                    Ok(ev) => ev,
1263                                    Err(e) => {
1264                                        debug!("Failed to deserialize stripped member event: {e}");
1265                                        continue;
1266                                    }
1267                                };
1268
1269                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1270                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1271                                let membership = this.encode_key(
1272                                    keys::MEMBER,
1273                                    member_event.content.membership.as_str(),
1274                                );
1275                                let data = this.serialize_value(&state_key)?;
1276
1277                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1278                            }
1279                        }
1280                    }
1281                }
1282
1283                for (room_id, receipt_event) in receipts {
1284                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1285
1286                    for (event_id, receipt_types) in receipt_event {
1287                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1288
1289                        for (receipt_type, receipt_users) in receipt_types {
1290                            let receipt_type =
1291                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1292
1293                            for (user_id, receipt) in receipt_users {
1294                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1295                                // We cannot have a NULL primary key so we rely on serialization
1296                                // instead of the string representation.
1297                                let thread = this.encode_key(
1298                                    keys::RECEIPT,
1299                                    rmp_serde::to_vec_named(&receipt.thread)?,
1300                                );
1301                                let data = this.serialize_json(&ReceiptData {
1302                                    receipt,
1303                                    event_id: event_id.clone(),
1304                                    user_id,
1305                                })?;
1306
1307                                txn.set_receipt(
1308                                    &room_id,
1309                                    &encoded_user_id,
1310                                    &receipt_type,
1311                                    &thread,
1312                                    &encoded_event_id,
1313                                    &data,
1314                                )?;
1315                            }
1316                        }
1317                    }
1318                }
1319
1320                for (room_id, redactions) in redactions {
1321                    let make_redaction_rules = || {
1322                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1323                        txn.get_room_info(&encoded_room_id)
1324                            .ok()
1325                            .flatten()
1326                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1327                            .map(|info| info.room_version_rules_or_default())
1328                            .unwrap_or_else(|| {
1329                                warn!(
1330                                    ?room_id,
1331                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1332                                );
1333                                ROOM_VERSION_RULES_FALLBACK
1334                            }).redaction
1335                    };
1336
1337                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1338                    let mut redaction_rules = None;
1339
1340                    for (event_id, redaction) in redactions {
1341                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1342
1343                        if let Some(Ok(raw_event)) = txn
1344                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1345                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1346                        {
1347                            let event = raw_event.deserialize()?;
1348                            let redacted = redact(
1349                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1350                                redaction_rules.get_or_insert_with(make_redaction_rules),
1351                                Some(RedactedBecause::from_raw_event(&redaction)?),
1352                            )
1353                            .map_err(Error::Redaction)?;
1354                            let data = this.serialize_json(&redacted)?;
1355
1356                            let event_type =
1357                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1358                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1359
1360                            txn.set_state_event(
1361                                &encoded_room_id,
1362                                &event_type,
1363                                &state_key,
1364                                false,
1365                                Some(&event_id),
1366                                &data,
1367                            )?;
1368                        }
1369                    }
1370                }
1371
1372                for (room_id, display_names) in ambiguity_maps {
1373                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1374
1375                    for (name, user_ids) in display_names {
1376                        let encoded_name = this.encode_key(
1377                            keys::DISPLAY_NAME,
1378                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1379                        );
1380                        let data = this.serialize_json(&user_ids)?;
1381
1382                        if user_ids.is_empty() {
1383                            txn.remove_display_name(&room_id, &encoded_name)?;
1384
1385                            // We can't do a migration to merge the previously distinct buckets of
1386                            // user IDs since the display names themselves are hashed before they
1387                            // are persisted in the store. So the store will always retain two
1388                            // buckets: one for raw display names and one for normalised ones.
1389                            //
1390                            // We therefore do the next best thing, which is a sort of a soft
1391                            // migration: we fetch both the raw and normalised buckets, then merge
1392                            // the user IDs contained in them into a separate, temporary merged
1393                            // bucket. The SDK then operates on the merged buckets exclusively. See
1394                            // the comment in `get_users_with_display_names` for details.
1395                            //
1396                            // If the merged bucket is empty, that must mean that both the raw and
1397                            // normalised buckets were also empty, so we can remove both from the
1398                            // store.
1399                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1400                            txn.remove_display_name(&room_id, &raw_name)?;
1401                        } else {
1402                            // We only create new buckets with the normalized display name.
1403                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1404                        }
1405                    }
1406                }
1407
1408                Ok::<_, Error>(())
1409            })
1410            .await?;
1411
1412        Ok(())
1413    }
1414
1415    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1416        self.acquire()
1417            .await?
1418            .get_kv_blob(self.encode_presence_key(user_id))
1419            .await?
1420            .map(|data| self.deserialize_json(&data))
1421            .transpose()
1422    }
1423
1424    async fn get_presence_events(
1425        &self,
1426        user_ids: &[OwnedUserId],
1427    ) -> Result<Vec<Raw<PresenceEvent>>> {
1428        if user_ids.is_empty() {
1429            return Ok(Vec::new());
1430        }
1431
1432        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1433        self.acquire()
1434            .await?
1435            .get_kv_blobs(user_ids)
1436            .await?
1437            .into_iter()
1438            .map(|data| self.deserialize_json(&data))
1439            .collect()
1440    }
1441
1442    async fn get_state_event(
1443        &self,
1444        room_id: &RoomId,
1445        event_type: StateEventType,
1446        state_key: &str,
1447    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1448        Ok(self
1449            .get_state_events_for_keys(room_id, event_type, &[state_key])
1450            .await?
1451            .into_iter()
1452            .next())
1453    }
1454
1455    async fn get_state_events(
1456        &self,
1457        room_id: &RoomId,
1458        event_type: StateEventType,
1459    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1460        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1461        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1462        self.acquire()
1463            .await?
1464            .get_maybe_stripped_state_events(room_id, event_type)
1465            .await?
1466            .into_iter()
1467            .map(|(stripped, data)| {
1468                let ev = if stripped {
1469                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1470                } else {
1471                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1472                };
1473
1474                Ok(ev)
1475            })
1476            .collect()
1477    }
1478
1479    async fn get_state_events_for_keys(
1480        &self,
1481        room_id: &RoomId,
1482        event_type: StateEventType,
1483        state_keys: &[&str],
1484    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1485        if state_keys.is_empty() {
1486            return Ok(Vec::new());
1487        }
1488
1489        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1490        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1491        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1492        self.acquire()
1493            .await?
1494            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1495            .await?
1496            .into_iter()
1497            .map(|(stripped, data)| {
1498                let ev = if stripped {
1499                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1500                } else {
1501                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1502                };
1503
1504                Ok(ev)
1505            })
1506            .collect()
1507    }
1508
1509    async fn get_profile(
1510        &self,
1511        room_id: &RoomId,
1512        user_id: &UserId,
1513    ) -> Result<Option<MinimalRoomMemberEvent>> {
1514        let room_id = self.encode_key(keys::PROFILE, room_id);
1515        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1516
1517        self.acquire()
1518            .await?
1519            .get_profiles(room_id, user_ids)
1520            .await?
1521            .into_iter()
1522            .next()
1523            .map(|(_, data)| self.deserialize_json(&data))
1524            .transpose()
1525    }
1526
1527    async fn get_profiles<'a>(
1528        &self,
1529        room_id: &RoomId,
1530        user_ids: &'a [OwnedUserId],
1531    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1532        if user_ids.is_empty() {
1533            return Ok(BTreeMap::new());
1534        }
1535
1536        let room_id = self.encode_key(keys::PROFILE, room_id);
1537        let mut user_ids_map = user_ids
1538            .iter()
1539            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1540            .collect::<BTreeMap<_, _>>();
1541        let user_ids = user_ids_map.keys().cloned().collect();
1542
1543        self.acquire()
1544            .await?
1545            .get_profiles(room_id, user_ids)
1546            .await?
1547            .into_iter()
1548            .map(|(user_id, data)| {
1549                Ok((
1550                    user_ids_map
1551                        .remove(user_id.as_slice())
1552                        .expect("returned user IDs were requested"),
1553                    self.deserialize_json(&data)?,
1554                ))
1555            })
1556            .collect()
1557    }
1558
1559    async fn get_user_ids(
1560        &self,
1561        room_id: &RoomId,
1562        membership: RoomMemberships,
1563    ) -> Result<Vec<OwnedUserId>> {
1564        let room_id = self.encode_key(keys::MEMBER, room_id);
1565        let memberships = membership
1566            .as_vec()
1567            .into_iter()
1568            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1569            .collect();
1570        self.acquire()
1571            .await?
1572            .get_user_ids(room_id, memberships)
1573            .await?
1574            .iter()
1575            .map(|data| self.deserialize_value(data))
1576            .collect()
1577    }
1578
1579    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1580        self.acquire()
1581            .await?
1582            .get_room_infos(match room_load_settings {
1583                RoomLoadSettings::All => None,
1584                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1585            })
1586            .await?
1587            .into_iter()
1588            .map(|data| self.deserialize_json(&data))
1589            .collect()
1590    }
1591
1592    async fn get_users_with_display_name(
1593        &self,
1594        room_id: &RoomId,
1595        display_name: &DisplayName,
1596    ) -> Result<BTreeSet<OwnedUserId>> {
1597        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1598        let names = vec![self.encode_key(
1599            keys::DISPLAY_NAME,
1600            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1601        )];
1602
1603        Ok(self
1604            .acquire()
1605            .await?
1606            .get_display_names(room_id, names)
1607            .await?
1608            .into_iter()
1609            .next()
1610            .map(|(_, data)| self.deserialize_json(&data))
1611            .transpose()?
1612            .unwrap_or_default())
1613    }
1614
1615    async fn get_users_with_display_names<'a>(
1616        &self,
1617        room_id: &RoomId,
1618        display_names: &'a [DisplayName],
1619    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1620        let mut result = HashMap::new();
1621
1622        if display_names.is_empty() {
1623            return Ok(result);
1624        }
1625
1626        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1627        let mut names_map = display_names
1628            .iter()
1629            .flat_map(|display_name| {
1630                // We encode the display name as the `raw_str()` and the normalized string.
1631                //
1632                // This is for compatibility reasons since:
1633                //  1. Previously "Alice" and "alice" were considered to be distinct display
1634                //     names, while we now consider them to be the same so we need to merge the
1635                //     previously distinct buckets of user IDs.
1636                //  2. We can't do a migration to merge the previously distinct buckets of user
1637                //     IDs since the display names itself are hashed before they are persisted
1638                //     in the store.
1639                let raw =
1640                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1641                let normalized = display_name.as_normalized_str().map(|normalized| {
1642                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1643                });
1644
1645                iter::once(raw).chain(normalized)
1646            })
1647            .collect::<BTreeMap<_, _>>();
1648        let names = names_map.keys().cloned().collect();
1649
1650        for (name, data) in
1651            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1652        {
1653            let display_name =
1654                names_map.remove(name.as_slice()).expect("returned display names were requested");
1655            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1656
1657            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1658        }
1659
1660        Ok(result)
1661    }
1662
1663    async fn get_account_data_event(
1664        &self,
1665        event_type: GlobalAccountDataEventType,
1666    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1667        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1668        self.acquire()
1669            .await?
1670            .get_global_account_data(event_type)
1671            .await?
1672            .map(|value| self.deserialize_json(&value))
1673            .transpose()
1674    }
1675
1676    async fn get_room_account_data_event(
1677        &self,
1678        room_id: &RoomId,
1679        event_type: RoomAccountDataEventType,
1680    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1681        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1682        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1683        self.acquire()
1684            .await?
1685            .get_room_account_data(room_id, event_type)
1686            .await?
1687            .map(|value| self.deserialize_json(&value))
1688            .transpose()
1689    }
1690
1691    async fn get_user_room_receipt_event(
1692        &self,
1693        room_id: &RoomId,
1694        receipt_type: ReceiptType,
1695        thread: ReceiptThread,
1696        user_id: &UserId,
1697    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1698        let room_id = self.encode_key(keys::RECEIPT, room_id);
1699        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1700        // We cannot have a NULL primary key so we rely on serialization instead of the
1701        // string representation.
1702        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1703        let user_id = self.encode_key(keys::RECEIPT, user_id);
1704
1705        self.acquire()
1706            .await?
1707            .get_user_receipt(room_id, receipt_type, thread, user_id)
1708            .await?
1709            .map(|value| {
1710                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1711            })
1712            .transpose()
1713    }
1714
1715    async fn get_event_room_receipt_events(
1716        &self,
1717        room_id: &RoomId,
1718        receipt_type: ReceiptType,
1719        thread: ReceiptThread,
1720        event_id: &EventId,
1721    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1722        let room_id = self.encode_key(keys::RECEIPT, room_id);
1723        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1724        // We cannot have a NULL primary key so we rely on serialization instead of the
1725        // string representation.
1726        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1727        let event_id = self.encode_key(keys::RECEIPT, event_id);
1728
1729        self.acquire()
1730            .await?
1731            .get_event_receipts(room_id, receipt_type, thread, event_id)
1732            .await?
1733            .iter()
1734            .map(|value| {
1735                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1736            })
1737            .collect()
1738    }
1739
1740    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1741        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1742    }
1743
1744    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1745        let conn = self.acquire().await?;
1746        let key = self.encode_custom_key(key);
1747        conn.set_kv_blob(key, value).await?;
1748        Ok(())
1749    }
1750
1751    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1752        let conn = self.acquire().await?;
1753        let key = self.encode_custom_key(key);
1754        let previous = conn.get_kv_blob(key.clone()).await?;
1755        conn.set_kv_blob(key, value).await?;
1756        Ok(previous)
1757    }
1758
1759    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1760        let conn = self.acquire().await?;
1761        let key = self.encode_custom_key(key);
1762        let previous = conn.get_kv_blob(key.clone()).await?;
1763        if previous.is_some() {
1764            conn.delete_kv_blob(key).await?;
1765        }
1766        Ok(previous)
1767    }
1768
1769    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1770        let this = self.clone();
1771        let room_id = room_id.to_owned();
1772
1773        let conn = self.acquire().await?;
1774
1775        conn.with_transaction(move |txn| -> Result<()> {
1776            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1777            txn.remove_room_info(&room_info_room_id)?;
1778
1779            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1780            txn.remove_room_state_events(&state_event_room_id, None)?;
1781
1782            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1783            txn.remove_room_members(&member_room_id, None)?;
1784
1785            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1786            txn.remove_room_profiles(&profile_room_id)?;
1787
1788            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1789            txn.remove_room_account_data(&room_account_data_room_id)?;
1790
1791            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1792            txn.remove_room_receipts(&receipt_room_id)?;
1793
1794            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1795            txn.remove_room_display_names(&display_name_room_id)?;
1796
1797            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1798            txn.remove_room_send_queue(&send_queue_room_id)?;
1799
1800            let dependent_send_queue_room_id =
1801                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1802            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1803
1804            let thread_subscriptions_room_id =
1805                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1806            txn.execute(
1807                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1808                (thread_subscriptions_room_id,),
1809            )?;
1810
1811            Ok(())
1812        })
1813        .await?;
1814
1815        conn.vacuum().await
1816    }
1817
1818    async fn save_send_queue_request(
1819        &self,
1820        room_id: &RoomId,
1821        transaction_id: OwnedTransactionId,
1822        created_at: MilliSecondsSinceUnixEpoch,
1823        content: QueuedRequestKind,
1824        priority: usize,
1825    ) -> Result<(), Self::Error> {
1826        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1827        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1828
1829        let content = self.serialize_json(&content)?;
1830        // The transaction id is used both as a key (in remove/update) and a value (as
1831        // it's useful for the callers), so we keep it as is, and neither hash
1832        // it (with encode_key) or encrypt it (through serialize_value). After
1833        // all, it carries no personal information, so this is considered fine.
1834
1835        let created_at_ts: u64 = created_at.0.into();
1836        self.acquire()
1837            .await?
1838            .with_transaction(move |txn| {
1839                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1840                Ok(())
1841            })
1842            .await
1843    }
1844
1845    async fn update_send_queue_request(
1846        &self,
1847        room_id: &RoomId,
1848        transaction_id: &TransactionId,
1849        content: QueuedRequestKind,
1850    ) -> Result<bool, Self::Error> {
1851        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1852
1853        let content = self.serialize_json(&content)?;
1854        // See comment in [`Self::save_send_queue_event`] to understand why the
1855        // transaction id is neither encrypted or hashed.
1856        let transaction_id = transaction_id.to_string();
1857
1858        let num_updated = self.acquire()
1859            .await?
1860            .with_transaction(move |txn| {
1861                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1862            })
1863            .await?;
1864
1865        Ok(num_updated > 0)
1866    }
1867
1868    async fn remove_send_queue_request(
1869        &self,
1870        room_id: &RoomId,
1871        transaction_id: &TransactionId,
1872    ) -> Result<bool, Self::Error> {
1873        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1874
1875        // See comment in `save_send_queue_event`.
1876        let transaction_id = transaction_id.to_string();
1877
1878        let num_deleted = self
1879            .acquire()
1880            .await?
1881            .with_transaction(move |txn| {
1882                txn.prepare_cached(
1883                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1884                )?
1885                .execute((room_id, &transaction_id))
1886            })
1887            .await?;
1888
1889        Ok(num_deleted > 0)
1890    }
1891
1892    async fn load_send_queue_requests(
1893        &self,
1894        room_id: &RoomId,
1895    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1896        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1897
1898        // Note: ROWID is always present and is an auto-incremented integer counter. We
1899        // want to maintain the insertion order, so we can sort using it.
1900        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1901        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1902            .acquire()
1903            .await?
1904            .prepare(
1905                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1906                |mut stmt| {
1907                    stmt.query((room_id,))?
1908                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1909                        .collect()
1910                },
1911            )
1912            .await?;
1913
1914        let mut requests = Vec::with_capacity(res.len());
1915        for entry in res {
1916            let created_at = entry
1917                .4
1918                .and_then(UInt::new)
1919                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1920            requests.push(QueuedRequest {
1921                transaction_id: entry.0.into(),
1922                kind: self.deserialize_json(&entry.1)?,
1923                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1924                priority: entry.3,
1925                created_at,
1926            });
1927        }
1928
1929        Ok(requests)
1930    }
1931
1932    async fn update_send_queue_request_status(
1933        &self,
1934        room_id: &RoomId,
1935        transaction_id: &TransactionId,
1936        error: Option<QueueWedgeError>,
1937    ) -> Result<(), Self::Error> {
1938        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1939
1940        // See comment in `save_send_queue_event`.
1941        let transaction_id = transaction_id.to_string();
1942
1943        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1944        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1945
1946        self.acquire()
1947            .await?
1948            .with_transaction(move |txn| {
1949                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1950                Ok(())
1951            })
1952            .await
1953    }
1954
1955    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1956        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1957        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1958        // != encrypted(X), since we use a nonce in the encryption process.
1959
1960        let res: Vec<Vec<u8>> = self
1961            .acquire()
1962            .await?
1963            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1964                stmt.query(())?.mapped(|row| row.get(0)).collect()
1965            })
1966            .await?;
1967
1968        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1969        // then rejigger that into a vector.
1970        Ok(res
1971            .into_iter()
1972            .map(|entry| self.deserialize_value(&entry))
1973            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1974            .into_iter()
1975            .collect())
1976    }
1977
1978    async fn save_dependent_queued_request(
1979        &self,
1980        room_id: &RoomId,
1981        parent_txn_id: &TransactionId,
1982        own_txn_id: ChildTransactionId,
1983        created_at: MilliSecondsSinceUnixEpoch,
1984        content: DependentQueuedRequestKind,
1985    ) -> Result<()> {
1986        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1987        let content = self.serialize_json(&content)?;
1988
1989        // See comment in `save_send_queue_event`.
1990        let parent_txn_id = parent_txn_id.to_string();
1991        let own_txn_id = own_txn_id.to_string();
1992
1993        let created_at_ts: u64 = created_at.0.into();
1994        self.acquire()
1995            .await?
1996            .with_transaction(move |txn| {
1997                txn.prepare_cached(
1998                    r#"INSERT INTO dependent_send_queue_events
1999                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2000                       VALUES (?, ?, ?, ?, ?)"#,
2001                )?
2002                .execute((
2003                    room_id,
2004                    parent_txn_id,
2005                    own_txn_id,
2006                    content,
2007                    created_at_ts,
2008                ))?;
2009                Ok(())
2010            })
2011            .await
2012    }
2013
2014    async fn update_dependent_queued_request(
2015        &self,
2016        room_id: &RoomId,
2017        own_transaction_id: &ChildTransactionId,
2018        new_content: DependentQueuedRequestKind,
2019    ) -> Result<bool> {
2020        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2021        let content = self.serialize_json(&new_content)?;
2022
2023        // See comment in `save_send_queue_event`.
2024        let own_txn_id = own_transaction_id.to_string();
2025
2026        let num_updated = self
2027            .acquire()
2028            .await?
2029            .with_transaction(move |txn| {
2030                txn.prepare_cached(
2031                    r#"UPDATE dependent_send_queue_events
2032                       SET content = ?
2033                       WHERE own_transaction_id = ?
2034                       AND room_id = ?"#,
2035                )?
2036                .execute((content, own_txn_id, room_id))
2037            })
2038            .await?;
2039
2040        if num_updated > 1 {
2041            return Err(Error::InconsistentUpdate);
2042        }
2043
2044        Ok(num_updated == 1)
2045    }
2046
2047    async fn mark_dependent_queued_requests_as_ready(
2048        &self,
2049        room_id: &RoomId,
2050        parent_txn_id: &TransactionId,
2051        parent_key: SentRequestKey,
2052    ) -> Result<usize> {
2053        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2054        let parent_key = self.serialize_value(&parent_key)?;
2055
2056        // See comment in `save_send_queue_event`.
2057        let parent_txn_id = parent_txn_id.to_string();
2058
2059        self.acquire()
2060            .await?
2061            .with_transaction(move |txn| {
2062                Ok(txn.prepare_cached(
2063                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2064                )?
2065                .execute((parent_key, parent_txn_id, room_id))?)
2066            })
2067            .await
2068    }
2069
2070    async fn remove_dependent_queued_request(
2071        &self,
2072        room_id: &RoomId,
2073        txn_id: &ChildTransactionId,
2074    ) -> Result<bool> {
2075        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2076
2077        // See comment in `save_send_queue_event`.
2078        let txn_id = txn_id.to_string();
2079
2080        let num_deleted = self
2081            .acquire()
2082            .await?
2083            .with_transaction(move |txn| {
2084                txn.prepare_cached(
2085                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2086                )?
2087                .execute((txn_id, room_id))
2088            })
2089            .await?;
2090
2091        Ok(num_deleted > 0)
2092    }
2093
2094    async fn load_dependent_queued_requests(
2095        &self,
2096        room_id: &RoomId,
2097    ) -> Result<Vec<DependentQueuedRequest>> {
2098        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2099
2100        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2101        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2102            .acquire()
2103            .await?
2104            .prepare(
2105                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2106                |mut stmt| {
2107                    stmt.query((room_id,))?
2108                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2109                        .collect()
2110                },
2111            )
2112            .await?;
2113
2114        let mut dependent_events = Vec::with_capacity(res.len());
2115        for entry in res {
2116            let created_at = entry
2117                .4
2118                .and_then(UInt::new)
2119                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2120            dependent_events.push(DependentQueuedRequest {
2121                own_transaction_id: entry.0.into(),
2122                parent_transaction_id: entry.1.into(),
2123                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2124                kind: self.deserialize_json(&entry.3)?,
2125                created_at,
2126            });
2127        }
2128
2129        Ok(dependent_events)
2130    }
2131
2132    async fn upsert_thread_subscription(
2133        &self,
2134        room_id: &RoomId,
2135        thread_id: &EventId,
2136        mut new: StoredThreadSubscription,
2137    ) -> Result<(), Self::Error> {
2138        if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2139            if previous == new {
2140                // No need to update anything.
2141                trace!("not saving thread subscription because the subscription is the same");
2142                return Ok(());
2143            }
2144            if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2145                trace!("not saving thread subscription because we have a newer bump stamp");
2146                return Ok(());
2147            }
2148        }
2149
2150        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2151        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2152        let status = new.status.as_str();
2153
2154        self.acquire()
2155            .await?
2156            .with_transaction(move |txn| {
2157                // Try to find a previous value.
2158                txn.prepare_cached(
2159                    "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2160                         VALUES (?, ?, ?, ?)",
2161                )?
2162                .execute((room_id, thread_id, status, new.bump_stamp))
2163            })
2164            .await?;
2165        Ok(())
2166    }
2167
2168    async fn load_thread_subscription(
2169        &self,
2170        room_id: &RoomId,
2171        thread_id: &EventId,
2172    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2173        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2174        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2175
2176        Ok(self
2177            .acquire()
2178            .await?
2179            .query_row(
2180                "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2181                (room_id, thread_id),
2182                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2183            )
2184            .await
2185            .optional()?
2186            .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2187                let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2188                    Error::InvalidData { details: format!("Invalid thread status: {status}") }
2189                })?;
2190                Ok(StoredThreadSubscription { status, bump_stamp })
2191            })
2192            .transpose()?)
2193    }
2194
2195    async fn remove_thread_subscription(
2196        &self,
2197        room_id: &RoomId,
2198        thread_id: &EventId,
2199    ) -> Result<(), Self::Error> {
2200        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2201        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2202
2203        self.acquire()
2204            .await?
2205            .execute(
2206                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2207                (room_id, thread_id),
2208            )
2209            .await?;
2210
2211        Ok(())
2212    }
2213}
2214
/// The payload stored for a receipt.
///
/// The receipt read paths deserialize this type and return either
/// `(event_id, receipt)` (lookup by user) or `(user_id, receipt)` (lookup by
/// event).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReceiptData {
    /// The receipt itself.
    receipt: Receipt,
    /// The event the receipt points at.
    event_id: OwnedEventId,
    /// The user the receipt belongs to.
    user_id: OwnedUserId,
}
2221
/// Runs the shared state store integration tests against a store opened
/// without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give each store its own sub-directory.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store in its own sub-directory of [`TMP_DIR`].
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(index.to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        Ok(SqliteStateStore::open(store_path, None).await.unwrap())
    }

    statestore_integration_tests!();
}
2246
/// Runs the shared state store integration tests against a store opened with
/// a passphrase, plus a few checks on how `SqliteStoreConfig` settings are
/// applied.
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;
    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};

    /// Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give each store its own sub-directory.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a not-yet-used sub-directory path inside [`TMP_DIR`].
    fn new_state_store_workspace() -> PathBuf {
        let index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(index.to_string())
    }

    /// Opens a fresh passphrase-protected store in its own workspace.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_dir = new_state_store_workspace();
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        let store = SqliteStateStore::open(store_path, Some("default_test_password")).await;
        Ok(store.unwrap())
    }

    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();
        let conn = store.pool.get().await.unwrap();

        let cache_size: i32 =
            conn.query_row("PRAGMA cache_size", (), |row| row.get(0)).await.unwrap();

        // `SqliteStoreConfig` takes the size in bytes and it ends up in
        // kibibytes. The value is negative because that is how SQLite
        // expresses a size in kibibytes rather than in pages.
        assert_eq!(cache_size, -(1500 / 1024));
    }

    #[async_test]
    async fn test_journal_size_limit() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();
        let conn = store.pool.get().await.unwrap();

        let journal_size_limit: u32 =
            conn.query_row("PRAGMA journal_size_limit", (), |row| row.get(0)).await.unwrap();

        // `SqliteStoreConfig` takes the limit in bytes, and SQLite keeps it
        // in bytes.
        assert_eq!(journal_size_limit, 1500);
    }

    statestore_integration_tests!();
}
2327
2328#[cfg(test)]
2329mod migration_tests {
2330    use std::{
2331        path::{Path, PathBuf},
2332        sync::{
2333            atomic::{AtomicU32, Ordering::SeqCst},
2334            Arc,
2335        },
2336    };
2337
2338    use as_variant::as_variant;
2339    use deadpool_sqlite::Runtime;
2340    use matrix_sdk_base::{
2341        media::{MediaFormat, MediaRequestParameters},
2342        store::{
2343            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2344            SerializableEventContent,
2345        },
2346        sync::UnreadNotificationsCount,
2347        RoomState, StateStore,
2348    };
2349    use matrix_sdk_test::async_test;
2350    use once_cell::sync::Lazy;
2351    use ruma::{
2352        events::{
2353            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2354            StateEventType,
2355        },
2356        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2357        RoomId, TransactionId, UserId,
2358    };
2359    use rusqlite::Transaction;
2360    use serde::{Deserialize, Serialize};
2361    use serde_json::json;
2362    use tempfile::{tempdir, TempDir};
2363    use tokio::fs;
2364
2365    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2366    use crate::{
2367        error::{Error, Result},
2368        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2369        OpenStoreError,
2370    };
2371
    /// Temporary directory shared by the migration tests; it is created lazily
    /// on first access, and a failure to create it panics.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter incremented by `new_path` to derive a distinct sub-directory
    /// name on every call.
    static NUM: AtomicU32 = AtomicU32::new(0);
    /// Passphrase used both when creating the fake databases and when
    /// re-opening them through `SqliteStateStore::open`.
    const SECRET: &str = "secret";
2375
2376    fn new_path() -> PathBuf {
2377        let name = NUM.fetch_add(1, SeqCst).to_string();
2378        TMP_DIR.path().join(name)
2379    }
2380
2381    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2382        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2383
2384        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2385        // use default pool config
2386
2387        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2388        let conn = pool.get().await?;
2389
2390        init(&conn).await?;
2391
2392        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2393        let this = SqliteStateStore { store_cipher, pool };
2394        this.run_migrations(&conn, 1, Some(version)).await?;
2395
2396        Ok(this)
2397    }
2398
2399    fn room_info_v1_json(
2400        room_id: &RoomId,
2401        state: RoomState,
2402        name: Option<&str>,
2403        creator: Option<&UserId>,
2404    ) -> serde_json::Value {
2405        // Test with name set or not.
2406        let name_content = match name {
2407            Some(name) => json!({ "name": name }),
2408            None => json!({ "name": null }),
2409        };
2410        // Test with creator set or not.
2411        let create_content = match creator {
2412            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2413            None => RoomCreateEventContent::new_v11(),
2414        };
2415
2416        json!({
2417            "room_id": room_id,
2418            "room_type": state,
2419            "notification_counts": UnreadNotificationsCount::default(),
2420            "summary": {
2421                "heroes": [],
2422                "joined_member_count": 0,
2423                "invited_member_count": 0,
2424            },
2425            "members_synced": false,
2426            "base_info": {
2427                "dm_targets": [],
2428                "max_power_level": 100,
2429                "name": {
2430                    "Original": {
2431                        "content": name_content,
2432                    },
2433                },
2434                "create": {
2435                    "Original": {
2436                        "content": create_content,
2437                    }
2438                }
2439            },
2440        })
2441    }
2442
2443    #[async_test]
2444    pub async fn test_migrating_v1_to_v2() {
2445        let path = new_path();
2446        // Create and populate db.
2447        {
2448            let db = create_fake_db(&path, 1).await.unwrap();
2449            let conn = db.pool.get().await.unwrap();
2450
2451            let this = db.clone();
2452            conn.with_transaction(move |txn| {
2453                for i in 0..5 {
2454                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2455                    let (state, stripped) =
2456                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2457                    let info = room_info_v1_json(&room_id, state, None, None);
2458
2459                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2460                    let data = this.serialize_json(&info)?;
2461
2462                    txn.prepare_cached(
2463                        "INSERT INTO room_info (room_id, stripped, data)
2464                         VALUES (?, ?, ?)",
2465                    )?
2466                    .execute((room_id, stripped, data))?;
2467                }
2468
2469                Result::<_, Error>::Ok(())
2470            })
2471            .await
2472            .unwrap();
2473        }
2474
2475        // This transparently migrates to the latest version.
2476        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2477
2478        // Check all room infos are there.
2479        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2480    }
2481
2482    // Add a room in version 2 format of the state store.
2483    fn add_room_v2(
2484        this: &SqliteStateStore,
2485        txn: &Transaction<'_>,
2486        room_id: &RoomId,
2487        name: Option<&str>,
2488        create_creator: Option<&UserId>,
2489        create_sender: Option<&UserId>,
2490    ) -> Result<(), Error> {
2491        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2492
2493        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2494        let encoded_state =
2495            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2496        let data = this.serialize_json(&room_info_json)?;
2497
2498        txn.prepare_cached(
2499            "INSERT INTO room_info (room_id, state, data)
2500             VALUES (?, ?, ?)",
2501        )?
2502        .execute((encoded_room_id, encoded_state, data))?;
2503
2504        // Test with or without `m.room.create` event in the room state.
2505        let Some(create_sender) = create_sender else {
2506            return Ok(());
2507        };
2508
2509        let create_content = match create_creator {
2510            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2511            None => RoomCreateEventContent::new_v11(),
2512        };
2513
2514        let event_id = EventId::new(server_name!("dummy.local"));
2515        let create_event = json!({
2516            "content": create_content,
2517            "event_id": event_id,
2518            "sender": create_sender.to_owned(),
2519            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2520            "state_key": "",
2521            "type": "m.room.create",
2522            "unsigned": {},
2523        });
2524
2525        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2526        let encoded_event_type =
2527            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2528        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2529        let stripped = false;
2530        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2531        let data = this.serialize_json(&create_event)?;
2532
2533        txn.prepare_cached(
2534            "INSERT
2535             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2536             VALUES (?, ?, ?, ?, ?, ?)",
2537        )?
2538        .execute((
2539            encoded_room_id,
2540            encoded_event_type,
2541            encoded_state_key,
2542            stripped,
2543            encoded_event_id,
2544            data,
2545        ))?;
2546
2547        Ok(())
2548    }
2549
2550    #[async_test]
2551    pub async fn test_migrating_v2_to_v3() {
2552        let path = new_path();
2553
2554        // Room A: with name, creator and sender.
2555        let room_a_id = room_id!("!room_a:dummy.local");
2556        let room_a_name = "Room A";
2557        let room_a_creator = user_id!("@creator:dummy.local");
2558        // Use a different sender to check that sender is used over creator in
2559        // migration.
2560        let room_a_create_sender = user_id!("@sender:dummy.local");
2561
2562        // Room B: without name, creator and sender.
2563        let room_b_id = room_id!("!room_b:dummy.local");
2564
2565        // Room C: only with sender.
2566        let room_c_id = room_id!("!room_c:dummy.local");
2567        let room_c_create_sender = user_id!("@creator:dummy.local");
2568
2569        // Create and populate db.
2570        {
2571            let db = create_fake_db(&path, 2).await.unwrap();
2572            let conn = db.pool.get().await.unwrap();
2573
2574            let this = db.clone();
2575            conn.with_transaction(move |txn| {
2576                add_room_v2(
2577                    &this,
2578                    txn,
2579                    room_a_id,
2580                    Some(room_a_name),
2581                    Some(room_a_creator),
2582                    Some(room_a_create_sender),
2583                )?;
2584                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2585                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2586
2587                Result::<_, Error>::Ok(())
2588            })
2589            .await
2590            .unwrap();
2591        }
2592
2593        // This transparently migrates to the latest version.
2594        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2595
2596        // Check all room infos are there.
2597        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2598        assert_eq!(room_infos.len(), 3);
2599
2600        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2601        assert_eq!(room_a.name(), Some(room_a_name));
2602        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2603
2604        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2605        assert_eq!(room_b.name(), None);
2606        assert_eq!(room_b.creators(), None);
2607
2608        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2609        assert_eq!(room_c.name(), None);
2610        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2611    }
2612
2613    #[async_test]
2614    pub async fn test_migrating_v7_to_v9() {
2615        let path = new_path();
2616
2617        let room_id = room_id!("!room_a:dummy.local");
2618        let wedged_event_transaction_id = TransactionId::new();
2619        let local_event_transaction_id = TransactionId::new();
2620
2621        // Create and populate db.
2622        {
2623            let db = create_fake_db(&path, 7).await.unwrap();
2624            let conn = db.pool.get().await.unwrap();
2625
2626            let wedge_tx = wedged_event_transaction_id.clone();
2627            let local_tx = local_event_transaction_id.clone();
2628
2629            conn.with_transaction(move |txn| {
2630                add_dependent_send_queue_event_v7(
2631                    &db,
2632                    txn,
2633                    room_id,
2634                    &local_tx,
2635                    ChildTransactionId::new(),
2636                    DependentQueuedRequestKind::RedactEvent,
2637                )?;
2638                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2639                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2640                Result::<_, Error>::Ok(())
2641            })
2642            .await
2643            .unwrap();
2644        }
2645
2646        // This transparently migrates to the latest version, which clears up all
2647        // requests and dependent requests.
2648        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2649
2650        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2651        assert!(requests.is_empty());
2652
2653        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2654        assert!(dependent_requests.is_empty());
2655    }
2656
2657    fn add_send_queue_event_v7(
2658        this: &SqliteStateStore,
2659        txn: &Transaction<'_>,
2660        transaction_id: &TransactionId,
2661        room_id: &RoomId,
2662        is_wedged: bool,
2663    ) -> Result<(), Error> {
2664        let content =
2665            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2666
2667        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2668        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2669
2670        let content = this.serialize_json(&content)?;
2671
2672        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2673            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2674
2675        Ok(())
2676    }
2677
2678    fn add_dependent_send_queue_event_v7(
2679        this: &SqliteStateStore,
2680        txn: &Transaction<'_>,
2681        room_id: &RoomId,
2682        parent_txn_id: &TransactionId,
2683        own_txn_id: ChildTransactionId,
2684        content: DependentQueuedRequestKind,
2685    ) -> Result<(), Error> {
2686        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2687
2688        let parent_txn_id = parent_txn_id.to_string();
2689        let own_txn_id = own_txn_id.to_string();
2690        let content = this.serialize_json(&content)?;
2691
2692        txn.prepare_cached(
2693            "INSERT INTO dependent_send_queue_events
2694                         (room_id, parent_transaction_id, own_transaction_id, content)
2695                       VALUES (?, ?, ?, ?)",
2696        )?
2697        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2698
2699        Ok(())
2700    }
2701
    /// Test-only stand-in for a dependent request serialized under the legacy
    /// `UploadFileWithThumbnail` variant name.
    ///
    /// `test_dependent_queued_request_variant_renaming` serializes a value of
    /// this type and deserializes the result as a `DependentQueuedRequestKind`.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LegacyDependentQueuedRequestKind {
        /// The legacy variant, which the test expects to be read back as
        /// `DependentQueuedRequestKind::UploadFileOrThumbnail`.
        UploadFileWithThumbnail {
            content_type: String,
            cache_key: MediaRequestParameters,
            related_to: OwnedTransactionId,
        },
    }
2710
2711    #[async_test]
2712    pub async fn test_dependent_queued_request_variant_renaming() {
2713        let path = new_path();
2714        let db = create_fake_db(&path, 7).await.unwrap();
2715
2716        let cache_key = MediaRequestParameters {
2717            format: MediaFormat::File,
2718            source: MediaSource::Plain("https://server.local/foobar".into()),
2719        };
2720        let related_to = TransactionId::new();
2721        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2722            content_type: "image/png".to_owned(),
2723            cache_key,
2724            related_to: related_to.clone(),
2725        };
2726
2727        let data = db
2728            .serialize_json(&request)
2729            .expect("should be able to serialize legacy dependent request");
2730        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2731            "should be able to deserialize dependent request from legacy dependent request",
2732        );
2733
2734        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2735            assert_eq!(de_related_to, related_to);
2736        });
2737    }
2738}