1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15 DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16 RoomLoadSettings, SentRequestKey,
17 },
18 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19 StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 canonical_json::{redact, RedactedBecause},
24 events::{
25 presence::PresenceEvent,
26 receipt::{Receipt, ReceiptThread, ReceiptType},
27 room::{
28 create::RoomCreateEventContent,
29 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30 },
31 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33 },
34 serde::Raw,
35 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36 OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt,
48 },
49 OpenStoreError, SqliteStoreConfig,
50};
51
52mod keys {
53 pub const KV_BLOB: &str = "kv_blob";
55 pub const ROOM_INFO: &str = "room_info";
56 pub const STATE_EVENT: &str = "state_event";
57 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59 pub const MEMBER: &str = "member";
60 pub const PROFILE: &str = "profile";
61 pub const RECEIPT: &str = "receipt";
62 pub const DISPLAY_NAME: &str = "display_name";
63 pub const SEND_QUEUE: &str = "send_queue_events";
64 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65}
66
67pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
69
70const DATABASE_VERSION: u8 = 12;
76
77#[derive(Clone)]
79pub struct SqliteStateStore {
80 store_cipher: Option<Arc<StoreCipher>>,
81 pool: SqlitePool,
82}
83
84#[cfg(not(tarpaulin_include))]
85impl fmt::Debug for SqliteStateStore {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
88 }
89}
90
91impl SqliteStateStore {
92 pub async fn open(
95 path: impl AsRef<Path>,
96 passphrase: Option<&str>,
97 ) -> Result<Self, OpenStoreError> {
98 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
99 }
100
101 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
103 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
104
105 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
106
107 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
108 config.pool = Some(pool_config);
109
110 let pool = config.create_pool(Runtime::Tokio1)?;
111
112 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
113 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
114
115 Ok(this)
116 }
117
118 async fn open_with_pool(
121 pool: SqlitePool,
122 passphrase: Option<&str>,
123 ) -> Result<Self, OpenStoreError> {
124 let conn = pool.get().await?;
125
126 let mut version = conn.db_version().await?;
127
128 if version == 0 {
129 init(&conn).await?;
130 version = 1;
131 }
132
133 let store_cipher = match passphrase {
134 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
135 None => None,
136 };
137 let this = Self { store_cipher, pool };
138 this.run_migrations(&conn, version, None).await?;
139
140 Ok(this)
141 }
142
143 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
148 let to = to.unwrap_or(DATABASE_VERSION);
149
150 if from < to {
151 debug!(version = from, new_version = to, "Upgrading database");
152 } else {
153 return Ok(());
154 }
155
156 if from < 2 && to >= 2 {
157 let this = self.clone();
158 conn.with_transaction(move |txn| {
159 txn.execute_batch(include_str!(
161 "../migrations/state_store/002_a_create_new_room_info.sql"
162 ))?;
163
164 for data in txn
166 .prepare("SELECT data FROM room_info")?
167 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
168 {
169 let data = data?;
170 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
171
172 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
173 let state = this
174 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
175 txn.prepare_cached(
176 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
177 VALUES (?, ?, ?)",
178 )?
179 .execute((room_id, state, data))?;
180 }
181
182 txn.execute_batch(include_str!(
184 "../migrations/state_store/002_b_replace_room_info.sql"
185 ))?;
186
187 txn.set_db_version(2)?;
188 Result::<_, Error>::Ok(())
189 })
190 .await?;
191 }
192
193 if from < 3 && to >= 3 {
195 let this = self.clone();
196 conn.with_transaction(move |txn| {
197 for data in txn
199 .prepare("SELECT data FROM room_info")?
200 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
201 {
202 let data = data?;
203 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
204
205 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
207 let event_type =
208 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
209 let create_res = txn
210 .prepare(
211 "SELECT stripped, data FROM state_event
212 WHERE room_id = ? AND event_type = ?",
213 )?
214 .query_row([room_id, event_type], |row| {
215 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
216 })
217 .optional()?;
218
219 let create = create_res.and_then(|(stripped, data)| {
220 let create = if stripped {
221 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
222 this.deserialize_json(&data).ok()?,
223 )
224 } else {
225 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
226 };
227 Some(create)
228 });
229
230 let migrated_room_info = room_info_v1.migrate(create.as_ref());
231
232 let data = this.serialize_json(&migrated_room_info)?;
233 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
234 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
235 .execute((data, room_id))?;
236 }
237
238 txn.set_db_version(3)?;
239 Result::<_, Error>::Ok(())
240 })
241 .await?;
242 }
243
244 if from < 4 && to >= 4 {
245 conn.with_transaction(move |txn| {
246 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
248 txn.set_db_version(4)
249 })
250 .await?;
251 }
252
253 if from < 5 && to >= 5 {
254 conn.with_transaction(move |txn| {
255 txn.execute_batch(include_str!(
257 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
258 ))?;
259 txn.set_db_version(4)
260 })
261 .await?;
262 }
263
264 if from < 6 && to >= 6 {
265 conn.with_transaction(move |txn| {
266 txn.execute_batch(include_str!(
268 "../migrations/state_store/005_send_queue_dependent_events.sql"
269 ))?;
270 txn.set_db_version(6)
271 })
272 .await?;
273 }
274
275 if from < 7 && to >= 7 {
276 conn.with_transaction(move |txn| {
277 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
279 txn.set_db_version(7)
280 })
281 .await?;
282 }
283
284 if from < 8 && to >= 8 {
285 let error = QueueWedgeError::GenericApiError {
287 msg: "local echo failed to send in a previous session".into(),
288 };
289 let default_err = self.serialize_value(&error)?;
290
291 conn.with_transaction(move |txn| {
292 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
294
295 for wedged_entries in txn
298 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
299 .query_map((), |row| {
300 Ok(
301 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
302 )
303 })? {
304
305 let (room_id, transaction_id) = wedged_entries?;
306
307 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
308 .execute((default_err.clone(), room_id, transaction_id))?;
309 }
310
311
312 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
314
315 txn.set_db_version(8)
316 })
317 .await?;
318 }
319
320 if from < 9 && to >= 9 {
321 conn.with_transaction(move |txn| {
322 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
324 txn.set_db_version(9)
325 })
326 .await?;
327 }
328
329 if from < 10 && to >= 10 {
330 conn.with_transaction(move |txn| {
331 txn.execute_batch(include_str!(
333 "../migrations/state_store/009_send_queue_priority.sql"
334 ))?;
335 txn.set_db_version(10)
336 })
337 .await?;
338 }
339
340 if from < 11 && to >= 11 {
341 conn.with_transaction(move |txn| {
342 txn.execute_batch(include_str!(
344 "../migrations/state_store/010_send_queue_enqueue_time.sql"
345 ))?;
346 txn.set_db_version(11)
347 })
348 .await?;
349 }
350
351 if from < 12 && to >= 12 {
352 conn.vacuum().await?;
356 conn.set_kv("version", vec![12]).await?;
357 }
358
359 Ok(())
360 }
361
362 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
363 if let Some(key) = &self.store_cipher {
364 let encrypted = key.encrypt_value_data(value)?;
365 Ok(rmp_serde::to_vec_named(&encrypted)?)
366 } else {
367 Ok(value)
368 }
369 }
370
371 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
372 let serialized = rmp_serde::to_vec_named(value)?;
373 self.encode_value(serialized)
374 }
375
376 fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
377 let serialized = serde_json::to_vec(value)?;
378 self.encode_value(serialized)
379 }
380
381 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
382 if let Some(key) = &self.store_cipher {
383 let encrypted = rmp_serde::from_slice(value)?;
384 let decrypted = key.decrypt_value_data(encrypted)?;
385 Ok(Cow::Owned(decrypted))
386 } else {
387 Ok(Cow::Borrowed(value))
388 }
389 }
390
391 fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
392 let decoded = self.decode_value(data)?;
393 Ok(serde_json::from_slice(&decoded)?)
394 }
395
396 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
397 let decoded = self.decode_value(value)?;
398 Ok(rmp_serde::from_slice(&decoded)?)
399 }
400
401 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
402 let bytes = key.as_ref();
403 if let Some(store_cipher) = &self.store_cipher {
404 Key::Hashed(store_cipher.hash_key(table_name, bytes))
405 } else {
406 Key::Plain(bytes.to_owned())
407 }
408 }
409
410 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
411 let key_s = match key {
412 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
413 StateStoreDataKey::ServerCapabilities => {
414 Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
415 }
416 StateStoreDataKey::Filter(f) => {
417 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
418 }
419 StateStoreDataKey::UserAvatarUrl(u) => {
420 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
421 }
422 StateStoreDataKey::RecentlyVisitedRooms(b) => {
423 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
424 }
425 StateStoreDataKey::UtdHookManagerData => {
426 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
427 }
428 StateStoreDataKey::ComposerDraft(room_id) => {
429 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
430 }
431 StateStoreDataKey::SeenKnockRequests(room_id) => {
432 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
433 }
434 };
435
436 self.encode_key(keys::KV_BLOB, &*key_s)
437 }
438
439 fn encode_presence_key(&self, user_id: &UserId) -> Key {
440 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
441 }
442
443 fn encode_custom_key(&self, key: &[u8]) -> Key {
444 let mut full_key = b"custom:".to_vec();
445 full_key.extend(key);
446 self.encode_key(keys::KV_BLOB, full_key)
447 }
448
449 async fn acquire(&self) -> Result<SqliteAsyncConn> {
450 Ok(self.pool.get().await?)
451 }
452
453 fn remove_maybe_stripped_room_data(
454 &self,
455 txn: &Transaction<'_>,
456 room_id: &RoomId,
457 stripped: bool,
458 ) -> rusqlite::Result<()> {
459 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
460 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
461
462 let member_room_id = self.encode_key(keys::MEMBER, room_id);
463 txn.remove_room_members(&member_room_id, Some(stripped))
464 }
465}
466
467async fn init(conn: &SqliteAsyncConn) -> Result<()> {
469 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
472 conn.with_transaction(|txn| {
473 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
474 txn.set_db_version(1)?;
475
476 Ok(())
477 })
478 .await
479}
480
481trait SqliteConnectionStateStoreExt {
482 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
483
484 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
485
486 fn set_room_account_data(
487 &self,
488 room_id: &[u8],
489 event_type: &[u8],
490 data: &[u8],
491 ) -> rusqlite::Result<()>;
492 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
493
494 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
495 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
496 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
497
498 fn set_state_event(
499 &self,
500 room_id: &[u8],
501 event_type: &[u8],
502 state_key: &[u8],
503 stripped: bool,
504 event_id: Option<&[u8]>,
505 data: &[u8],
506 ) -> rusqlite::Result<()>;
507 fn get_state_event_by_id(
508 &self,
509 room_id: &[u8],
510 event_id: &[u8],
511 ) -> rusqlite::Result<Option<Vec<u8>>>;
512 fn remove_room_state_events(
513 &self,
514 room_id: &[u8],
515 stripped: Option<bool>,
516 ) -> rusqlite::Result<()>;
517
518 fn set_member(
519 &self,
520 room_id: &[u8],
521 user_id: &[u8],
522 membership: &[u8],
523 stripped: bool,
524 data: &[u8],
525 ) -> rusqlite::Result<()>;
526 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
527
528 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
529 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
530 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
531
532 fn set_receipt(
533 &self,
534 room_id: &[u8],
535 user_id: &[u8],
536 receipt_type: &[u8],
537 thread_id: &[u8],
538 event_id: &[u8],
539 data: &[u8],
540 ) -> rusqlite::Result<()>;
541 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
542
543 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
544 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
545 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
546 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
547 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
548}
549
550impl SqliteConnectionStateStoreExt for rusqlite::Connection {
551 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
552 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
553 Ok(())
554 }
555
556 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
557 self.prepare_cached(
558 "INSERT OR REPLACE INTO global_account_data (event_type, data)
559 VALUES (?, ?)",
560 )?
561 .execute((event_type, data))?;
562 Ok(())
563 }
564
565 fn set_room_account_data(
566 &self,
567 room_id: &[u8],
568 event_type: &[u8],
569 data: &[u8],
570 ) -> rusqlite::Result<()> {
571 self.prepare_cached(
572 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
573 VALUES (?, ?, ?)",
574 )?
575 .execute((room_id, event_type, data))?;
576 Ok(())
577 }
578
579 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
580 self.prepare(
581 "DELETE FROM room_account_data
582 WHERE room_id = ?",
583 )?
584 .execute((room_id,))?;
585 Ok(())
586 }
587
588 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
589 self.prepare_cached(
590 "INSERT OR REPLACE INTO room_info (room_id, state, data)
591 VALUES (?, ?, ?)",
592 )?
593 .execute((room_id, state, data))?;
594 Ok(())
595 }
596
597 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
598 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
599 .optional()
600 }
601
602 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
604 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
605 Ok(())
606 }
607
608 fn set_state_event(
609 &self,
610 room_id: &[u8],
611 event_type: &[u8],
612 state_key: &[u8],
613 stripped: bool,
614 event_id: Option<&[u8]>,
615 data: &[u8],
616 ) -> rusqlite::Result<()> {
617 self.prepare_cached(
618 "INSERT OR REPLACE
619 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
620 VALUES (?, ?, ?, ?, ?, ?)",
621 )?
622 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
623 Ok(())
624 }
625
626 fn get_state_event_by_id(
627 &self,
628 room_id: &[u8],
629 event_id: &[u8],
630 ) -> rusqlite::Result<Option<Vec<u8>>> {
631 self.query_row(
632 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
633 (room_id, event_id),
634 |row| row.get(0),
635 )
636 .optional()
637 }
638
639 fn remove_room_state_events(
645 &self,
646 room_id: &[u8],
647 stripped: Option<bool>,
648 ) -> rusqlite::Result<()> {
649 if let Some(stripped) = stripped {
650 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
651 .execute((room_id, stripped))?;
652 } else {
653 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
654 .execute((room_id,))?;
655 }
656 Ok(())
657 }
658
659 fn set_member(
660 &self,
661 room_id: &[u8],
662 user_id: &[u8],
663 membership: &[u8],
664 stripped: bool,
665 data: &[u8],
666 ) -> rusqlite::Result<()> {
667 self.prepare_cached(
668 "INSERT OR REPLACE
669 INTO member (room_id, user_id, membership, stripped, data)
670 VALUES (?, ?, ?, ?, ?)",
671 )?
672 .execute((room_id, user_id, membership, stripped, data))?;
673 Ok(())
674 }
675
676 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
681 if let Some(stripped) = stripped {
682 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
683 .execute((room_id, stripped))?;
684 } else {
685 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
686 }
687 Ok(())
688 }
689
690 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
691 self.prepare_cached(
692 "INSERT OR REPLACE
693 INTO profile (room_id, user_id, data)
694 VALUES (?, ?, ?)",
695 )?
696 .execute((room_id, user_id, data))?;
697 Ok(())
698 }
699
700 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
701 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
702 Ok(())
703 }
704
705 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
706 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
707 .execute((room_id, user_id))?;
708 Ok(())
709 }
710
711 fn set_receipt(
712 &self,
713 room_id: &[u8],
714 user_id: &[u8],
715 receipt_type: &[u8],
716 thread: &[u8],
717 event_id: &[u8],
718 data: &[u8],
719 ) -> rusqlite::Result<()> {
720 self.prepare_cached(
721 "INSERT OR REPLACE
722 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
723 VALUES (?, ?, ?, ?, ?, ?)",
724 )?
725 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
726 Ok(())
727 }
728
729 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
730 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
731 Ok(())
732 }
733
734 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
735 self.prepare_cached(
736 "INSERT OR REPLACE
737 INTO display_name (room_id, name, data)
738 VALUES (?, ?, ?)",
739 )?
740 .execute((room_id, name, data))?;
741 Ok(())
742 }
743
744 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
745 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
746 .execute((room_id, name))?;
747 Ok(())
748 }
749
750 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
751 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
752 Ok(())
753 }
754
755 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
756 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
757 Ok(())
758 }
759
760 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
761 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
762 .execute((room_id,))?;
763 Ok(())
764 }
765}
766
767#[async_trait]
768trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
769 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
770 Ok(self
771 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
772 .await
773 .optional()?)
774 }
775
776 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
777 let keys_length = keys.len();
778
779 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
780 let sql_params = repeat_vars(keys.len());
781 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
782
783 let params = rusqlite::params_from_iter(keys);
784
785 Ok(txn
786 .prepare(&sql)?
787 .query(params)?
788 .mapped(|row| row.get(0))
789 .collect::<Result<_, _>>()?)
790 })
791 .await
792 }
793
794 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
795
796 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
797 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
798 Ok(())
799 }
800
801 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
802 Ok(match room_id {
803 None => {
804 self.prepare("SELECT data FROM room_info", move |mut stmt| {
805 stmt.query_map((), |row| row.get(0))?.collect()
806 })
807 .await?
808 }
809
810 Some(room_id) => {
811 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
812 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
813 })
814 .await?
815 }
816 })
817 }
818
819 async fn get_maybe_stripped_state_events_for_keys(
820 &self,
821 room_id: Key,
822 event_type: Key,
823 state_keys: Vec<Key>,
824 ) -> Result<Vec<(bool, Vec<u8>)>> {
825 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
826 let sql_params = repeat_vars(state_keys.len());
827 let sql = format!(
828 "SELECT stripped, data FROM state_event
829 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
830 );
831
832 let params = rusqlite::params_from_iter(
833 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
834 );
835
836 Ok(txn
837 .prepare(&sql)?
838 .query(params)?
839 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
840 .collect::<Result<_, _>>()?)
841 })
842 .await
843 }
844
845 async fn get_maybe_stripped_state_events(
846 &self,
847 room_id: Key,
848 event_type: Key,
849 ) -> Result<Vec<(bool, Vec<u8>)>> {
850 Ok(self
851 .prepare(
852 "SELECT stripped, data FROM state_event
853 WHERE room_id = ? AND event_type = ?",
854 |mut stmt| {
855 stmt.query((room_id, event_type))?
856 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
857 .collect()
858 },
859 )
860 .await?)
861 }
862
863 async fn get_profiles(
864 &self,
865 room_id: Key,
866 user_ids: Vec<Key>,
867 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
868 let user_ids_length = user_ids.len();
869
870 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
871 let sql_params = repeat_vars(user_ids.len());
872 let sql = format!(
873 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
874 );
875
876 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
877
878 Ok(txn
879 .prepare(&sql)?
880 .query(params)?
881 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
882 .collect::<Result<_, _>>()?)
883 })
884 .await
885 }
886
887 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
888 let res = if memberships.is_empty() {
889 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
890 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
891 })
892 .await?
893 } else {
894 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
895 let sql_params = repeat_vars(memberships.len());
896 let sql = format!(
897 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
898 );
899
900 let params =
901 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
902
903 Ok(txn
904 .prepare(&sql)?
905 .query(params)?
906 .mapped(|row| row.get(0))
907 .collect::<Result<_, _>>()?)
908 })
909 .await?
910 };
911
912 Ok(res)
913 }
914
915 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
916 Ok(self
917 .query_row(
918 "SELECT data FROM global_account_data WHERE event_type = ?",
919 (event_type,),
920 |row| row.get(0),
921 )
922 .await
923 .optional()?)
924 }
925
926 async fn get_room_account_data(
927 &self,
928 room_id: Key,
929 event_type: Key,
930 ) -> Result<Option<Vec<u8>>> {
931 Ok(self
932 .query_row(
933 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
934 (room_id, event_type),
935 |row| row.get(0),
936 )
937 .await
938 .optional()?)
939 }
940
941 async fn get_display_names(
942 &self,
943 room_id: Key,
944 names: Vec<Key>,
945 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
946 let names_length = names.len();
947
948 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
949 let sql_params = repeat_vars(names.len());
950 let sql = format!(
951 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
952 );
953
954 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
955
956 Ok(txn
957 .prepare(&sql)?
958 .query(params)?
959 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
960 .collect::<Result<_, _>>()?)
961 })
962 .await
963 }
964
965 async fn get_user_receipt(
966 &self,
967 room_id: Key,
968 receipt_type: Key,
969 thread: Key,
970 user_id: Key,
971 ) -> Result<Option<Vec<u8>>> {
972 Ok(self
973 .query_row(
974 "SELECT data FROM receipt
975 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
976 (room_id, receipt_type, thread, user_id),
977 |row| row.get(0),
978 )
979 .await
980 .optional()?)
981 }
982
983 async fn get_event_receipts(
984 &self,
985 room_id: Key,
986 receipt_type: Key,
987 thread: Key,
988 event_id: Key,
989 ) -> Result<Vec<Vec<u8>>> {
990 Ok(self
991 .prepare(
992 "SELECT data FROM receipt
993 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
994 |mut stmt| {
995 stmt.query((room_id, receipt_type, thread, event_id))?
996 .mapped(|row| row.get(0))
997 .collect()
998 },
999 )
1000 .await?)
1001 }
1002}
1003
1004#[async_trait]
1005impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1006 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1007 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1008 }
1009}
1010
1011#[async_trait]
1012impl StateStore for SqliteStateStore {
1013 type Error = Error;
1014
1015 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1016 self.acquire()
1017 .await?
1018 .get_kv_blob(self.encode_state_store_data_key(key))
1019 .await?
1020 .map(|data| {
1021 Ok(match key {
1022 StateStoreDataKey::SyncToken => {
1023 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1024 }
1025 StateStoreDataKey::ServerCapabilities => {
1026 StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1027 }
1028 StateStoreDataKey::Filter(_) => {
1029 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1030 }
1031 StateStoreDataKey::UserAvatarUrl(_) => {
1032 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1033 }
1034 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1035 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1036 }
1037 StateStoreDataKey::UtdHookManagerData => {
1038 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1039 }
1040 StateStoreDataKey::ComposerDraft(_) => {
1041 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1042 }
1043 StateStoreDataKey::SeenKnockRequests(_) => {
1044 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1045 }
1046 })
1047 })
1048 .transpose()
1049 }
1050
1051 async fn set_kv_data(
1052 &self,
1053 key: StateStoreDataKey<'_>,
1054 value: StateStoreDataValue,
1055 ) -> Result<()> {
1056 let serialized_value = match key {
1057 StateStoreDataKey::SyncToken => self.serialize_value(
1058 &value.into_sync_token().expect("Session data not a sync token"),
1059 )?,
1060 StateStoreDataKey::ServerCapabilities => self.serialize_value(
1061 &value
1062 .into_server_capabilities()
1063 .expect("Session data not containing server capabilities"),
1064 )?,
1065 StateStoreDataKey::Filter(_) => {
1066 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1067 }
1068 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1069 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1070 )?,
1071 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1072 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1073 )?,
1074 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1075 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1076 )?,
1077 StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1078 &value.into_composer_draft().expect("Session data not a composer draft"),
1079 )?,
1080 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1081 &value
1082 .into_seen_knock_requests()
1083 .expect("Session data is not a set of seen knock request ids"),
1084 )?,
1085 };
1086
1087 self.acquire()
1088 .await?
1089 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1090 .await
1091 }
1092
1093 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1094 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1095 }
1096
1097 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1098 let changes = changes.to_owned();
1099 let this = self.clone();
1100 self.acquire()
1101 .await?
1102 .with_transaction(move |txn| {
1103 let StateChanges {
1104 sync_token,
1105 account_data,
1106 presence,
1107 profiles,
1108 profiles_to_delete,
1109 state,
1110 room_account_data,
1111 room_infos,
1112 receipts,
1113 redactions,
1114 stripped_state,
1115 ambiguity_maps,
1116 } = changes;
1117
1118 if let Some(sync_token) = sync_token {
1119 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1120 let value = this.serialize_value(&sync_token)?;
1121 txn.set_kv_blob(&key, &value)?;
1122 }
1123
1124 for (event_type, event) in account_data {
1125 let event_type =
1126 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1127 let data = this.serialize_json(&event)?;
1128 txn.set_global_account_data(&event_type, &data)?;
1129 }
1130
1131 for (room_id, events) in room_account_data {
1132 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1133 for (event_type, event) in events {
1134 let event_type =
1135 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1136 let data = this.serialize_json(&event)?;
1137 txn.set_room_account_data(&room_id, &event_type, &data)?;
1138 }
1139 }
1140
1141 for (user_id, event) in presence {
1142 let key = this.encode_presence_key(&user_id);
1143 let value = this.serialize_json(&event)?;
1144 txn.set_kv_blob(&key, &value)?;
1145 }
1146
1147 for (room_id, room_info) in room_infos {
1148 let stripped = room_info.state() == RoomState::Invited;
1149 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1151
1152 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1153 let state = this
1154 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1155 let data = this.serialize_json(&room_info)?;
1156 txn.set_room_info(&room_id, &state, &data)?;
1157 }
1158
1159 for (room_id, user_ids) in profiles_to_delete {
1160 let room_id = this.encode_key(keys::PROFILE, room_id);
1161 for user_id in user_ids {
1162 let user_id = this.encode_key(keys::PROFILE, user_id);
1163 txn.remove_room_profile(&room_id, &user_id)?;
1164 }
1165 }
1166
1167 for (room_id, state_event_types) in state {
1168 let profiles = profiles.get(&room_id);
1169 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1170
1171 for (event_type, state_events) in state_event_types {
1172 let encoded_event_type =
1173 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1174
1175 for (state_key, raw_state_event) in state_events {
1176 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1177 let data = this.serialize_json(&raw_state_event)?;
1178
1179 let event_id: Option<String> =
1180 raw_state_event.get_field("event_id").ok().flatten();
1181 let encoded_event_id =
1182 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1183
1184 txn.set_state_event(
1185 &encoded_room_id,
1186 &encoded_event_type,
1187 &encoded_state_key,
1188 false,
1189 encoded_event_id.as_deref(),
1190 &data,
1191 )?;
1192
1193 if event_type == StateEventType::RoomMember {
1194 let member_event = match raw_state_event
1195 .deserialize_as::<SyncRoomMemberEvent>()
1196 {
1197 Ok(ev) => ev,
1198 Err(e) => {
1199 debug!(event_id, "Failed to deserialize member event: {e}");
1200 continue;
1201 }
1202 };
1203
1204 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1205 let user_id = this.encode_key(keys::MEMBER, &state_key);
1206 let membership = this
1207 .encode_key(keys::MEMBER, member_event.membership().as_str());
1208 let data = this.serialize_value(&state_key)?;
1209
1210 txn.set_member(
1211 &encoded_room_id,
1212 &user_id,
1213 &membership,
1214 false,
1215 &data,
1216 )?;
1217
1218 if let Some(profile) =
1219 profiles.and_then(|p| p.get(member_event.state_key()))
1220 {
1221 let room_id = this.encode_key(keys::PROFILE, &room_id);
1222 let user_id = this.encode_key(keys::PROFILE, &state_key);
1223 let data = this.serialize_json(&profile)?;
1224 txn.set_profile(&room_id, &user_id, &data)?;
1225 }
1226 }
1227 }
1228 }
1229 }
1230
1231 for (room_id, stripped_state_event_types) in stripped_state {
1232 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1233
1234 for (event_type, stripped_state_events) in stripped_state_event_types {
1235 let encoded_event_type =
1236 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1237
1238 for (state_key, raw_stripped_state_event) in stripped_state_events {
1239 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1240 let data = this.serialize_json(&raw_stripped_state_event)?;
1241 txn.set_state_event(
1242 &encoded_room_id,
1243 &encoded_event_type,
1244 &encoded_state_key,
1245 true,
1246 None,
1247 &data,
1248 )?;
1249
1250 if event_type == StateEventType::RoomMember {
1251 let member_event = match raw_stripped_state_event
1252 .deserialize_as::<StrippedRoomMemberEvent>(
1253 ) {
1254 Ok(ev) => ev,
1255 Err(e) => {
1256 debug!("Failed to deserialize stripped member event: {e}");
1257 continue;
1258 }
1259 };
1260
1261 let room_id = this.encode_key(keys::MEMBER, &room_id);
1262 let user_id = this.encode_key(keys::MEMBER, &state_key);
1263 let membership = this.encode_key(
1264 keys::MEMBER,
1265 member_event.content.membership.as_str(),
1266 );
1267 let data = this.serialize_value(&state_key)?;
1268
1269 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1270 }
1271 }
1272 }
1273 }
1274
1275 for (room_id, receipt_event) in receipts {
1276 let room_id = this.encode_key(keys::RECEIPT, room_id);
1277
1278 for (event_id, receipt_types) in receipt_event {
1279 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1280
1281 for (receipt_type, receipt_users) in receipt_types {
1282 let receipt_type =
1283 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1284
1285 for (user_id, receipt) in receipt_users {
1286 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1287 let thread = this.encode_key(
1290 keys::RECEIPT,
1291 rmp_serde::to_vec_named(&receipt.thread)?,
1292 );
1293 let data = this.serialize_json(&ReceiptData {
1294 receipt,
1295 event_id: event_id.clone(),
1296 user_id,
1297 })?;
1298
1299 txn.set_receipt(
1300 &room_id,
1301 &encoded_user_id,
1302 &receipt_type,
1303 &thread,
1304 &encoded_event_id,
1305 &data,
1306 )?;
1307 }
1308 }
1309 }
1310 }
1311
1312 for (room_id, redactions) in redactions {
1313 let make_room_version = || {
1314 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1315 txn.get_room_info(&encoded_room_id)
1316 .ok()
1317 .flatten()
1318 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1319 .and_then(|info| info.room_version().cloned())
1320 .unwrap_or_else(|| {
1321 warn!(
1322 ?room_id,
1323 "Unable to find the room version, assume version 9"
1324 );
1325 RoomVersionId::V9
1326 })
1327 };
1328
1329 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1330 let mut room_version = None;
1331
1332 for (event_id, redaction) in redactions {
1333 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1334
1335 if let Some(Ok(raw_event)) = txn
1336 .get_state_event_by_id(&encoded_room_id, &event_id)?
1337 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1338 {
1339 let event = raw_event.deserialize()?;
1340 let redacted = redact(
1341 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1342 room_version.get_or_insert_with(make_room_version),
1343 Some(RedactedBecause::from_raw_event(&redaction)?),
1344 )
1345 .map_err(Error::Redaction)?;
1346 let data = this.serialize_json(&redacted)?;
1347
1348 let event_type =
1349 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1350 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1351
1352 txn.set_state_event(
1353 &encoded_room_id,
1354 &event_type,
1355 &state_key,
1356 false,
1357 Some(&event_id),
1358 &data,
1359 )?;
1360 }
1361 }
1362 }
1363
1364 for (room_id, display_names) in ambiguity_maps {
1365 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1366
1367 for (name, user_ids) in display_names {
1368 let encoded_name = this.encode_key(
1369 keys::DISPLAY_NAME,
1370 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1371 );
1372 let data = this.serialize_json(&user_ids)?;
1373
1374 if user_ids.is_empty() {
1375 txn.remove_display_name(&room_id, &encoded_name)?;
1376
1377 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1392 txn.remove_display_name(&room_id, &raw_name)?;
1393 } else {
1394 txn.set_display_name(&room_id, &encoded_name, &data)?;
1396 }
1397 }
1398 }
1399
1400 Ok::<_, Error>(())
1401 })
1402 .await?;
1403
1404 Ok(())
1405 }
1406
1407 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1408 self.acquire()
1409 .await?
1410 .get_kv_blob(self.encode_presence_key(user_id))
1411 .await?
1412 .map(|data| self.deserialize_json(&data))
1413 .transpose()
1414 }
1415
1416 async fn get_presence_events(
1417 &self,
1418 user_ids: &[OwnedUserId],
1419 ) -> Result<Vec<Raw<PresenceEvent>>> {
1420 if user_ids.is_empty() {
1421 return Ok(Vec::new());
1422 }
1423
1424 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1425 self.acquire()
1426 .await?
1427 .get_kv_blobs(user_ids)
1428 .await?
1429 .into_iter()
1430 .map(|data| self.deserialize_json(&data))
1431 .collect()
1432 }
1433
1434 async fn get_state_event(
1435 &self,
1436 room_id: &RoomId,
1437 event_type: StateEventType,
1438 state_key: &str,
1439 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1440 Ok(self
1441 .get_state_events_for_keys(room_id, event_type, &[state_key])
1442 .await?
1443 .into_iter()
1444 .next())
1445 }
1446
1447 async fn get_state_events(
1448 &self,
1449 room_id: &RoomId,
1450 event_type: StateEventType,
1451 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1452 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1453 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1454 self.acquire()
1455 .await?
1456 .get_maybe_stripped_state_events(room_id, event_type)
1457 .await?
1458 .into_iter()
1459 .map(|(stripped, data)| {
1460 let ev = if stripped {
1461 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1462 } else {
1463 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1464 };
1465
1466 Ok(ev)
1467 })
1468 .collect()
1469 }
1470
1471 async fn get_state_events_for_keys(
1472 &self,
1473 room_id: &RoomId,
1474 event_type: StateEventType,
1475 state_keys: &[&str],
1476 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1477 if state_keys.is_empty() {
1478 return Ok(Vec::new());
1479 }
1480
1481 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1482 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1483 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1484 self.acquire()
1485 .await?
1486 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1487 .await?
1488 .into_iter()
1489 .map(|(stripped, data)| {
1490 let ev = if stripped {
1491 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1492 } else {
1493 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1494 };
1495
1496 Ok(ev)
1497 })
1498 .collect()
1499 }
1500
1501 async fn get_profile(
1502 &self,
1503 room_id: &RoomId,
1504 user_id: &UserId,
1505 ) -> Result<Option<MinimalRoomMemberEvent>> {
1506 let room_id = self.encode_key(keys::PROFILE, room_id);
1507 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1508
1509 self.acquire()
1510 .await?
1511 .get_profiles(room_id, user_ids)
1512 .await?
1513 .into_iter()
1514 .next()
1515 .map(|(_, data)| self.deserialize_json(&data))
1516 .transpose()
1517 }
1518
1519 async fn get_profiles<'a>(
1520 &self,
1521 room_id: &RoomId,
1522 user_ids: &'a [OwnedUserId],
1523 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1524 if user_ids.is_empty() {
1525 return Ok(BTreeMap::new());
1526 }
1527
1528 let room_id = self.encode_key(keys::PROFILE, room_id);
1529 let mut user_ids_map = user_ids
1530 .iter()
1531 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1532 .collect::<BTreeMap<_, _>>();
1533 let user_ids = user_ids_map.keys().cloned().collect();
1534
1535 self.acquire()
1536 .await?
1537 .get_profiles(room_id, user_ids)
1538 .await?
1539 .into_iter()
1540 .map(|(user_id, data)| {
1541 Ok((
1542 user_ids_map
1543 .remove(user_id.as_slice())
1544 .expect("returned user IDs were requested"),
1545 self.deserialize_json(&data)?,
1546 ))
1547 })
1548 .collect()
1549 }
1550
1551 async fn get_user_ids(
1552 &self,
1553 room_id: &RoomId,
1554 membership: RoomMemberships,
1555 ) -> Result<Vec<OwnedUserId>> {
1556 let room_id = self.encode_key(keys::MEMBER, room_id);
1557 let memberships = membership
1558 .as_vec()
1559 .into_iter()
1560 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1561 .collect();
1562 self.acquire()
1563 .await?
1564 .get_user_ids(room_id, memberships)
1565 .await?
1566 .iter()
1567 .map(|data| self.deserialize_value(data))
1568 .collect()
1569 }
1570
1571 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1572 self.acquire()
1573 .await?
1574 .get_room_infos(match room_load_settings {
1575 RoomLoadSettings::All => None,
1576 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1577 })
1578 .await?
1579 .into_iter()
1580 .map(|data| self.deserialize_json(&data))
1581 .collect()
1582 }
1583
1584 async fn get_users_with_display_name(
1585 &self,
1586 room_id: &RoomId,
1587 display_name: &DisplayName,
1588 ) -> Result<BTreeSet<OwnedUserId>> {
1589 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1590 let names = vec![self.encode_key(
1591 keys::DISPLAY_NAME,
1592 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1593 )];
1594
1595 Ok(self
1596 .acquire()
1597 .await?
1598 .get_display_names(room_id, names)
1599 .await?
1600 .into_iter()
1601 .next()
1602 .map(|(_, data)| self.deserialize_json(&data))
1603 .transpose()?
1604 .unwrap_or_default())
1605 }
1606
1607 async fn get_users_with_display_names<'a>(
1608 &self,
1609 room_id: &RoomId,
1610 display_names: &'a [DisplayName],
1611 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1612 let mut result = HashMap::new();
1613
1614 if display_names.is_empty() {
1615 return Ok(result);
1616 }
1617
1618 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1619 let mut names_map = display_names
1620 .iter()
1621 .flat_map(|display_name| {
1622 let raw =
1632 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1633 let normalized = display_name.as_normalized_str().map(|normalized| {
1634 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1635 });
1636
1637 iter::once(raw).chain(normalized.into_iter())
1638 })
1639 .collect::<BTreeMap<_, _>>();
1640 let names = names_map.keys().cloned().collect();
1641
1642 for (name, data) in
1643 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1644 {
1645 let display_name =
1646 names_map.remove(name.as_slice()).expect("returned display names were requested");
1647 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1648
1649 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1650 }
1651
1652 Ok(result)
1653 }
1654
1655 async fn get_account_data_event(
1656 &self,
1657 event_type: GlobalAccountDataEventType,
1658 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1659 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1660 self.acquire()
1661 .await?
1662 .get_global_account_data(event_type)
1663 .await?
1664 .map(|value| self.deserialize_json(&value))
1665 .transpose()
1666 }
1667
1668 async fn get_room_account_data_event(
1669 &self,
1670 room_id: &RoomId,
1671 event_type: RoomAccountDataEventType,
1672 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1673 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1674 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1675 self.acquire()
1676 .await?
1677 .get_room_account_data(room_id, event_type)
1678 .await?
1679 .map(|value| self.deserialize_json(&value))
1680 .transpose()
1681 }
1682
1683 async fn get_user_room_receipt_event(
1684 &self,
1685 room_id: &RoomId,
1686 receipt_type: ReceiptType,
1687 thread: ReceiptThread,
1688 user_id: &UserId,
1689 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1690 let room_id = self.encode_key(keys::RECEIPT, room_id);
1691 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1692 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1695 let user_id = self.encode_key(keys::RECEIPT, user_id);
1696
1697 self.acquire()
1698 .await?
1699 .get_user_receipt(room_id, receipt_type, thread, user_id)
1700 .await?
1701 .map(|value| {
1702 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1703 })
1704 .transpose()
1705 }
1706
1707 async fn get_event_room_receipt_events(
1708 &self,
1709 room_id: &RoomId,
1710 receipt_type: ReceiptType,
1711 thread: ReceiptThread,
1712 event_id: &EventId,
1713 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1714 let room_id = self.encode_key(keys::RECEIPT, room_id);
1715 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1716 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1719 let event_id = self.encode_key(keys::RECEIPT, event_id);
1720
1721 self.acquire()
1722 .await?
1723 .get_event_receipts(room_id, receipt_type, thread, event_id)
1724 .await?
1725 .iter()
1726 .map(|value| {
1727 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1728 })
1729 .collect()
1730 }
1731
1732 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1733 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1734 }
1735
1736 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1737 let conn = self.acquire().await?;
1738 let key = self.encode_custom_key(key);
1739 conn.set_kv_blob(key, value).await?;
1740 Ok(())
1741 }
1742
1743 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1744 let conn = self.acquire().await?;
1745 let key = self.encode_custom_key(key);
1746 let previous = conn.get_kv_blob(key.clone()).await?;
1747 conn.set_kv_blob(key, value).await?;
1748 Ok(previous)
1749 }
1750
1751 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1752 let conn = self.acquire().await?;
1753 let key = self.encode_custom_key(key);
1754 let previous = conn.get_kv_blob(key.clone()).await?;
1755 if previous.is_some() {
1756 conn.delete_kv_blob(key).await?;
1757 }
1758 Ok(previous)
1759 }
1760
1761 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1762 let this = self.clone();
1763 let room_id = room_id.to_owned();
1764
1765 let conn = self.acquire().await?;
1766
1767 conn.with_transaction(move |txn| -> Result<()> {
1768 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1769 txn.remove_room_info(&room_info_room_id)?;
1770
1771 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1772 txn.remove_room_state_events(&state_event_room_id, None)?;
1773
1774 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1775 txn.remove_room_members(&member_room_id, None)?;
1776
1777 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1778 txn.remove_room_profiles(&profile_room_id)?;
1779
1780 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1781 txn.remove_room_account_data(&room_account_data_room_id)?;
1782
1783 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1784 txn.remove_room_receipts(&receipt_room_id)?;
1785
1786 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1787 txn.remove_room_display_names(&display_name_room_id)?;
1788
1789 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1790 txn.remove_room_send_queue(&send_queue_room_id)?;
1791
1792 let dependent_send_queue_room_id =
1793 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1794 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1795
1796 Ok(())
1797 })
1798 .await?;
1799
1800 conn.vacuum().await
1801 }
1802
1803 async fn save_send_queue_request(
1804 &self,
1805 room_id: &RoomId,
1806 transaction_id: OwnedTransactionId,
1807 created_at: MilliSecondsSinceUnixEpoch,
1808 content: QueuedRequestKind,
1809 priority: usize,
1810 ) -> Result<(), Self::Error> {
1811 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1812 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1813
1814 let content = self.serialize_json(&content)?;
1815 let created_at_ts: u64 = created_at.0.into();
1821 self.acquire()
1822 .await?
1823 .with_transaction(move |txn| {
1824 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1825 Ok(())
1826 })
1827 .await
1828 }
1829
1830 async fn update_send_queue_request(
1831 &self,
1832 room_id: &RoomId,
1833 transaction_id: &TransactionId,
1834 content: QueuedRequestKind,
1835 ) -> Result<bool, Self::Error> {
1836 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1837
1838 let content = self.serialize_json(&content)?;
1839 let transaction_id = transaction_id.to_string();
1842
1843 let num_updated = self.acquire()
1844 .await?
1845 .with_transaction(move |txn| {
1846 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1847 })
1848 .await?;
1849
1850 Ok(num_updated > 0)
1851 }
1852
1853 async fn remove_send_queue_request(
1854 &self,
1855 room_id: &RoomId,
1856 transaction_id: &TransactionId,
1857 ) -> Result<bool, Self::Error> {
1858 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1859
1860 let transaction_id = transaction_id.to_string();
1862
1863 let num_deleted = self
1864 .acquire()
1865 .await?
1866 .with_transaction(move |txn| {
1867 txn.prepare_cached(
1868 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1869 )?
1870 .execute((room_id, &transaction_id))
1871 })
1872 .await?;
1873
1874 Ok(num_deleted > 0)
1875 }
1876
1877 async fn load_send_queue_requests(
1878 &self,
1879 room_id: &RoomId,
1880 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1881 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1882
1883 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1887 .acquire()
1888 .await?
1889 .prepare(
1890 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1891 |mut stmt| {
1892 stmt.query((room_id,))?
1893 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1894 .collect()
1895 },
1896 )
1897 .await?;
1898
1899 let mut requests = Vec::with_capacity(res.len());
1900 for entry in res {
1901 let created_at = entry
1902 .4
1903 .and_then(UInt::new)
1904 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1905 requests.push(QueuedRequest {
1906 transaction_id: entry.0.into(),
1907 kind: self.deserialize_json(&entry.1)?,
1908 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1909 priority: entry.3,
1910 created_at,
1911 });
1912 }
1913
1914 Ok(requests)
1915 }
1916
1917 async fn update_send_queue_request_status(
1918 &self,
1919 room_id: &RoomId,
1920 transaction_id: &TransactionId,
1921 error: Option<QueueWedgeError>,
1922 ) -> Result<(), Self::Error> {
1923 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1924
1925 let transaction_id = transaction_id.to_string();
1927
1928 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1930
1931 self.acquire()
1932 .await?
1933 .with_transaction(move |txn| {
1934 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1935 Ok(())
1936 })
1937 .await
1938 }
1939
1940 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1941 let res: Vec<Vec<u8>> = self
1946 .acquire()
1947 .await?
1948 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1949 stmt.query(())?.mapped(|row| row.get(0)).collect()
1950 })
1951 .await?;
1952
1953 Ok(res
1956 .into_iter()
1957 .map(|entry| self.deserialize_value(&entry))
1958 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1959 .into_iter()
1960 .collect())
1961 }
1962
1963 async fn save_dependent_queued_request(
1964 &self,
1965 room_id: &RoomId,
1966 parent_txn_id: &TransactionId,
1967 own_txn_id: ChildTransactionId,
1968 created_at: MilliSecondsSinceUnixEpoch,
1969 content: DependentQueuedRequestKind,
1970 ) -> Result<()> {
1971 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1972 let content = self.serialize_json(&content)?;
1973
1974 let parent_txn_id = parent_txn_id.to_string();
1976 let own_txn_id = own_txn_id.to_string();
1977
1978 let created_at_ts: u64 = created_at.0.into();
1979 self.acquire()
1980 .await?
1981 .with_transaction(move |txn| {
1982 txn.prepare_cached(
1983 r#"INSERT INTO dependent_send_queue_events
1984 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1985 VALUES (?, ?, ?, ?, ?)"#,
1986 )?
1987 .execute((
1988 room_id,
1989 parent_txn_id,
1990 own_txn_id,
1991 content,
1992 created_at_ts,
1993 ))?;
1994 Ok(())
1995 })
1996 .await
1997 }
1998
1999 async fn update_dependent_queued_request(
2000 &self,
2001 room_id: &RoomId,
2002 own_transaction_id: &ChildTransactionId,
2003 new_content: DependentQueuedRequestKind,
2004 ) -> Result<bool> {
2005 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2006 let content = self.serialize_json(&new_content)?;
2007
2008 let own_txn_id = own_transaction_id.to_string();
2010
2011 let num_updated = self
2012 .acquire()
2013 .await?
2014 .with_transaction(move |txn| {
2015 txn.prepare_cached(
2016 r#"UPDATE dependent_send_queue_events
2017 SET content = ?
2018 WHERE own_transaction_id = ?
2019 AND room_id = ?"#,
2020 )?
2021 .execute((content, own_txn_id, room_id))
2022 })
2023 .await?;
2024
2025 if num_updated > 1 {
2026 return Err(Error::InconsistentUpdate);
2027 }
2028
2029 Ok(num_updated == 1)
2030 }
2031
2032 async fn mark_dependent_queued_requests_as_ready(
2033 &self,
2034 room_id: &RoomId,
2035 parent_txn_id: &TransactionId,
2036 parent_key: SentRequestKey,
2037 ) -> Result<usize> {
2038 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2039 let parent_key = self.serialize_value(&parent_key)?;
2040
2041 let parent_txn_id = parent_txn_id.to_string();
2043
2044 self.acquire()
2045 .await?
2046 .with_transaction(move |txn| {
2047 Ok(txn.prepare_cached(
2048 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2049 )?
2050 .execute((parent_key, parent_txn_id, room_id))?)
2051 })
2052 .await
2053 }
2054
2055 async fn remove_dependent_queued_request(
2056 &self,
2057 room_id: &RoomId,
2058 txn_id: &ChildTransactionId,
2059 ) -> Result<bool> {
2060 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2061
2062 let txn_id = txn_id.to_string();
2064
2065 let num_deleted = self
2066 .acquire()
2067 .await?
2068 .with_transaction(move |txn| {
2069 txn.prepare_cached(
2070 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2071 )?
2072 .execute((txn_id, room_id))
2073 })
2074 .await?;
2075
2076 Ok(num_deleted > 0)
2077 }
2078
2079 async fn load_dependent_queued_requests(
2080 &self,
2081 room_id: &RoomId,
2082 ) -> Result<Vec<DependentQueuedRequest>> {
2083 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2084
2085 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2087 .acquire()
2088 .await?
2089 .prepare(
2090 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2091 |mut stmt| {
2092 stmt.query((room_id,))?
2093 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2094 .collect()
2095 },
2096 )
2097 .await?;
2098
2099 let mut dependent_events = Vec::with_capacity(res.len());
2100 for entry in res {
2101 let created_at = entry
2102 .4
2103 .and_then(UInt::new)
2104 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2105 dependent_events.push(DependentQueuedRequest {
2106 own_transaction_id: entry.0.into(),
2107 parent_transaction_id: entry.1.into(),
2108 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2109 kind: self.deserialize_json(&entry.3)?,
2110 created_at,
2111 });
2112 }
2113
2114 Ok(dependent_events)
2115 }
2116}
2117
2118#[derive(Debug, Clone, Serialize, Deserialize)]
2119struct ReceiptData {
2120 receipt: Receipt,
2121 event_id: OwnedEventId,
2122 user_id: OwnedUserId,
2123}
2124
2125#[cfg(test)]
2126mod tests {
2127 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2128
2129 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2130 use once_cell::sync::Lazy;
2131 use tempfile::{tempdir, TempDir};
2132
2133 use super::SqliteStateStore;
2134
2135 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2136 static NUM: AtomicU32 = AtomicU32::new(0);
2137
2138 async fn get_store() -> Result<impl StateStore, StoreError> {
2139 let name = NUM.fetch_add(1, SeqCst).to_string();
2140 let tmpdir_path = TMP_DIR.path().join(name);
2141
2142 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2143
2144 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2145 }
2146
2147 statestore_integration_tests!();
2148}
2149
2150#[cfg(test)]
2151mod encrypted_tests {
2152 use std::{
2153 path::PathBuf,
2154 sync::atomic::{AtomicU32, Ordering::SeqCst},
2155 };
2156
2157 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2158 use matrix_sdk_test::async_test;
2159 use once_cell::sync::Lazy;
2160 use tempfile::{tempdir, TempDir};
2161
2162 use super::SqliteStateStore;
2163 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2164
2165 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2166 static NUM: AtomicU32 = AtomicU32::new(0);
2167
2168 fn new_state_store_workspace() -> PathBuf {
2169 let name = NUM.fetch_add(1, SeqCst).to_string();
2170 TMP_DIR.path().join(name)
2171 }
2172
2173 async fn get_store() -> Result<impl StateStore, StoreError> {
2174 let tmpdir_path = new_state_store_workspace();
2175
2176 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2177
2178 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2179 .await
2180 .unwrap())
2181 }
2182
2183 #[async_test]
2184 async fn test_pool_size() {
2185 let tmpdir_path = new_state_store_workspace();
2186 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2187
2188 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2189
2190 assert_eq!(store.pool.status().max_size, 42);
2191 }
2192
2193 #[async_test]
2194 async fn test_cache_size() {
2195 let tmpdir_path = new_state_store_workspace();
2196 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2197
2198 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2199
2200 let conn = store.pool.get().await.unwrap();
2201 let cache_size =
2202 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2203
2204 assert_eq!(cache_size, -(1500 / 1024));
2208 }
2209
2210 #[async_test]
2211 async fn test_journal_size_limit() {
2212 let tmpdir_path = new_state_store_workspace();
2213 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2214
2215 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2216
2217 let conn = store.pool.get().await.unwrap();
2218 let journal_size_limit = conn
2219 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2220 .await
2221 .unwrap();
2222
2223 assert_eq!(journal_size_limit, 1500);
2226 }
2227
2228 statestore_integration_tests!();
2229}
2230
2231#[cfg(test)]
2232mod migration_tests {
2233 use std::{
2234 path::{Path, PathBuf},
2235 sync::{
2236 atomic::{AtomicU32, Ordering::SeqCst},
2237 Arc,
2238 },
2239 };
2240
2241 use as_variant::as_variant;
2242 use deadpool_sqlite::Runtime;
2243 use matrix_sdk_base::{
2244 media::{MediaFormat, MediaRequestParameters},
2245 store::{
2246 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2247 SerializableEventContent,
2248 },
2249 sync::UnreadNotificationsCount,
2250 RoomState, StateStore,
2251 };
2252 use matrix_sdk_test::async_test;
2253 use once_cell::sync::Lazy;
2254 use ruma::{
2255 events::{
2256 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2257 StateEventType,
2258 },
2259 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2260 RoomId, TransactionId, UserId,
2261 };
2262 use rusqlite::Transaction;
2263 use serde::{Deserialize, Serialize};
2264 use serde_json::json;
2265 use tempfile::{tempdir, TempDir};
2266 use tokio::fs;
2267
2268 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2269 use crate::{
2270 error::{Error, Result},
2271 utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2272 OpenStoreError,
2273 };
2274
2275 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2276 static NUM: AtomicU32 = AtomicU32::new(0);
2277 const SECRET: &str = "secret";
2278
2279 fn new_path() -> PathBuf {
2280 let name = NUM.fetch_add(1, SeqCst).to_string();
2281 TMP_DIR.path().join(name)
2282 }
2283
2284 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2285 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2286
2287 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2288 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2291 let conn = pool.get().await?;
2292
2293 init(&conn).await?;
2294
2295 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2296 let this = SqliteStateStore { store_cipher, pool };
2297 this.run_migrations(&conn, 1, Some(version)).await?;
2298
2299 Ok(this)
2300 }
2301
2302 fn room_info_v1_json(
2303 room_id: &RoomId,
2304 state: RoomState,
2305 name: Option<&str>,
2306 creator: Option<&UserId>,
2307 ) -> serde_json::Value {
2308 let name_content = match name {
2310 Some(name) => json!({ "name": name }),
2311 None => json!({ "name": null }),
2312 };
2313 let create_content = match creator {
2315 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2316 None => RoomCreateEventContent::new_v11(),
2317 };
2318
2319 json!({
2320 "room_id": room_id,
2321 "room_type": state,
2322 "notification_counts": UnreadNotificationsCount::default(),
2323 "summary": {
2324 "heroes": [],
2325 "joined_member_count": 0,
2326 "invited_member_count": 0,
2327 },
2328 "members_synced": false,
2329 "base_info": {
2330 "dm_targets": [],
2331 "max_power_level": 100,
2332 "name": {
2333 "Original": {
2334 "content": name_content,
2335 },
2336 },
2337 "create": {
2338 "Original": {
2339 "content": create_content,
2340 }
2341 }
2342 },
2343 })
2344 }
2345
2346 #[async_test]
2347 pub async fn test_migrating_v1_to_v2() {
2348 let path = new_path();
2349 {
2351 let db = create_fake_db(&path, 1).await.unwrap();
2352 let conn = db.pool.get().await.unwrap();
2353
2354 let this = db.clone();
2355 conn.with_transaction(move |txn| {
2356 for i in 0..5 {
2357 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2358 let (state, stripped) =
2359 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2360 let info = room_info_v1_json(&room_id, state, None, None);
2361
2362 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2363 let data = this.serialize_json(&info)?;
2364
2365 txn.prepare_cached(
2366 "INSERT INTO room_info (room_id, stripped, data)
2367 VALUES (?, ?, ?)",
2368 )?
2369 .execute((room_id, stripped, data))?;
2370 }
2371
2372 Result::<_, Error>::Ok(())
2373 })
2374 .await
2375 .unwrap();
2376 }
2377
2378 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2380
2381 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2383 }
2384
2385 fn add_room_v2(
2387 this: &SqliteStateStore,
2388 txn: &Transaction<'_>,
2389 room_id: &RoomId,
2390 name: Option<&str>,
2391 create_creator: Option<&UserId>,
2392 create_sender: Option<&UserId>,
2393 ) -> Result<(), Error> {
2394 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2395
2396 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2397 let encoded_state =
2398 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2399 let data = this.serialize_json(&room_info_json)?;
2400
2401 txn.prepare_cached(
2402 "INSERT INTO room_info (room_id, state, data)
2403 VALUES (?, ?, ?)",
2404 )?
2405 .execute((encoded_room_id, encoded_state, data))?;
2406
2407 let Some(create_sender) = create_sender else {
2409 return Ok(());
2410 };
2411
2412 let create_content = match create_creator {
2413 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2414 None => RoomCreateEventContent::new_v11(),
2415 };
2416
2417 let event_id = EventId::new(server_name!("dummy.local"));
2418 let create_event = json!({
2419 "content": create_content,
2420 "event_id": event_id,
2421 "sender": create_sender.to_owned(),
2422 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2423 "state_key": "",
2424 "type": "m.room.create",
2425 "unsigned": {},
2426 });
2427
2428 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2429 let encoded_event_type =
2430 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2431 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2432 let stripped = false;
2433 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2434 let data = this.serialize_json(&create_event)?;
2435
2436 txn.prepare_cached(
2437 "INSERT
2438 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2439 VALUES (?, ?, ?, ?, ?, ?)",
2440 )?
2441 .execute((
2442 encoded_room_id,
2443 encoded_event_type,
2444 encoded_state_key,
2445 stripped,
2446 encoded_event_id,
2447 data,
2448 ))?;
2449
2450 Ok(())
2451 }
2452
2453 #[async_test]
2454 pub async fn test_migrating_v2_to_v3() {
2455 let path = new_path();
2456
2457 let room_a_id = room_id!("!room_a:dummy.local");
2459 let room_a_name = "Room A";
2460 let room_a_creator = user_id!("@creator:dummy.local");
2461 let room_a_create_sender = user_id!("@sender:dummy.local");
2464
2465 let room_b_id = room_id!("!room_b:dummy.local");
2467
2468 let room_c_id = room_id!("!room_c:dummy.local");
2470 let room_c_create_sender = user_id!("@creator:dummy.local");
2471
2472 {
2474 let db = create_fake_db(&path, 2).await.unwrap();
2475 let conn = db.pool.get().await.unwrap();
2476
2477 let this = db.clone();
2478 conn.with_transaction(move |txn| {
2479 add_room_v2(
2480 &this,
2481 txn,
2482 room_a_id,
2483 Some(room_a_name),
2484 Some(room_a_creator),
2485 Some(room_a_create_sender),
2486 )?;
2487 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2488 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2489
2490 Result::<_, Error>::Ok(())
2491 })
2492 .await
2493 .unwrap();
2494 }
2495
2496 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2498
2499 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2501 assert_eq!(room_infos.len(), 3);
2502
2503 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2504 assert_eq!(room_a.name(), Some(room_a_name));
2505 assert_eq!(room_a.creator(), Some(room_a_create_sender));
2506
2507 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2508 assert_eq!(room_b.name(), None);
2509 assert_eq!(room_b.creator(), None);
2510
2511 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2512 assert_eq!(room_c.name(), None);
2513 assert_eq!(room_c.creator(), Some(room_c_create_sender));
2514 }
2515
2516 #[async_test]
2517 pub async fn test_migrating_v7_to_v9() {
2518 let path = new_path();
2519
2520 let room_id = room_id!("!room_a:dummy.local");
2521 let wedged_event_transaction_id = TransactionId::new();
2522 let local_event_transaction_id = TransactionId::new();
2523
2524 {
2526 let db = create_fake_db(&path, 7).await.unwrap();
2527 let conn = db.pool.get().await.unwrap();
2528
2529 let wedge_tx = wedged_event_transaction_id.clone();
2530 let local_tx = local_event_transaction_id.clone();
2531
2532 conn.with_transaction(move |txn| {
2533 add_dependent_send_queue_event_v7(
2534 &db,
2535 txn,
2536 room_id,
2537 &local_tx,
2538 ChildTransactionId::new(),
2539 DependentQueuedRequestKind::RedactEvent,
2540 )?;
2541 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2542 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2543 Result::<_, Error>::Ok(())
2544 })
2545 .await
2546 .unwrap();
2547 }
2548
2549 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2552
2553 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2554 assert!(requests.is_empty());
2555
2556 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2557 assert!(dependent_requests.is_empty());
2558 }
2559
2560 fn add_send_queue_event_v7(
2561 this: &SqliteStateStore,
2562 txn: &Transaction<'_>,
2563 transaction_id: &TransactionId,
2564 room_id: &RoomId,
2565 is_wedged: bool,
2566 ) -> Result<(), Error> {
2567 let content =
2568 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2569
2570 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2571 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2572
2573 let content = this.serialize_json(&content)?;
2574
2575 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2576 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2577
2578 Ok(())
2579 }
2580
2581 fn add_dependent_send_queue_event_v7(
2582 this: &SqliteStateStore,
2583 txn: &Transaction<'_>,
2584 room_id: &RoomId,
2585 parent_txn_id: &TransactionId,
2586 own_txn_id: ChildTransactionId,
2587 content: DependentQueuedRequestKind,
2588 ) -> Result<(), Error> {
2589 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2590
2591 let parent_txn_id = parent_txn_id.to_string();
2592 let own_txn_id = own_txn_id.to_string();
2593 let content = this.serialize_json(&content)?;
2594
2595 txn.prepare_cached(
2596 "INSERT INTO dependent_send_queue_events
2597 (room_id, parent_transaction_id, own_transaction_id, content)
2598 VALUES (?, ?, ?, ?)",
2599 )?
2600 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2601
2602 Ok(())
2603 }
2604
2605 #[derive(Clone, Debug, Serialize, Deserialize)]
2606 pub enum LegacyDependentQueuedRequestKind {
2607 UploadFileWithThumbnail {
2608 content_type: String,
2609 cache_key: MediaRequestParameters,
2610 related_to: OwnedTransactionId,
2611 },
2612 }
2613
2614 #[async_test]
2615 pub async fn test_dependent_queued_request_variant_renaming() {
2616 let path = new_path();
2617 let db = create_fake_db(&path, 7).await.unwrap();
2618
2619 let cache_key = MediaRequestParameters {
2620 format: MediaFormat::File,
2621 source: MediaSource::Plain("https://server.local/foobar".into()),
2622 };
2623 let related_to = TransactionId::new();
2624 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2625 content_type: "image/png".to_owned(),
2626 cache_key,
2627 related_to: related_to.clone(),
2628 };
2629
2630 let data = db
2631 .serialize_json(&request)
2632 .expect("should be able to serialize legacy dependent request");
2633 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2634 "should be able to deserialize dependent request from legacy dependent request",
2635 );
2636
2637 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2638 assert_eq!(de_related_to, related_to);
2639 });
2640 }
2641}