1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 str::FromStr as _,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
15 DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
16 QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
17 ThreadSubscriptionStatus,
18 },
19 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
20 StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
21};
22use matrix_sdk_store_encryption::StoreCipher;
23use ruma::{
24 canonical_json::{redact, RedactedBecause},
25 events::{
26 presence::PresenceEvent,
27 receipt::{Receipt, ReceiptThread, ReceiptType},
28 room::{
29 create::RoomCreateEventContent,
30 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
31 },
32 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
33 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
34 },
35 serde::Raw,
36 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
37 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
38};
39use rusqlite::{OptionalExtension, Transaction};
40use serde::{Deserialize, Serialize};
41use tokio::{
42 fs,
43 sync::{Mutex, OwnedMutexGuard},
44};
45use tracing::{debug, instrument, trace, warn};
46
47use crate::{
48 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49 error::{Error, Result},
50 utils::{
51 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt,
53 },
54 OpenStoreError, Secret, SqliteStoreConfig,
55};
56
/// Table-name constants.
///
/// The callers visible in this file pass them as the first argument of
/// `encode_key`, so each table gets its own key namespace. The values match
/// the table names used by the SQL statements in this file.
mod keys {
    // Generic key/value storage.
    pub const KV_BLOB: &str = "kv_blob";

    // Account data, global and per room.
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";

    // Room-scoped state.
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const DISPLAY_NAME: &str = "display_name";
    pub const RECEIPT: &str = "receipt";

    // Send queue and the requests depending on it.
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";

    // Thread subscriptions.
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
72
/// Name handed to `SqliteStoreConfig::build_pool_of_connections` when the
/// store is opened (presumably the database file name inside the configured
/// directory — confirm against the pool builder).
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";

/// Latest schema version known to this store.
///
/// `SqliteStateStore::run_migrations` upgrades to this version when it is not
/// given an explicit target, so it must be equal to the highest version gated
/// in that function. The newest step there
/// (`013_send_queue_new_parent_key_format.sql`) is gated on version 15; with
/// the previous value of 14 that step could never run on a regular open.
///
/// Bump this number whenever a migration step is added.
const DATABASE_VERSION: u8 = 15;
82
/// A state store backed by SQLite.
///
/// Cloning is cheap with respect to the write connection and the cipher, which
/// are both shared behind an `Arc`.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// Cipher exposed through `EncryptableStore::get_cypher`; `None` when the
    /// store was opened without a secret.
    store_cipher: Option<Arc<StoreCipher>>,

    /// Pool of connections; `read` hands out connections from it.
    pool: SqlitePool,

    /// The single connection used by `write`, `vacuum` and the migrations,
    /// guarded by an async mutex so that only one task uses it at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
97
98#[cfg(not(tarpaulin_include))]
99impl fmt::Debug for SqliteStateStore {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
102 }
103}
104
105impl SqliteStateStore {
106 pub async fn open(
109 path: impl AsRef<Path>,
110 passphrase: Option<&str>,
111 ) -> Result<Self, OpenStoreError> {
112 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
113 }
114
115 pub async fn open_with_key(
118 path: impl AsRef<Path>,
119 key: Option<&[u8; 32]>,
120 ) -> Result<Self, OpenStoreError> {
121 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
122 }
123
124 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
126 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
127
128 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
129
130 let this = Self::open_with_pool(pool, config.secret).await?;
131 this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
132
133 Ok(this)
134 }
135
136 pub async fn open_with_pool(
139 pool: SqlitePool,
140 secret: Option<Secret>,
141 ) -> Result<Self, OpenStoreError> {
142 let conn = pool.get().await?;
143
144 let mut version = conn.db_version().await?;
145
146 if version == 0 {
147 init(&conn).await?;
148 version = 1;
149 }
150
151 let store_cipher = match secret {
152 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
153 None => None,
154 };
155 let this = Self {
156 store_cipher,
157 pool,
158 write_connection: Arc::new(Mutex::new(conn)),
160 };
161 this.run_migrations(version, None).await?;
162
163 Ok(this)
164 }
165
166 async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
171 let to = to.unwrap_or(DATABASE_VERSION);
172
173 if from < to {
174 debug!(version = from, new_version = to, "Upgrading database");
175 } else {
176 return Ok(());
177 }
178
179 let conn = self.write().await;
180
181 if from < 2 && to >= 2 {
182 let this = self.clone();
183 conn.with_transaction(move |txn| {
184 txn.execute_batch(include_str!(
186 "../migrations/state_store/002_a_create_new_room_info.sql"
187 ))?;
188
189 for data in txn
191 .prepare("SELECT data FROM room_info")?
192 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
193 {
194 let data = data?;
195 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
196
197 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
198 let state = this
199 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
200 txn.prepare_cached(
201 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
202 VALUES (?, ?, ?)",
203 )?
204 .execute((room_id, state, data))?;
205 }
206
207 txn.execute_batch(include_str!(
209 "../migrations/state_store/002_b_replace_room_info.sql"
210 ))?;
211
212 txn.set_db_version(2)?;
213 Result::<_, Error>::Ok(())
214 })
215 .await?;
216 }
217
218 if from < 3 && to >= 3 {
220 let this = self.clone();
221 conn.with_transaction(move |txn| {
222 for data in txn
224 .prepare("SELECT data FROM room_info")?
225 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
226 {
227 let data = data?;
228 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
229
230 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
232 let event_type =
233 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
234 let create_res = txn
235 .prepare(
236 "SELECT stripped, data FROM state_event
237 WHERE room_id = ? AND event_type = ?",
238 )?
239 .query_row([room_id, event_type], |row| {
240 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
241 })
242 .optional()?;
243
244 let create = create_res.and_then(|(stripped, data)| {
245 let create = if stripped {
246 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
247 this.deserialize_json(&data).ok()?,
248 )
249 } else {
250 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
251 };
252 Some(create)
253 });
254
255 let migrated_room_info = room_info_v1.migrate(create.as_ref());
256
257 let data = this.serialize_json(&migrated_room_info)?;
258 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
259 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
260 .execute((data, room_id))?;
261 }
262
263 txn.set_db_version(3)?;
264 Result::<_, Error>::Ok(())
265 })
266 .await?;
267 }
268
269 if from < 4 && to >= 4 {
270 conn.with_transaction(move |txn| {
271 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
273 txn.set_db_version(4)
274 })
275 .await?;
276 }
277
278 if from < 5 && to >= 5 {
279 conn.with_transaction(move |txn| {
280 txn.execute_batch(include_str!(
282 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
283 ))?;
284 txn.set_db_version(4)
285 })
286 .await?;
287 }
288
289 if from < 6 && to >= 6 {
290 conn.with_transaction(move |txn| {
291 txn.execute_batch(include_str!(
293 "../migrations/state_store/005_send_queue_dependent_events.sql"
294 ))?;
295 txn.set_db_version(6)
296 })
297 .await?;
298 }
299
300 if from < 7 && to >= 7 {
301 conn.with_transaction(move |txn| {
302 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
304 txn.set_db_version(7)
305 })
306 .await?;
307 }
308
309 if from < 8 && to >= 8 {
310 let error = QueueWedgeError::GenericApiError {
312 msg: "local echo failed to send in a previous session".into(),
313 };
314 let default_err = self.serialize_value(&error)?;
315
316 conn.with_transaction(move |txn| {
317 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
319
320 for wedged_entries in txn
323 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
324 .query_map((), |row| {
325 Ok(
326 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
327 )
328 })? {
329
330 let (room_id, transaction_id) = wedged_entries?;
331
332 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
333 .execute((default_err.clone(), room_id, transaction_id))?;
334 }
335
336
337 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
339
340 txn.set_db_version(8)
341 })
342 .await?;
343 }
344
345 if from < 9 && to >= 9 {
346 conn.with_transaction(move |txn| {
347 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
349 txn.set_db_version(9)
350 })
351 .await?;
352 }
353
354 if from < 10 && to >= 10 {
355 conn.with_transaction(move |txn| {
356 txn.execute_batch(include_str!(
358 "../migrations/state_store/009_send_queue_priority.sql"
359 ))?;
360 txn.set_db_version(10)
361 })
362 .await?;
363 }
364
365 if from < 11 && to >= 11 {
366 conn.with_transaction(move |txn| {
367 txn.execute_batch(include_str!(
369 "../migrations/state_store/010_send_queue_enqueue_time.sql"
370 ))?;
371 txn.set_db_version(11)
372 })
373 .await?;
374 }
375
376 if from < 12 && to >= 12 {
377 conn.vacuum().await?;
381 conn.set_kv("version", vec![12]).await?;
382 }
383
384 if from < 13 && to >= 13 {
385 conn.with_transaction(move |txn| {
386 txn.execute_batch(include_str!(
388 "../migrations/state_store/011_thread_subscriptions.sql"
389 ))?;
390 txn.set_db_version(13)
391 })
392 .await?;
393 }
394
395 if from < 14 && to >= 14 {
396 conn.with_transaction(move |txn| {
397 txn.execute_batch(include_str!(
399 "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
400 ))?;
401 txn.set_db_version(14)
402 })
403 .await?;
404 }
405
406 if from < 15 && to >= 15 {
407 conn.with_transaction(move |txn| {
408 txn.execute_batch(include_str!(
410 "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
411 ))?;
412 txn.set_db_version(15)
413 })
414 .await?;
415 }
416
417 Ok(())
418 }
419
420 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
421 let key_s = match key {
422 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
423 StateStoreDataKey::SupportedVersions => {
424 Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
425 }
426 StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
427 StateStoreDataKey::Filter(f) => {
428 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
429 }
430 StateStoreDataKey::UserAvatarUrl(u) => {
431 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
432 }
433 StateStoreDataKey::RecentlyVisitedRooms(b) => {
434 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
435 }
436 StateStoreDataKey::UtdHookManagerData => {
437 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
438 }
439 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
440 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
441 }
442 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
443 if let Some(thread_root) = thread_root {
444 Cow::Owned(format!(
445 "{}:{room_id}:{thread_root}",
446 StateStoreDataKey::COMPOSER_DRAFT
447 ))
448 } else {
449 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
450 }
451 }
452 StateStoreDataKey::SeenKnockRequests(room_id) => {
453 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
454 }
455 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
456 Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
457 }
458 };
459
460 self.encode_key(keys::KV_BLOB, &*key_s)
461 }
462
463 fn encode_presence_key(&self, user_id: &UserId) -> Key {
464 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
465 }
466
467 fn encode_custom_key(&self, key: &[u8]) -> Key {
468 let mut full_key = b"custom:".to_vec();
469 full_key.extend(key);
470 self.encode_key(keys::KV_BLOB, full_key)
471 }
472
473 #[instrument(skip_all)]
475 async fn read(&self) -> Result<SqliteAsyncConn> {
476 Ok(self.pool.get().await?)
477 }
478
479 #[instrument(skip_all)]
481 async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
482 self.write_connection.clone().lock_owned().await
483 }
484
485 fn remove_maybe_stripped_room_data(
486 &self,
487 txn: &Transaction<'_>,
488 room_id: &RoomId,
489 stripped: bool,
490 ) -> rusqlite::Result<()> {
491 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
492 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
493
494 let member_room_id = self.encode_key(keys::MEMBER, room_id);
495 txn.remove_room_members(&member_room_id, Some(stripped))
496 }
497
498 pub async fn vacuum(&self) -> Result<()> {
499 self.write_connection.lock().await.vacuum().await
500 }
501
502 pub async fn get_db_size(&self) -> Result<Option<usize>> {
503 let read_conn = self.pool.get().await?;
504 Ok(Some(read_conn.get_db_size().await?))
505 }
506}
507
508impl EncryptableStore for SqliteStateStore {
509 fn get_cypher(&self) -> Option<&StoreCipher> {
510 self.store_cipher.as_deref()
511 }
512}
513
514async fn init(conn: &SqliteAsyncConn) -> Result<()> {
516 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
519 conn.with_transaction(|txn| {
520 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
521 txn.set_db_version(1)?;
522
523 Ok(())
524 })
525 .await
526}
527
528trait SqliteConnectionStateStoreExt {
529 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
530
531 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
532
533 fn set_room_account_data(
534 &self,
535 room_id: &[u8],
536 event_type: &[u8],
537 data: &[u8],
538 ) -> rusqlite::Result<()>;
539 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
540
541 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
542 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
543 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
544
545 fn set_state_event(
546 &self,
547 room_id: &[u8],
548 event_type: &[u8],
549 state_key: &[u8],
550 stripped: bool,
551 event_id: Option<&[u8]>,
552 data: &[u8],
553 ) -> rusqlite::Result<()>;
554 fn get_state_event_by_id(
555 &self,
556 room_id: &[u8],
557 event_id: &[u8],
558 ) -> rusqlite::Result<Option<Vec<u8>>>;
559 fn remove_room_state_events(
560 &self,
561 room_id: &[u8],
562 stripped: Option<bool>,
563 ) -> rusqlite::Result<()>;
564
565 fn set_member(
566 &self,
567 room_id: &[u8],
568 user_id: &[u8],
569 membership: &[u8],
570 stripped: bool,
571 data: &[u8],
572 ) -> rusqlite::Result<()>;
573 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
574
575 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
576 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
577 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
578
579 fn set_receipt(
580 &self,
581 room_id: &[u8],
582 user_id: &[u8],
583 receipt_type: &[u8],
584 thread_id: &[u8],
585 event_id: &[u8],
586 data: &[u8],
587 ) -> rusqlite::Result<()>;
588 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
589
590 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
591 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
592 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
593 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
594 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
595}
596
597impl SqliteConnectionStateStoreExt for rusqlite::Connection {
598 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
599 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
600 Ok(())
601 }
602
603 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
604 self.prepare_cached(
605 "INSERT OR REPLACE INTO global_account_data (event_type, data)
606 VALUES (?, ?)",
607 )?
608 .execute((event_type, data))?;
609 Ok(())
610 }
611
612 fn set_room_account_data(
613 &self,
614 room_id: &[u8],
615 event_type: &[u8],
616 data: &[u8],
617 ) -> rusqlite::Result<()> {
618 self.prepare_cached(
619 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
620 VALUES (?, ?, ?)",
621 )?
622 .execute((room_id, event_type, data))?;
623 Ok(())
624 }
625
626 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
627 self.prepare(
628 "DELETE FROM room_account_data
629 WHERE room_id = ?",
630 )?
631 .execute((room_id,))?;
632 Ok(())
633 }
634
635 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
636 self.prepare_cached(
637 "INSERT OR REPLACE INTO room_info (room_id, state, data)
638 VALUES (?, ?, ?)",
639 )?
640 .execute((room_id, state, data))?;
641 Ok(())
642 }
643
644 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
645 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
646 .optional()
647 }
648
649 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
651 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
652 Ok(())
653 }
654
655 fn set_state_event(
656 &self,
657 room_id: &[u8],
658 event_type: &[u8],
659 state_key: &[u8],
660 stripped: bool,
661 event_id: Option<&[u8]>,
662 data: &[u8],
663 ) -> rusqlite::Result<()> {
664 self.prepare_cached(
665 "INSERT OR REPLACE
666 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
667 VALUES (?, ?, ?, ?, ?, ?)",
668 )?
669 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
670 Ok(())
671 }
672
673 fn get_state_event_by_id(
674 &self,
675 room_id: &[u8],
676 event_id: &[u8],
677 ) -> rusqlite::Result<Option<Vec<u8>>> {
678 self.query_row(
679 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
680 (room_id, event_id),
681 |row| row.get(0),
682 )
683 .optional()
684 }
685
686 fn remove_room_state_events(
692 &self,
693 room_id: &[u8],
694 stripped: Option<bool>,
695 ) -> rusqlite::Result<()> {
696 if let Some(stripped) = stripped {
697 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
698 .execute((room_id, stripped))?;
699 } else {
700 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
701 .execute((room_id,))?;
702 }
703 Ok(())
704 }
705
706 fn set_member(
707 &self,
708 room_id: &[u8],
709 user_id: &[u8],
710 membership: &[u8],
711 stripped: bool,
712 data: &[u8],
713 ) -> rusqlite::Result<()> {
714 self.prepare_cached(
715 "INSERT OR REPLACE
716 INTO member (room_id, user_id, membership, stripped, data)
717 VALUES (?, ?, ?, ?, ?)",
718 )?
719 .execute((room_id, user_id, membership, stripped, data))?;
720 Ok(())
721 }
722
723 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
728 if let Some(stripped) = stripped {
729 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
730 .execute((room_id, stripped))?;
731 } else {
732 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
733 }
734 Ok(())
735 }
736
737 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
738 self.prepare_cached(
739 "INSERT OR REPLACE
740 INTO profile (room_id, user_id, data)
741 VALUES (?, ?, ?)",
742 )?
743 .execute((room_id, user_id, data))?;
744 Ok(())
745 }
746
747 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
748 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
749 Ok(())
750 }
751
752 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
753 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
754 .execute((room_id, user_id))?;
755 Ok(())
756 }
757
758 fn set_receipt(
759 &self,
760 room_id: &[u8],
761 user_id: &[u8],
762 receipt_type: &[u8],
763 thread: &[u8],
764 event_id: &[u8],
765 data: &[u8],
766 ) -> rusqlite::Result<()> {
767 self.prepare_cached(
768 "INSERT OR REPLACE
769 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
770 VALUES (?, ?, ?, ?, ?, ?)",
771 )?
772 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
773 Ok(())
774 }
775
776 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
777 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
778 Ok(())
779 }
780
781 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
782 self.prepare_cached(
783 "INSERT OR REPLACE
784 INTO display_name (room_id, name, data)
785 VALUES (?, ?, ?)",
786 )?
787 .execute((room_id, name, data))?;
788 Ok(())
789 }
790
791 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
792 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
793 .execute((room_id, name))?;
794 Ok(())
795 }
796
797 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
798 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
799 Ok(())
800 }
801
802 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
803 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
804 Ok(())
805 }
806
807 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
808 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
809 .execute((room_id,))?;
810 Ok(())
811 }
812}
813
814#[async_trait]
815trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
816 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
817 Ok(self
818 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
819 .await
820 .optional()?)
821 }
822
823 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
824 let keys_length = keys.len();
825
826 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
827 let sql_params = repeat_vars(keys.len());
828 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
829
830 let params = rusqlite::params_from_iter(keys);
831
832 Ok(txn
833 .prepare(&sql)?
834 .query(params)?
835 .mapped(|row| row.get(0))
836 .collect::<Result<_, _>>()?)
837 })
838 .await
839 }
840
841 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
842
843 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
844 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
845 Ok(())
846 }
847
848 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
849 Ok(match room_id {
850 None => {
851 self.prepare("SELECT data FROM room_info", move |mut stmt| {
852 stmt.query_map((), |row| row.get(0))?.collect()
853 })
854 .await?
855 }
856
857 Some(room_id) => {
858 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
859 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
860 })
861 .await?
862 }
863 })
864 }
865
866 async fn get_maybe_stripped_state_events_for_keys(
867 &self,
868 room_id: Key,
869 event_type: Key,
870 state_keys: Vec<Key>,
871 ) -> Result<Vec<(bool, Vec<u8>)>> {
872 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
873 let sql_params = repeat_vars(state_keys.len());
874 let sql = format!(
875 "SELECT stripped, data FROM state_event
876 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
877 );
878
879 let params = rusqlite::params_from_iter(
880 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
881 );
882
883 Ok(txn
884 .prepare(&sql)?
885 .query(params)?
886 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
887 .collect::<Result<_, _>>()?)
888 })
889 .await
890 }
891
892 async fn get_maybe_stripped_state_events(
893 &self,
894 room_id: Key,
895 event_type: Key,
896 ) -> Result<Vec<(bool, Vec<u8>)>> {
897 Ok(self
898 .prepare(
899 "SELECT stripped, data FROM state_event
900 WHERE room_id = ? AND event_type = ?",
901 |mut stmt| {
902 stmt.query((room_id, event_type))?
903 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
904 .collect()
905 },
906 )
907 .await?)
908 }
909
910 async fn get_profiles(
911 &self,
912 room_id: Key,
913 user_ids: Vec<Key>,
914 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
915 let user_ids_length = user_ids.len();
916
917 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
918 let sql_params = repeat_vars(user_ids.len());
919 let sql = format!(
920 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
921 );
922
923 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
924
925 Ok(txn
926 .prepare(&sql)?
927 .query(params)?
928 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
929 .collect::<Result<_, _>>()?)
930 })
931 .await
932 }
933
934 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
935 let res = if memberships.is_empty() {
936 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
937 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
938 })
939 .await?
940 } else {
941 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
942 let sql_params = repeat_vars(memberships.len());
943 let sql = format!(
944 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
945 );
946
947 let params =
948 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
949
950 Ok(txn
951 .prepare(&sql)?
952 .query(params)?
953 .mapped(|row| row.get(0))
954 .collect::<Result<_, _>>()?)
955 })
956 .await?
957 };
958
959 Ok(res)
960 }
961
962 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
963 Ok(self
964 .query_row(
965 "SELECT data FROM global_account_data WHERE event_type = ?",
966 (event_type,),
967 |row| row.get(0),
968 )
969 .await
970 .optional()?)
971 }
972
973 async fn get_room_account_data(
974 &self,
975 room_id: Key,
976 event_type: Key,
977 ) -> Result<Option<Vec<u8>>> {
978 Ok(self
979 .query_row(
980 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
981 (room_id, event_type),
982 |row| row.get(0),
983 )
984 .await
985 .optional()?)
986 }
987
988 async fn get_display_names(
989 &self,
990 room_id: Key,
991 names: Vec<Key>,
992 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
993 let names_length = names.len();
994
995 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
996 let sql_params = repeat_vars(names.len());
997 let sql = format!(
998 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
999 );
1000
1001 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
1002
1003 Ok(txn
1004 .prepare(&sql)?
1005 .query(params)?
1006 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
1007 .collect::<Result<_, _>>()?)
1008 })
1009 .await
1010 }
1011
1012 async fn get_user_receipt(
1013 &self,
1014 room_id: Key,
1015 receipt_type: Key,
1016 thread: Key,
1017 user_id: Key,
1018 ) -> Result<Option<Vec<u8>>> {
1019 Ok(self
1020 .query_row(
1021 "SELECT data FROM receipt
1022 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1023 (room_id, receipt_type, thread, user_id),
1024 |row| row.get(0),
1025 )
1026 .await
1027 .optional()?)
1028 }
1029
1030 async fn get_event_receipts(
1031 &self,
1032 room_id: Key,
1033 receipt_type: Key,
1034 thread: Key,
1035 event_id: Key,
1036 ) -> Result<Vec<Vec<u8>>> {
1037 Ok(self
1038 .prepare(
1039 "SELECT data FROM receipt
1040 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1041 |mut stmt| {
1042 stmt.query((room_id, receipt_type, thread, event_id))?
1043 .mapped(|row| row.get(0))
1044 .collect()
1045 },
1046 )
1047 .await?)
1048 }
1049}
1050
1051#[async_trait]
1052impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1053 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1054 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1055 }
1056}
1057
1058#[async_trait]
1059impl StateStore for SqliteStateStore {
1060 type Error = Error;
1061
1062 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1063 self.read()
1064 .await?
1065 .get_kv_blob(self.encode_state_store_data_key(key))
1066 .await?
1067 .map(|data| {
1068 Ok(match key {
1069 StateStoreDataKey::SyncToken => {
1070 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1071 }
1072 StateStoreDataKey::SupportedVersions => {
1073 StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1074 }
1075 StateStoreDataKey::WellKnown => {
1076 StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1077 }
1078 StateStoreDataKey::Filter(_) => {
1079 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1080 }
1081 StateStoreDataKey::UserAvatarUrl(_) => {
1082 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1083 }
1084 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1085 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1086 }
1087 StateStoreDataKey::UtdHookManagerData => {
1088 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1089 }
1090 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1091 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1092 }
1093 StateStoreDataKey::ComposerDraft(_, _) => {
1094 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1095 }
1096 StateStoreDataKey::SeenKnockRequests(_) => {
1097 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1098 }
1099 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1100 StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1101 self.deserialize_value(&data)?,
1102 )
1103 }
1104 })
1105 })
1106 .transpose()
1107 }
1108
1109 async fn set_kv_data(
1110 &self,
1111 key: StateStoreDataKey<'_>,
1112 value: StateStoreDataValue,
1113 ) -> Result<()> {
1114 let serialized_value = match key {
1115 StateStoreDataKey::SyncToken => self.serialize_value(
1116 &value.into_sync_token().expect("Session data not a sync token"),
1117 )?,
1118 StateStoreDataKey::SupportedVersions => self.serialize_value(
1119 &value
1120 .into_supported_versions()
1121 .expect("Session data not containing supported versions"),
1122 )?,
1123 StateStoreDataKey::WellKnown => self.serialize_value(
1124 &value.into_well_known().expect("Session data not containing well-known"),
1125 )?,
1126 StateStoreDataKey::Filter(_) => {
1127 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1128 }
1129 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1130 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1131 )?,
1132 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1133 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1134 )?,
1135 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1136 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1137 )?,
1138 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1139 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1140 }
1141 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1142 &value.into_composer_draft().expect("Session data not a composer draft"),
1143 )?,
1144 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1145 &value
1146 .into_seen_knock_requests()
1147 .expect("Session data is not a set of seen knock request ids"),
1148 )?,
1149 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1150 &value
1151 .into_thread_subscriptions_catchup_tokens()
1152 .expect("Session data is not a list of thread subscription catchup tokens"),
1153 )?,
1154 };
1155
1156 self.write()
1157 .await
1158 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1159 .await
1160 }
1161
1162 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1163 self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1164 }
1165
/// Persists a batch of [`StateChanges`] to the database.
///
/// All writes are issued from the closure handed to `with_transaction` on
/// the write connection. The sections run in this order: sync token,
/// global account data, room account data, presence, room infos, profile
/// deletions, state events (plus member and profile rows), stripped state
/// events (plus member rows), receipts, redactions and finally display
/// name ambiguity maps.
///
/// Returns the first serialization or database error raised by any of
/// these steps.
async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
    // The closure below is `move`, so it operates on an owned copy of the
    // changes and on a clone of the store handle.
    let changes = changes.to_owned();
    let this = self.clone();
    self.write()
        .await
        .with_transaction(move |txn| {
            let StateChanges {
                sync_token,
                account_data,
                presence,
                profiles,
                profiles_to_delete,
                state,
                room_account_data,
                room_infos,
                receipts,
                redactions,
                stripped_state,
                ambiguity_maps,
            } = changes;

            // The sync token lives in the key-value blob table.
            if let Some(sync_token) = sync_token {
                let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
                let value = this.serialize_value(&sync_token)?;
                txn.set_kv_blob(&key, &value)?;
            }

            // Global account data, keyed by event type.
            for (event_type, event) in account_data {
                let event_type =
                    this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
                let data = this.serialize_json(&event)?;
                txn.set_global_account_data(&event_type, &data)?;
            }

            // Room account data, keyed by room and event type.
            for (room_id, events) in room_account_data {
                let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
                for (event_type, event) in events {
                    let event_type =
                        this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
                    let data = this.serialize_json(&event)?;
                    txn.set_room_account_data(&room_id, &event_type, &data)?;
                }
            }

            // Presence events share the key-value blob table with other data.
            for (user_id, event) in presence {
                let key = this.encode_presence_key(&user_id);
                let value = this.serialize_json(&event)?;
                txn.set_kv_blob(&key, &value)?;
            }

            for (room_id, room_info) in room_infos {
                // A room in the invited state is treated as "stripped"; the
                // helper is called with the opposite flag.
                // NOTE(review): presumably this clears the data of the other
                // kind for the room — confirm against
                // `remove_maybe_stripped_room_data`.
                let stripped = room_info.state() == RoomState::Invited;
                this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;

                let room_id = this.encode_key(keys::ROOM_INFO, room_id);
                // The room state is stored next to the info, as its encoded
                // JSON representation.
                let state = this
                    .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
                let data = this.serialize_json(&room_info)?;
                txn.set_room_info(&room_id, &state, &data)?;
            }

            // Profile deletions run before the state loop below, which may
            // write profiles again.
            for (room_id, user_ids) in profiles_to_delete {
                let room_id = this.encode_key(keys::PROFILE, room_id);
                for user_id in user_ids {
                    let user_id = this.encode_key(keys::PROFILE, user_id);
                    txn.remove_room_profile(&room_id, &user_id)?;
                }
            }

            for (room_id, state_event_types) in state {
                // Profiles that came with this batch for the same room, if any.
                let profiles = profiles.get(&room_id);
                let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                for (event_type, state_events) in state_event_types {
                    let encoded_event_type =
                        this.encode_key(keys::STATE_EVENT, event_type.to_string());

                    for (state_key, raw_state_event) in state_events {
                        let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                        let data = this.serialize_json(&raw_state_event)?;

                        // The event ID is read on a best-effort basis: a
                        // missing or unreadable field yields `None`.
                        let event_id: Option<String> =
                            raw_state_event.get_field("event_id").ok().flatten();
                        let encoded_event_id =
                            event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));

                        // `false`: this is a non-stripped state event.
                        txn.set_state_event(
                            &encoded_room_id,
                            &encoded_event_type,
                            &encoded_state_key,
                            false,
                            encoded_event_id.as_deref(),
                            &data,
                        )?;

                        // Member events additionally feed the member table
                        // and, when available, the profile table.
                        if event_type == StateEventType::RoomMember {
                            let member_event = match raw_state_event
                                .deserialize_as_unchecked::<SyncRoomMemberEvent>()
                            {
                                Ok(ev) => ev,
                                Err(e) => {
                                    // The raw state event has already been
                                    // stored above; only the member and
                                    // profile rows are skipped.
                                    debug!(event_id, "Failed to deserialize member event: {e}");
                                    continue;
                                }
                            };

                            let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
                            let user_id = this.encode_key(keys::MEMBER, &state_key);
                            let membership = this
                                .encode_key(keys::MEMBER, member_event.membership().as_str());
                            // The stored payload is the serialized state key.
                            let data = this.serialize_value(&state_key)?;

                            txn.set_member(
                                &encoded_room_id,
                                &user_id,
                                &membership,
                                false,
                                &data,
                            )?;

                            if let Some(profile) =
                                profiles.and_then(|p| p.get(member_event.state_key()))
                            {
                                let room_id = this.encode_key(keys::PROFILE, &room_id);
                                let user_id = this.encode_key(keys::PROFILE, &state_key);
                                let data = this.serialize_json(&profile)?;
                                txn.set_profile(&room_id, &user_id, &data)?;
                            }
                        }
                    }
                }
            }

            for (room_id, stripped_state_event_types) in stripped_state {
                let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                for (event_type, stripped_state_events) in stripped_state_event_types {
                    let encoded_event_type =
                        this.encode_key(keys::STATE_EVENT, event_type.to_string());

                    for (state_key, raw_stripped_state_event) in stripped_state_events {
                        let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                        let data = this.serialize_json(&raw_stripped_state_event)?;
                        // `true`: stripped event; no event ID is stored for it.
                        txn.set_state_event(
                            &encoded_room_id,
                            &encoded_event_type,
                            &encoded_state_key,
                            true,
                            None,
                            &data,
                        )?;

                        if event_type == StateEventType::RoomMember {
                            let member_event = match raw_stripped_state_event
                                .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
                            ) {
                                Ok(ev) => ev,
                                Err(e) => {
                                    // As above: the raw event stays stored,
                                    // only the member row is skipped.
                                    debug!("Failed to deserialize stripped member event: {e}");
                                    continue;
                                }
                            };

                            let room_id = this.encode_key(keys::MEMBER, &room_id);
                            let user_id = this.encode_key(keys::MEMBER, &state_key);
                            let membership = this.encode_key(
                                keys::MEMBER,
                                member_event.content.membership.as_str(),
                            );
                            let data = this.serialize_value(&state_key)?;

                            txn.set_member(&room_id, &user_id, &membership, true, &data)?;
                        }
                    }
                }
            }

            for (room_id, receipt_event) in receipts {
                let room_id = this.encode_key(keys::RECEIPT, room_id);

                for (event_id, receipt_types) in receipt_event {
                    let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);

                    for (receipt_type, receipt_users) in receipt_types {
                        let receipt_type =
                            this.encode_key(keys::RECEIPT, receipt_type.as_str());

                        for (user_id, receipt) in receipt_users {
                            let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
                            // The thread is keyed by its MessagePack
                            // serialization rather than a string form.
                            // NOTE(review): presumably so that "no thread"
                            // still yields a non-NULL key — confirm against
                            // the table schema.
                            let thread = this.encode_key(
                                keys::RECEIPT,
                                rmp_serde::to_vec_named(&receipt.thread)?,
                            );
                            // The event and user IDs are stored inside the
                            // payload so they can be recovered on load.
                            let data = this.serialize_json(&ReceiptData {
                                receipt,
                                event_id: event_id.clone(),
                                user_id,
                            })?;

                            txn.set_receipt(
                                &room_id,
                                &encoded_user_id,
                                &receipt_type,
                                &thread,
                                &encoded_event_id,
                                &data,
                            )?;
                        }
                    }
                }
            }

            for (room_id, redactions) in redactions {
                // Derives the redaction rules from the stored room info. Any
                // failure to load or decode it falls back to
                // `ROOM_VERSION_RULES_FALLBACK`, with a warning.
                let make_redaction_rules = || {
                    let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
                    txn.get_room_info(&encoded_room_id)
                        .ok()
                        .flatten()
                        .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
                        .map(|info| info.room_version_rules_or_default())
                        .unwrap_or_else(|| {
                            warn!(
                                ?room_id,
                                "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
                            );
                            ROOM_VERSION_RULES_FALLBACK
                        }).redaction
                };

                let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
                // Computed at most once per room, on the first redaction that
                // actually targets a stored state event.
                let mut redaction_rules = None;

                for (event_id, redaction) in redactions {
                    let event_id = this.encode_key(keys::STATE_EVENT, event_id);

                    // Only state events found in the store and decodable as
                    // raw JSON are redacted; anything else is skipped.
                    if let Some(Ok(raw_event)) = txn
                        .get_state_event_by_id(&encoded_room_id, &event_id)?
                        .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
                    {
                        let event = raw_event.deserialize()?;
                        let redacted = redact(
                            raw_event.deserialize_as::<CanonicalJsonObject>()?,
                            redaction_rules.get_or_insert_with(make_redaction_rules),
                            Some(RedactedBecause::from_raw_event(&redaction)?),
                        )
                        .map_err(Error::Redaction)?;
                        let data = this.serialize_json(&redacted)?;

                        let event_type =
                            this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
                        let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());

                        // Write the redacted form under the same type, state
                        // key and event ID.
                        txn.set_state_event(
                            &encoded_room_id,
                            &event_type,
                            &state_key,
                            false,
                            Some(&event_id),
                            &data,
                        )?;
                    }
                }
            }

            for (room_id, display_names) in ambiguity_maps {
                let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);

                for (name, user_ids) in display_names {
                    // Entries are keyed by the normalized display name when
                    // there is one, by the raw name otherwise.
                    let encoded_name = this.encode_key(
                        keys::DISPLAY_NAME,
                        name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
                    );
                    let data = this.serialize_json(&user_ids)?;

                    if user_ids.is_empty() {
                        txn.remove_display_name(&room_id, &encoded_name)?;

                        // Also drop any entry keyed by the raw name.
                        // NOTE(review): presumably left over from an older
                        // storage layout keyed by raw names — confirm.
                        let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
                        txn.remove_display_name(&room_id, &raw_name)?;
                    } else {
                        txn.set_display_name(&room_id, &encoded_name, &data)?;
                    }
                }
            }

            Ok::<_, Error>(())
        })
        .await?;

    Ok(())
}
1475
1476 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1477 self.read()
1478 .await?
1479 .get_kv_blob(self.encode_presence_key(user_id))
1480 .await?
1481 .map(|data| self.deserialize_json(&data))
1482 .transpose()
1483 }
1484
1485 async fn get_presence_events(
1486 &self,
1487 user_ids: &[OwnedUserId],
1488 ) -> Result<Vec<Raw<PresenceEvent>>> {
1489 if user_ids.is_empty() {
1490 return Ok(Vec::new());
1491 }
1492
1493 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1494 self.read()
1495 .await?
1496 .get_kv_blobs(user_ids)
1497 .await?
1498 .into_iter()
1499 .map(|data| self.deserialize_json(&data))
1500 .collect()
1501 }
1502
1503 async fn get_state_event(
1504 &self,
1505 room_id: &RoomId,
1506 event_type: StateEventType,
1507 state_key: &str,
1508 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1509 Ok(self
1510 .get_state_events_for_keys(room_id, event_type, &[state_key])
1511 .await?
1512 .into_iter()
1513 .next())
1514 }
1515
1516 async fn get_state_events(
1517 &self,
1518 room_id: &RoomId,
1519 event_type: StateEventType,
1520 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1521 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1522 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1523 self.read()
1524 .await?
1525 .get_maybe_stripped_state_events(room_id, event_type)
1526 .await?
1527 .into_iter()
1528 .map(|(stripped, data)| {
1529 let ev = if stripped {
1530 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1531 } else {
1532 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1533 };
1534
1535 Ok(ev)
1536 })
1537 .collect()
1538 }
1539
1540 async fn get_state_events_for_keys(
1541 &self,
1542 room_id: &RoomId,
1543 event_type: StateEventType,
1544 state_keys: &[&str],
1545 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1546 if state_keys.is_empty() {
1547 return Ok(Vec::new());
1548 }
1549
1550 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1551 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1552 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1553 self.read()
1554 .await?
1555 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1556 .await?
1557 .into_iter()
1558 .map(|(stripped, data)| {
1559 let ev = if stripped {
1560 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1561 } else {
1562 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1563 };
1564
1565 Ok(ev)
1566 })
1567 .collect()
1568 }
1569
1570 async fn get_profile(
1571 &self,
1572 room_id: &RoomId,
1573 user_id: &UserId,
1574 ) -> Result<Option<MinimalRoomMemberEvent>> {
1575 let room_id = self.encode_key(keys::PROFILE, room_id);
1576 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1577
1578 self.read()
1579 .await?
1580 .get_profiles(room_id, user_ids)
1581 .await?
1582 .into_iter()
1583 .next()
1584 .map(|(_, data)| self.deserialize_json(&data))
1585 .transpose()
1586 }
1587
1588 async fn get_profiles<'a>(
1589 &self,
1590 room_id: &RoomId,
1591 user_ids: &'a [OwnedUserId],
1592 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1593 if user_ids.is_empty() {
1594 return Ok(BTreeMap::new());
1595 }
1596
1597 let room_id = self.encode_key(keys::PROFILE, room_id);
1598 let mut user_ids_map = user_ids
1599 .iter()
1600 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1601 .collect::<BTreeMap<_, _>>();
1602 let user_ids = user_ids_map.keys().cloned().collect();
1603
1604 self.read()
1605 .await?
1606 .get_profiles(room_id, user_ids)
1607 .await?
1608 .into_iter()
1609 .map(|(user_id, data)| {
1610 Ok((
1611 user_ids_map
1612 .remove(user_id.as_slice())
1613 .expect("returned user IDs were requested"),
1614 self.deserialize_json(&data)?,
1615 ))
1616 })
1617 .collect()
1618 }
1619
1620 async fn get_user_ids(
1621 &self,
1622 room_id: &RoomId,
1623 membership: RoomMemberships,
1624 ) -> Result<Vec<OwnedUserId>> {
1625 let room_id = self.encode_key(keys::MEMBER, room_id);
1626 let memberships = membership
1627 .as_vec()
1628 .into_iter()
1629 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1630 .collect();
1631 self.read()
1632 .await?
1633 .get_user_ids(room_id, memberships)
1634 .await?
1635 .iter()
1636 .map(|data| self.deserialize_value(data))
1637 .collect()
1638 }
1639
1640 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1641 self.read()
1642 .await?
1643 .get_room_infos(match room_load_settings {
1644 RoomLoadSettings::All => None,
1645 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1646 })
1647 .await?
1648 .into_iter()
1649 .map(|data| self.deserialize_json(&data))
1650 .collect()
1651 }
1652
1653 async fn get_users_with_display_name(
1654 &self,
1655 room_id: &RoomId,
1656 display_name: &DisplayName,
1657 ) -> Result<BTreeSet<OwnedUserId>> {
1658 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1659 let names = vec![self.encode_key(
1660 keys::DISPLAY_NAME,
1661 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1662 )];
1663
1664 Ok(self
1665 .read()
1666 .await?
1667 .get_display_names(room_id, names)
1668 .await?
1669 .into_iter()
1670 .next()
1671 .map(|(_, data)| self.deserialize_json(&data))
1672 .transpose()?
1673 .unwrap_or_default())
1674 }
1675
1676 async fn get_users_with_display_names<'a>(
1677 &self,
1678 room_id: &RoomId,
1679 display_names: &'a [DisplayName],
1680 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1681 let mut result = HashMap::new();
1682
1683 if display_names.is_empty() {
1684 return Ok(result);
1685 }
1686
1687 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1688 let mut names_map = display_names
1689 .iter()
1690 .flat_map(|display_name| {
1691 let raw =
1701 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1702 let normalized = display_name.as_normalized_str().map(|normalized| {
1703 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1704 });
1705
1706 iter::once(raw).chain(normalized)
1707 })
1708 .collect::<BTreeMap<_, _>>();
1709 let names = names_map.keys().cloned().collect();
1710
1711 for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1712 {
1713 let display_name =
1714 names_map.remove(name.as_slice()).expect("returned display names were requested");
1715 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1716
1717 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1718 }
1719
1720 Ok(result)
1721 }
1722
1723 async fn get_account_data_event(
1724 &self,
1725 event_type: GlobalAccountDataEventType,
1726 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1727 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1728 self.read()
1729 .await?
1730 .get_global_account_data(event_type)
1731 .await?
1732 .map(|value| self.deserialize_json(&value))
1733 .transpose()
1734 }
1735
1736 async fn get_room_account_data_event(
1737 &self,
1738 room_id: &RoomId,
1739 event_type: RoomAccountDataEventType,
1740 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1741 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1742 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1743 self.read()
1744 .await?
1745 .get_room_account_data(room_id, event_type)
1746 .await?
1747 .map(|value| self.deserialize_json(&value))
1748 .transpose()
1749 }
1750
1751 async fn get_user_room_receipt_event(
1752 &self,
1753 room_id: &RoomId,
1754 receipt_type: ReceiptType,
1755 thread: ReceiptThread,
1756 user_id: &UserId,
1757 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1758 let room_id = self.encode_key(keys::RECEIPT, room_id);
1759 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1760 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1763 let user_id = self.encode_key(keys::RECEIPT, user_id);
1764
1765 self.read()
1766 .await?
1767 .get_user_receipt(room_id, receipt_type, thread, user_id)
1768 .await?
1769 .map(|value| {
1770 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1771 })
1772 .transpose()
1773 }
1774
1775 async fn get_event_room_receipt_events(
1776 &self,
1777 room_id: &RoomId,
1778 receipt_type: ReceiptType,
1779 thread: ReceiptThread,
1780 event_id: &EventId,
1781 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1782 let room_id = self.encode_key(keys::RECEIPT, room_id);
1783 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1784 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1787 let event_id = self.encode_key(keys::RECEIPT, event_id);
1788
1789 self.read()
1790 .await?
1791 .get_event_receipts(room_id, receipt_type, thread, event_id)
1792 .await?
1793 .iter()
1794 .map(|value| {
1795 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1796 })
1797 .collect()
1798 }
1799
1800 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1801 self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1802 }
1803
1804 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1805 let conn = self.write().await;
1806 let key = self.encode_custom_key(key);
1807 conn.set_kv_blob(key, value).await?;
1808 Ok(())
1809 }
1810
1811 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1812 let conn = self.write().await;
1813 let key = self.encode_custom_key(key);
1814 let previous = conn.get_kv_blob(key.clone()).await?;
1815 conn.set_kv_blob(key, value).await?;
1816 Ok(previous)
1817 }
1818
1819 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1820 let conn = self.write().await;
1821 let key = self.encode_custom_key(key);
1822 let previous = conn.get_kv_blob(key.clone()).await?;
1823 if previous.is_some() {
1824 conn.delete_kv_blob(key).await?;
1825 }
1826 Ok(previous)
1827 }
1828
1829 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1830 let this = self.clone();
1831 let room_id = room_id.to_owned();
1832
1833 let conn = self.write().await;
1834
1835 conn.with_transaction(move |txn| -> Result<()> {
1836 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1837 txn.remove_room_info(&room_info_room_id)?;
1838
1839 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1840 txn.remove_room_state_events(&state_event_room_id, None)?;
1841
1842 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1843 txn.remove_room_members(&member_room_id, None)?;
1844
1845 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1846 txn.remove_room_profiles(&profile_room_id)?;
1847
1848 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1849 txn.remove_room_account_data(&room_account_data_room_id)?;
1850
1851 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1852 txn.remove_room_receipts(&receipt_room_id)?;
1853
1854 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1855 txn.remove_room_display_names(&display_name_room_id)?;
1856
1857 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1858 txn.remove_room_send_queue(&send_queue_room_id)?;
1859
1860 let dependent_send_queue_room_id =
1861 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1862 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1863
1864 let thread_subscriptions_room_id =
1865 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1866 txn.execute(
1867 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1868 (thread_subscriptions_room_id,),
1869 )?;
1870
1871 Ok(())
1872 })
1873 .await?;
1874
1875 conn.vacuum().await
1876 }
1877
1878 async fn save_send_queue_request(
1879 &self,
1880 room_id: &RoomId,
1881 transaction_id: OwnedTransactionId,
1882 created_at: MilliSecondsSinceUnixEpoch,
1883 content: QueuedRequestKind,
1884 priority: usize,
1885 ) -> Result<(), Self::Error> {
1886 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1887 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1888
1889 let content = self.serialize_json(&content)?;
1890 let created_at_ts: u64 = created_at.0.into();
1896 self.write()
1897 .await
1898 .with_transaction(move |txn| {
1899 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1900 Ok(())
1901 })
1902 .await
1903 }
1904
1905 async fn update_send_queue_request(
1906 &self,
1907 room_id: &RoomId,
1908 transaction_id: &TransactionId,
1909 content: QueuedRequestKind,
1910 ) -> Result<bool, Self::Error> {
1911 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1912
1913 let content = self.serialize_json(&content)?;
1914 let transaction_id = transaction_id.to_string();
1917
1918 let num_updated = self.write()
1919 .await
1920 .with_transaction(move |txn| {
1921 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1922 })
1923 .await?;
1924
1925 Ok(num_updated > 0)
1926 }
1927
1928 async fn remove_send_queue_request(
1929 &self,
1930 room_id: &RoomId,
1931 transaction_id: &TransactionId,
1932 ) -> Result<bool, Self::Error> {
1933 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1934
1935 let transaction_id = transaction_id.to_string();
1937
1938 let num_deleted = self
1939 .write()
1940 .await
1941 .with_transaction(move |txn| {
1942 txn.prepare_cached(
1943 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1944 )?
1945 .execute((room_id, &transaction_id))
1946 })
1947 .await?;
1948
1949 Ok(num_deleted > 0)
1950 }
1951
1952 async fn load_send_queue_requests(
1953 &self,
1954 room_id: &RoomId,
1955 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1956 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1957
1958 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1962 .read()
1963 .await?
1964 .prepare(
1965 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1966 |mut stmt| {
1967 stmt.query((room_id,))?
1968 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1969 .collect()
1970 },
1971 )
1972 .await?;
1973
1974 let mut requests = Vec::with_capacity(res.len());
1975
1976 for entry in res {
1977 let created_at = entry
1978 .4
1979 .and_then(UInt::new)
1980 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1981
1982 requests.push(QueuedRequest {
1983 transaction_id: entry.0.into(),
1984 kind: self.deserialize_json(&entry.1)?,
1985 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1986 priority: entry.3,
1987 created_at,
1988 });
1989 }
1990
1991 Ok(requests)
1992 }
1993
1994 async fn update_send_queue_request_status(
1995 &self,
1996 room_id: &RoomId,
1997 transaction_id: &TransactionId,
1998 error: Option<QueueWedgeError>,
1999 ) -> Result<(), Self::Error> {
2000 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2001
2002 let transaction_id = transaction_id.to_string();
2004
2005 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2007
2008 self.write()
2009 .await
2010 .with_transaction(move |txn| {
2011 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2012 Ok(())
2013 })
2014 .await
2015 }
2016
2017 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2018 let res: Vec<Vec<u8>> = self
2023 .read()
2024 .await?
2025 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2026 stmt.query(())?.mapped(|row| row.get(0)).collect()
2027 })
2028 .await?;
2029
2030 Ok(res
2033 .into_iter()
2034 .map(|entry| self.deserialize_value(&entry))
2035 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2036 .into_iter()
2037 .collect())
2038 }
2039
2040 async fn save_dependent_queued_request(
2041 &self,
2042 room_id: &RoomId,
2043 parent_txn_id: &TransactionId,
2044 own_txn_id: ChildTransactionId,
2045 created_at: MilliSecondsSinceUnixEpoch,
2046 content: DependentQueuedRequestKind,
2047 ) -> Result<()> {
2048 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2049 let content = self.serialize_json(&content)?;
2050
2051 let parent_txn_id = parent_txn_id.to_string();
2053 let own_txn_id = own_txn_id.to_string();
2054
2055 let created_at_ts: u64 = created_at.0.into();
2056 self.write()
2057 .await
2058 .with_transaction(move |txn| {
2059 txn.prepare_cached(
2060 r#"INSERT INTO dependent_send_queue_events
2061 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2062 VALUES (?, ?, ?, ?, ?)"#,
2063 )?
2064 .execute((
2065 room_id,
2066 parent_txn_id,
2067 own_txn_id,
2068 content,
2069 created_at_ts,
2070 ))?;
2071 Ok(())
2072 })
2073 .await
2074 }
2075
2076 async fn update_dependent_queued_request(
2077 &self,
2078 room_id: &RoomId,
2079 own_transaction_id: &ChildTransactionId,
2080 new_content: DependentQueuedRequestKind,
2081 ) -> Result<bool> {
2082 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2083 let content = self.serialize_json(&new_content)?;
2084
2085 let own_txn_id = own_transaction_id.to_string();
2087
2088 let num_updated = self
2089 .write()
2090 .await
2091 .with_transaction(move |txn| {
2092 txn.prepare_cached(
2093 r#"UPDATE dependent_send_queue_events
2094 SET content = ?
2095 WHERE own_transaction_id = ?
2096 AND room_id = ?"#,
2097 )?
2098 .execute((content, own_txn_id, room_id))
2099 })
2100 .await?;
2101
2102 if num_updated > 1 {
2103 return Err(Error::InconsistentUpdate);
2104 }
2105
2106 Ok(num_updated == 1)
2107 }
2108
2109 async fn mark_dependent_queued_requests_as_ready(
2110 &self,
2111 room_id: &RoomId,
2112 parent_txn_id: &TransactionId,
2113 parent_key: SentRequestKey,
2114 ) -> Result<usize> {
2115 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2116 let parent_key = self.serialize_json(&parent_key)?;
2117
2118 let parent_txn_id = parent_txn_id.to_string();
2120
2121 self.write()
2122 .await
2123 .with_transaction(move |txn| {
2124 Ok(txn.prepare_cached(
2125 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2126 )?
2127 .execute((parent_key, parent_txn_id, room_id))?)
2128 })
2129 .await
2130 }
2131
2132 async fn remove_dependent_queued_request(
2133 &self,
2134 room_id: &RoomId,
2135 txn_id: &ChildTransactionId,
2136 ) -> Result<bool> {
2137 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2138
2139 let txn_id = txn_id.to_string();
2141
2142 let num_deleted = self
2143 .write()
2144 .await
2145 .with_transaction(move |txn| {
2146 txn.prepare_cached(
2147 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2148 )?
2149 .execute((txn_id, room_id))
2150 })
2151 .await?;
2152
2153 Ok(num_deleted > 0)
2154 }
2155
2156 async fn load_dependent_queued_requests(
2157 &self,
2158 room_id: &RoomId,
2159 ) -> Result<Vec<DependentQueuedRequest>> {
2160 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2161
2162 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2164 .read()
2165 .await?
2166 .prepare(
2167 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2168 |mut stmt| {
2169 stmt.query((room_id,))?
2170 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2171 .collect()
2172 },
2173 )
2174 .await?;
2175
2176 let mut dependent_events = Vec::with_capacity(res.len());
2177
2178 for entry in res {
2179 let created_at = entry
2180 .4
2181 .and_then(UInt::new)
2182 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2183
2184 dependent_events.push(DependentQueuedRequest {
2185 own_transaction_id: entry.0.into(),
2186 parent_transaction_id: entry.1.into(),
2187 parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2188 kind: self.deserialize_json(&entry.3)?,
2189 created_at,
2190 });
2191 }
2192
2193 Ok(dependent_events)
2194 }
2195
2196 async fn upsert_thread_subscription(
2197 &self,
2198 room_id: &RoomId,
2199 thread_id: &EventId,
2200 mut new: StoredThreadSubscription,
2201 ) -> Result<(), Self::Error> {
2202 if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2203 if previous == new {
2204 trace!("not saving thread subscription because the subscription is the same");
2206 return Ok(());
2207 }
2208
2209 if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2210 trace!("not saving thread subscription because we have a newer bump stamp");
2211 return Ok(());
2212 }
2213 }
2214
2215 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2216 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2217 let status = new.status.as_str();
2218
2219 self.write()
2220 .await
2221 .with_transaction(move |txn| {
2222 txn.prepare_cached(
2224 "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2225 VALUES (?, ?, ?, ?)",
2226 )?
2227 .execute((room_id, thread_id, status, new.bump_stamp))
2228 })
2229 .await?;
2230 Ok(())
2231 }
2232
2233 async fn load_thread_subscription(
2234 &self,
2235 room_id: &RoomId,
2236 thread_id: &EventId,
2237 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2238 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2239 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2240
2241 Ok(self
2242 .read()
2243 .await?
2244 .query_row(
2245 "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2246 (room_id, thread_id),
2247 |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2248 )
2249 .await
2250 .optional()?
2251 .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2252 let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2253 Error::InvalidData { details: format!("Invalid thread status: {status}") }
2254 })?;
2255 Ok(StoredThreadSubscription { status, bump_stamp })
2256 })
2257 .transpose()?)
2258 }
2259
2260 async fn remove_thread_subscription(
2261 &self,
2262 room_id: &RoomId,
2263 thread_id: &EventId,
2264 ) -> Result<(), Self::Error> {
2265 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2266 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2267
2268 self.write()
2269 .await
2270 .execute(
2271 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2272 (room_id, thread_id),
2273 )
2274 .await?;
2275
2276 Ok(())
2277 }
2278
2279 async fn optimize(&self) -> Result<(), Self::Error> {
2280 Ok(self.vacuum().await?)
2281 }
2282
2283 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2284 self.get_db_size().await
2285 }
2286}
2287
/// A receipt bundled with the event and user it is associated with.
///
/// Derives `Serialize`/`Deserialize`, so it can be round-tripped as a single value.
// NOTE(review): presumably this is the payload persisted for each stored receipt — confirm
// against the receipt read/write code earlier in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReceiptData {
    /// The receipt itself.
    receipt: Receipt,
    /// The event this receipt is associated with.
    event_id: OwnedEventId,
    /// The user this receipt is associated with.
    user_id: OwnedUserId,
}
2294
/// Runs the shared state store integration tests against a store opened without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter handing out a distinct sub-directory name per store.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store, without a passphrase, in its own sub-directory of `TMP_DIR`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(store_index.to_string());
        let store_dir_str = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_dir_str);

        let store = SqliteStateStore::open(store_dir_str, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2319
/// Runs the shared state store integration tests against a passphrase-protected store, plus
/// a few checks that `SqliteStoreConfig` settings reach the opened database.
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;
    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};

    /// Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter handing out a distinct sub-directory name per store.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a not-yet-used path under `TMP_DIR`.
    fn new_state_store_workspace() -> PathBuf {
        let workspace_index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(workspace_index.to_string())
    }

    /// Opens a fresh store protected by a fixed test passphrase.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let workspace_str = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", workspace_str);

        let store =
            SqliteStateStore::open(workspace_str, Some("default_test_password")).await.unwrap();
        Ok(store)
    }

    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();
        let conn = store.pool.get().await.unwrap();

        let cache_size: i32 =
            conn.query_row("PRAGMA cache_size", (), |row| row.get(0)).await.unwrap();

        // The configured 1500 is expected to come back divided by 1024 and negated.
        assert_eq!(cache_size, -(1500 / 1024));
    }

    #[async_test]
    async fn test_journal_size_limit() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();
        let conn = store.pool.get().await.unwrap();

        let journal_size_limit: u32 =
            conn.query_row("PRAGMA journal_size_limit", (), |row| row.get(0)).await.unwrap();

        // Unlike the cache size, the journal size limit is reported back unchanged.
        assert_eq!(journal_size_limit, 1500);
    }

    statestore_integration_tests!();
}
2400
2401#[cfg(test)]
2402mod migration_tests {
2403 use std::{
2404 path::{Path, PathBuf},
2405 sync::{
2406 atomic::{AtomicU32, Ordering::SeqCst},
2407 Arc,
2408 },
2409 };
2410
2411 use as_variant::as_variant;
2412 use matrix_sdk_base::{
2413 media::{MediaFormat, MediaRequestParameters},
2414 store::{
2415 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2416 SerializableEventContent,
2417 },
2418 sync::UnreadNotificationsCount,
2419 RoomState, StateStore,
2420 };
2421 use matrix_sdk_test::async_test;
2422 use once_cell::sync::Lazy;
2423 use ruma::{
2424 events::{
2425 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2426 StateEventType,
2427 },
2428 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2429 RoomId, TransactionId, UserId,
2430 };
2431 use rusqlite::Transaction;
2432 use serde::{Deserialize, Serialize};
2433 use serde_json::json;
2434 use tempfile::{tempdir, TempDir};
2435 use tokio::{fs, sync::Mutex};
2436 use zeroize::Zeroizing;
2437
2438 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2439 use crate::{
2440 error::{Error, Result},
2441 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2442 OpenStoreError, Secret, SqliteStoreConfig,
2443 };
2444
/// Temporary directory shared by the tests in this module; created on first access.
static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
/// Monotonic counter used to derive a distinct path under `TMP_DIR` for each test.
static NUM: AtomicU32 = AtomicU32::new(0);
/// Passphrase used to create the fake databases and to reopen them afterwards.
const SECRET: &str = "secret";
2448
2449 fn new_path() -> PathBuf {
2450 let name = NUM.fetch_add(1, SeqCst).to_string();
2451 TMP_DIR.path().join(name)
2452 }
2453
2454 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2455 let config = SqliteStoreConfig::new(path);
2456
2457 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2458
2459 let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2460 let conn = pool.get().await?;
2461
2462 init(&conn).await?;
2463
2464 let store_cipher = Some(Arc::new(
2465 conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2466 .await
2467 .unwrap(),
2468 ));
2469 let this = SqliteStateStore {
2470 store_cipher,
2471 pool,
2472 write_connection: Arc::new(Mutex::new(conn)),
2474 };
2475 this.run_migrations(1, Some(version)).await?;
2476
2477 Ok(this)
2478 }
2479
2480 fn room_info_v1_json(
2481 room_id: &RoomId,
2482 state: RoomState,
2483 name: Option<&str>,
2484 creator: Option<&UserId>,
2485 ) -> serde_json::Value {
2486 let name_content = match name {
2488 Some(name) => json!({ "name": name }),
2489 None => json!({ "name": null }),
2490 };
2491 let create_content = match creator {
2493 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2494 None => RoomCreateEventContent::new_v11(),
2495 };
2496
2497 json!({
2498 "room_id": room_id,
2499 "room_type": state,
2500 "notification_counts": UnreadNotificationsCount::default(),
2501 "summary": {
2502 "heroes": [],
2503 "joined_member_count": 0,
2504 "invited_member_count": 0,
2505 },
2506 "members_synced": false,
2507 "base_info": {
2508 "dm_targets": [],
2509 "max_power_level": 100,
2510 "name": {
2511 "Original": {
2512 "content": name_content,
2513 },
2514 },
2515 "create": {
2516 "Original": {
2517 "content": create_content,
2518 }
2519 }
2520 },
2521 })
2522 }
2523
2524 #[async_test]
2525 pub async fn test_migrating_v1_to_v2() {
2526 let path = new_path();
2527 {
2529 let db = create_fake_db(&path, 1).await.unwrap();
2530 let conn = db.pool.get().await.unwrap();
2531
2532 let this = db.clone();
2533 conn.with_transaction(move |txn| {
2534 for i in 0..5 {
2535 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2536 let (state, stripped) =
2537 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2538 let info = room_info_v1_json(&room_id, state, None, None);
2539
2540 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2541 let data = this.serialize_json(&info)?;
2542
2543 txn.prepare_cached(
2544 "INSERT INTO room_info (room_id, stripped, data)
2545 VALUES (?, ?, ?)",
2546 )?
2547 .execute((room_id, stripped, data))?;
2548 }
2549
2550 Result::<_, Error>::Ok(())
2551 })
2552 .await
2553 .unwrap();
2554 }
2555
2556 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2558
2559 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2561 }
2562
2563 fn add_room_v2(
2565 this: &SqliteStateStore,
2566 txn: &Transaction<'_>,
2567 room_id: &RoomId,
2568 name: Option<&str>,
2569 create_creator: Option<&UserId>,
2570 create_sender: Option<&UserId>,
2571 ) -> Result<(), Error> {
2572 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2573
2574 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2575 let encoded_state =
2576 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2577 let data = this.serialize_json(&room_info_json)?;
2578
2579 txn.prepare_cached(
2580 "INSERT INTO room_info (room_id, state, data)
2581 VALUES (?, ?, ?)",
2582 )?
2583 .execute((encoded_room_id, encoded_state, data))?;
2584
2585 let Some(create_sender) = create_sender else {
2587 return Ok(());
2588 };
2589
2590 let create_content = match create_creator {
2591 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2592 None => RoomCreateEventContent::new_v11(),
2593 };
2594
2595 let event_id = EventId::new(server_name!("dummy.local"));
2596 let create_event = json!({
2597 "content": create_content,
2598 "event_id": event_id,
2599 "sender": create_sender.to_owned(),
2600 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2601 "state_key": "",
2602 "type": "m.room.create",
2603 "unsigned": {},
2604 });
2605
2606 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2607 let encoded_event_type =
2608 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2609 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2610 let stripped = false;
2611 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2612 let data = this.serialize_json(&create_event)?;
2613
2614 txn.prepare_cached(
2615 "INSERT
2616 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2617 VALUES (?, ?, ?, ?, ?, ?)",
2618 )?
2619 .execute((
2620 encoded_room_id,
2621 encoded_event_type,
2622 encoded_state_key,
2623 stripped,
2624 encoded_event_id,
2625 data,
2626 ))?;
2627
2628 Ok(())
2629 }
2630
2631 #[async_test]
2632 pub async fn test_migrating_v2_to_v3() {
2633 let path = new_path();
2634
2635 let room_a_id = room_id!("!room_a:dummy.local");
2637 let room_a_name = "Room A";
2638 let room_a_creator = user_id!("@creator:dummy.local");
2639 let room_a_create_sender = user_id!("@sender:dummy.local");
2642
2643 let room_b_id = room_id!("!room_b:dummy.local");
2645
2646 let room_c_id = room_id!("!room_c:dummy.local");
2648 let room_c_create_sender = user_id!("@creator:dummy.local");
2649
2650 {
2652 let db = create_fake_db(&path, 2).await.unwrap();
2653 let conn = db.pool.get().await.unwrap();
2654
2655 let this = db.clone();
2656 conn.with_transaction(move |txn| {
2657 add_room_v2(
2658 &this,
2659 txn,
2660 room_a_id,
2661 Some(room_a_name),
2662 Some(room_a_creator),
2663 Some(room_a_create_sender),
2664 )?;
2665 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2666 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2667
2668 Result::<_, Error>::Ok(())
2669 })
2670 .await
2671 .unwrap();
2672 }
2673
2674 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2676
2677 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2679 assert_eq!(room_infos.len(), 3);
2680
2681 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2682 assert_eq!(room_a.name(), Some(room_a_name));
2683 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2684
2685 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2686 assert_eq!(room_b.name(), None);
2687 assert_eq!(room_b.creators(), None);
2688
2689 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2690 assert_eq!(room_c.name(), None);
2691 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2692 }
2693
2694 #[async_test]
2695 pub async fn test_migrating_v7_to_v9() {
2696 let path = new_path();
2697
2698 let room_id = room_id!("!room_a:dummy.local");
2699 let wedged_event_transaction_id = TransactionId::new();
2700 let local_event_transaction_id = TransactionId::new();
2701
2702 {
2704 let db = create_fake_db(&path, 7).await.unwrap();
2705 let conn = db.pool.get().await.unwrap();
2706
2707 let wedge_tx = wedged_event_transaction_id.clone();
2708 let local_tx = local_event_transaction_id.clone();
2709
2710 conn.with_transaction(move |txn| {
2711 add_dependent_send_queue_event_v7(
2712 &db,
2713 txn,
2714 room_id,
2715 &local_tx,
2716 ChildTransactionId::new(),
2717 DependentQueuedRequestKind::RedactEvent,
2718 )?;
2719 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2720 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2721 Result::<_, Error>::Ok(())
2722 })
2723 .await
2724 .unwrap();
2725 }
2726
2727 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2730
2731 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2732 assert!(requests.is_empty());
2733
2734 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2735 assert!(dependent_requests.is_empty());
2736 }
2737
2738 fn add_send_queue_event_v7(
2739 this: &SqliteStateStore,
2740 txn: &Transaction<'_>,
2741 transaction_id: &TransactionId,
2742 room_id: &RoomId,
2743 is_wedged: bool,
2744 ) -> Result<(), Error> {
2745 let content =
2746 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2747
2748 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2749 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2750
2751 let content = this.serialize_json(&content)?;
2752
2753 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2754 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2755
2756 Ok(())
2757 }
2758
2759 fn add_dependent_send_queue_event_v7(
2760 this: &SqliteStateStore,
2761 txn: &Transaction<'_>,
2762 room_id: &RoomId,
2763 parent_txn_id: &TransactionId,
2764 own_txn_id: ChildTransactionId,
2765 content: DependentQueuedRequestKind,
2766 ) -> Result<(), Error> {
2767 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2768
2769 let parent_txn_id = parent_txn_id.to_string();
2770 let own_txn_id = own_txn_id.to_string();
2771 let content = this.serialize_json(&content)?;
2772
2773 txn.prepare_cached(
2774 "INSERT INTO dependent_send_queue_events
2775 (room_id, parent_transaction_id, own_transaction_id, content)
2776 VALUES (?, ?, ?, ?)",
2777 )?
2778 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2779
2780 Ok(())
2781 }
2782
/// Test-only stand-in that serializes a dependent request under the
/// `UploadFileWithThumbnail` variant name.
///
/// `test_dependent_queued_request_variant_renaming` serializes this type and deserializes
/// the result as a `DependentQueuedRequestKind`, expecting its `UploadFileOrThumbnail`
/// variant.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LegacyDependentQueuedRequestKind {
    /// Same field names as the ones the test reads back from `UploadFileOrThumbnail`.
    UploadFileWithThumbnail {
        content_type: String,
        cache_key: MediaRequestParameters,
        related_to: OwnedTransactionId,
    },
}
2791
2792 #[async_test]
2793 pub async fn test_dependent_queued_request_variant_renaming() {
2794 let path = new_path();
2795 let db = create_fake_db(&path, 7).await.unwrap();
2796
2797 let cache_key = MediaRequestParameters {
2798 format: MediaFormat::File,
2799 source: MediaSource::Plain("https://server.local/foobar".into()),
2800 };
2801 let related_to = TransactionId::new();
2802 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2803 content_type: "image/png".to_owned(),
2804 cache_key,
2805 related_to: related_to.clone(),
2806 };
2807
2808 let data = db
2809 .serialize_json(&request)
2810 .expect("should be able to serialize legacy dependent request");
2811 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2812 "should be able to deserialize dependent request from legacy dependent request",
2813 );
2814
2815 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2816 assert_eq!(de_related_to, related_to);
2817 });
2818 }
2819}