1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 str::FromStr as _,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
12use matrix_sdk_base::{
13 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
14 store::{
15 compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
16 DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
17 QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
18 ThreadSubscriptionStatus,
19 },
20 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
21 StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
22};
23use matrix_sdk_store_encryption::StoreCipher;
24use ruma::{
25 canonical_json::{redact, RedactedBecause},
26 events::{
27 presence::PresenceEvent,
28 receipt::{Receipt, ReceiptThread, ReceiptType},
29 room::{
30 create::RoomCreateEventContent,
31 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
32 },
33 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
34 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
35 },
36 serde::Raw,
37 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
38 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
39};
40use rusqlite::{OptionalExtension, Transaction};
41use serde::{Deserialize, Serialize};
42use tokio::fs;
43use tracing::{debug, trace, warn};
44
45use crate::{
46 error::{Error, Result},
47 utils::{
48 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
49 SqliteKeyValueStoreConnExt,
50 },
51 OpenStoreError, SqliteStoreConfig,
52};
53
54mod keys {
55 pub const KV_BLOB: &str = "kv_blob";
57 pub const ROOM_INFO: &str = "room_info";
58 pub const STATE_EVENT: &str = "state_event";
59 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
60 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
61 pub const MEMBER: &str = "member";
62 pub const PROFILE: &str = "profile";
63 pub const RECEIPT: &str = "receipt";
64 pub const DISPLAY_NAME: &str = "display_name";
65 pub const SEND_QUEUE: &str = "send_queue_events";
66 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
67 pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
68}
69
70pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
72
73const DATABASE_VERSION: u8 = 14;
79
80#[derive(Clone)]
82pub struct SqliteStateStore {
83 store_cipher: Option<Arc<StoreCipher>>,
84 pool: SqlitePool,
85}
86
87#[cfg(not(tarpaulin_include))]
88impl fmt::Debug for SqliteStateStore {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
91 }
92}
93
94impl SqliteStateStore {
95 pub async fn open(
98 path: impl AsRef<Path>,
99 passphrase: Option<&str>,
100 ) -> Result<Self, OpenStoreError> {
101 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
102 }
103
104 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
106 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
107
108 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
109
110 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
111 config.pool = Some(pool_config);
112
113 let pool = config.create_pool(Runtime::Tokio1)?;
114
115 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
116 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
117
118 Ok(this)
119 }
120
121 pub async fn open_with_pool(
124 pool: SqlitePool,
125 passphrase: Option<&str>,
126 ) -> Result<Self, OpenStoreError> {
127 let conn = pool.get().await?;
128
129 let mut version = conn.db_version().await?;
130
131 if version == 0 {
132 init(&conn).await?;
133 version = 1;
134 }
135
136 let store_cipher = match passphrase {
137 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
138 None => None,
139 };
140 let this = Self { store_cipher, pool };
141 this.run_migrations(&conn, version, None).await?;
142
143 Ok(this)
144 }
145
146 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
151 let to = to.unwrap_or(DATABASE_VERSION);
152
153 if from < to {
154 debug!(version = from, new_version = to, "Upgrading database");
155 } else {
156 return Ok(());
157 }
158
159 if from < 2 && to >= 2 {
160 let this = self.clone();
161 conn.with_transaction(move |txn| {
162 txn.execute_batch(include_str!(
164 "../migrations/state_store/002_a_create_new_room_info.sql"
165 ))?;
166
167 for data in txn
169 .prepare("SELECT data FROM room_info")?
170 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
171 {
172 let data = data?;
173 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
174
175 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
176 let state = this
177 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
178 txn.prepare_cached(
179 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
180 VALUES (?, ?, ?)",
181 )?
182 .execute((room_id, state, data))?;
183 }
184
185 txn.execute_batch(include_str!(
187 "../migrations/state_store/002_b_replace_room_info.sql"
188 ))?;
189
190 txn.set_db_version(2)?;
191 Result::<_, Error>::Ok(())
192 })
193 .await?;
194 }
195
196 if from < 3 && to >= 3 {
198 let this = self.clone();
199 conn.with_transaction(move |txn| {
200 for data in txn
202 .prepare("SELECT data FROM room_info")?
203 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
204 {
205 let data = data?;
206 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
207
208 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
210 let event_type =
211 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
212 let create_res = txn
213 .prepare(
214 "SELECT stripped, data FROM state_event
215 WHERE room_id = ? AND event_type = ?",
216 )?
217 .query_row([room_id, event_type], |row| {
218 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
219 })
220 .optional()?;
221
222 let create = create_res.and_then(|(stripped, data)| {
223 let create = if stripped {
224 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
225 this.deserialize_json(&data).ok()?,
226 )
227 } else {
228 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
229 };
230 Some(create)
231 });
232
233 let migrated_room_info = room_info_v1.migrate(create.as_ref());
234
235 let data = this.serialize_json(&migrated_room_info)?;
236 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
237 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
238 .execute((data, room_id))?;
239 }
240
241 txn.set_db_version(3)?;
242 Result::<_, Error>::Ok(())
243 })
244 .await?;
245 }
246
247 if from < 4 && to >= 4 {
248 conn.with_transaction(move |txn| {
249 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
251 txn.set_db_version(4)
252 })
253 .await?;
254 }
255
256 if from < 5 && to >= 5 {
257 conn.with_transaction(move |txn| {
258 txn.execute_batch(include_str!(
260 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
261 ))?;
262 txn.set_db_version(4)
263 })
264 .await?;
265 }
266
267 if from < 6 && to >= 6 {
268 conn.with_transaction(move |txn| {
269 txn.execute_batch(include_str!(
271 "../migrations/state_store/005_send_queue_dependent_events.sql"
272 ))?;
273 txn.set_db_version(6)
274 })
275 .await?;
276 }
277
278 if from < 7 && to >= 7 {
279 conn.with_transaction(move |txn| {
280 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
282 txn.set_db_version(7)
283 })
284 .await?;
285 }
286
287 if from < 8 && to >= 8 {
288 let error = QueueWedgeError::GenericApiError {
290 msg: "local echo failed to send in a previous session".into(),
291 };
292 let default_err = self.serialize_value(&error)?;
293
294 conn.with_transaction(move |txn| {
295 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
297
298 for wedged_entries in txn
301 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
302 .query_map((), |row| {
303 Ok(
304 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
305 )
306 })? {
307
308 let (room_id, transaction_id) = wedged_entries?;
309
310 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
311 .execute((default_err.clone(), room_id, transaction_id))?;
312 }
313
314
315 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
317
318 txn.set_db_version(8)
319 })
320 .await?;
321 }
322
323 if from < 9 && to >= 9 {
324 conn.with_transaction(move |txn| {
325 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
327 txn.set_db_version(9)
328 })
329 .await?;
330 }
331
332 if from < 10 && to >= 10 {
333 conn.with_transaction(move |txn| {
334 txn.execute_batch(include_str!(
336 "../migrations/state_store/009_send_queue_priority.sql"
337 ))?;
338 txn.set_db_version(10)
339 })
340 .await?;
341 }
342
343 if from < 11 && to >= 11 {
344 conn.with_transaction(move |txn| {
345 txn.execute_batch(include_str!(
347 "../migrations/state_store/010_send_queue_enqueue_time.sql"
348 ))?;
349 txn.set_db_version(11)
350 })
351 .await?;
352 }
353
354 if from < 12 && to >= 12 {
355 conn.vacuum().await?;
359 conn.set_kv("version", vec![12]).await?;
360 }
361
362 if from < 13 && to >= 13 {
363 conn.with_transaction(move |txn| {
364 txn.execute_batch(include_str!(
366 "../migrations/state_store/011_thread_subscriptions.sql"
367 ))?;
368 txn.set_db_version(13)
369 })
370 .await?;
371 }
372
373 if from < 14 && to >= 14 {
374 conn.with_transaction(move |txn| {
375 txn.execute_batch(include_str!(
377 "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
378 ))?;
379 txn.set_db_version(14)
380 })
381 .await?;
382 }
383
384 Ok(())
385 }
386
387 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
388 let key_s = match key {
389 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
390 StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
391 StateStoreDataKey::Filter(f) => {
392 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
393 }
394 StateStoreDataKey::UserAvatarUrl(u) => {
395 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
396 }
397 StateStoreDataKey::RecentlyVisitedRooms(b) => {
398 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
399 }
400 StateStoreDataKey::UtdHookManagerData => {
401 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
402 }
403 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
404 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
405 }
406 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
407 if let Some(thread_root) = thread_root {
408 Cow::Owned(format!(
409 "{}:{room_id}:{thread_root}",
410 StateStoreDataKey::COMPOSER_DRAFT
411 ))
412 } else {
413 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
414 }
415 }
416 StateStoreDataKey::SeenKnockRequests(room_id) => {
417 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
418 }
419 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
420 Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
421 }
422 };
423
424 self.encode_key(keys::KV_BLOB, &*key_s)
425 }
426
427 fn encode_presence_key(&self, user_id: &UserId) -> Key {
428 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
429 }
430
431 fn encode_custom_key(&self, key: &[u8]) -> Key {
432 let mut full_key = b"custom:".to_vec();
433 full_key.extend(key);
434 self.encode_key(keys::KV_BLOB, full_key)
435 }
436
437 async fn acquire(&self) -> Result<SqliteAsyncConn> {
438 Ok(self.pool.get().await?)
439 }
440
441 fn remove_maybe_stripped_room_data(
442 &self,
443 txn: &Transaction<'_>,
444 room_id: &RoomId,
445 stripped: bool,
446 ) -> rusqlite::Result<()> {
447 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
448 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
449
450 let member_room_id = self.encode_key(keys::MEMBER, room_id);
451 txn.remove_room_members(&member_room_id, Some(stripped))
452 }
453}
454
455impl EncryptableStore for SqliteStateStore {
456 fn get_cypher(&self) -> Option<&StoreCipher> {
457 self.store_cipher.as_deref()
458 }
459}
460
461async fn init(conn: &SqliteAsyncConn) -> Result<()> {
463 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
466 conn.with_transaction(|txn| {
467 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
468 txn.set_db_version(1)?;
469
470 Ok(())
471 })
472 .await
473}
474
475trait SqliteConnectionStateStoreExt {
476 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
477
478 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
479
480 fn set_room_account_data(
481 &self,
482 room_id: &[u8],
483 event_type: &[u8],
484 data: &[u8],
485 ) -> rusqlite::Result<()>;
486 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
487
488 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
489 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
490 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
491
492 fn set_state_event(
493 &self,
494 room_id: &[u8],
495 event_type: &[u8],
496 state_key: &[u8],
497 stripped: bool,
498 event_id: Option<&[u8]>,
499 data: &[u8],
500 ) -> rusqlite::Result<()>;
501 fn get_state_event_by_id(
502 &self,
503 room_id: &[u8],
504 event_id: &[u8],
505 ) -> rusqlite::Result<Option<Vec<u8>>>;
506 fn remove_room_state_events(
507 &self,
508 room_id: &[u8],
509 stripped: Option<bool>,
510 ) -> rusqlite::Result<()>;
511
512 fn set_member(
513 &self,
514 room_id: &[u8],
515 user_id: &[u8],
516 membership: &[u8],
517 stripped: bool,
518 data: &[u8],
519 ) -> rusqlite::Result<()>;
520 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
521
522 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
523 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
524 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
525
526 fn set_receipt(
527 &self,
528 room_id: &[u8],
529 user_id: &[u8],
530 receipt_type: &[u8],
531 thread_id: &[u8],
532 event_id: &[u8],
533 data: &[u8],
534 ) -> rusqlite::Result<()>;
535 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
536
537 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
538 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
539 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
540 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
541 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
542}
543
544impl SqliteConnectionStateStoreExt for rusqlite::Connection {
545 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
546 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
547 Ok(())
548 }
549
550 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
551 self.prepare_cached(
552 "INSERT OR REPLACE INTO global_account_data (event_type, data)
553 VALUES (?, ?)",
554 )?
555 .execute((event_type, data))?;
556 Ok(())
557 }
558
559 fn set_room_account_data(
560 &self,
561 room_id: &[u8],
562 event_type: &[u8],
563 data: &[u8],
564 ) -> rusqlite::Result<()> {
565 self.prepare_cached(
566 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
567 VALUES (?, ?, ?)",
568 )?
569 .execute((room_id, event_type, data))?;
570 Ok(())
571 }
572
573 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
574 self.prepare(
575 "DELETE FROM room_account_data
576 WHERE room_id = ?",
577 )?
578 .execute((room_id,))?;
579 Ok(())
580 }
581
582 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
583 self.prepare_cached(
584 "INSERT OR REPLACE INTO room_info (room_id, state, data)
585 VALUES (?, ?, ?)",
586 )?
587 .execute((room_id, state, data))?;
588 Ok(())
589 }
590
591 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
592 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
593 .optional()
594 }
595
596 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
598 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
599 Ok(())
600 }
601
602 fn set_state_event(
603 &self,
604 room_id: &[u8],
605 event_type: &[u8],
606 state_key: &[u8],
607 stripped: bool,
608 event_id: Option<&[u8]>,
609 data: &[u8],
610 ) -> rusqlite::Result<()> {
611 self.prepare_cached(
612 "INSERT OR REPLACE
613 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
614 VALUES (?, ?, ?, ?, ?, ?)",
615 )?
616 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
617 Ok(())
618 }
619
620 fn get_state_event_by_id(
621 &self,
622 room_id: &[u8],
623 event_id: &[u8],
624 ) -> rusqlite::Result<Option<Vec<u8>>> {
625 self.query_row(
626 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
627 (room_id, event_id),
628 |row| row.get(0),
629 )
630 .optional()
631 }
632
633 fn remove_room_state_events(
639 &self,
640 room_id: &[u8],
641 stripped: Option<bool>,
642 ) -> rusqlite::Result<()> {
643 if let Some(stripped) = stripped {
644 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
645 .execute((room_id, stripped))?;
646 } else {
647 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
648 .execute((room_id,))?;
649 }
650 Ok(())
651 }
652
653 fn set_member(
654 &self,
655 room_id: &[u8],
656 user_id: &[u8],
657 membership: &[u8],
658 stripped: bool,
659 data: &[u8],
660 ) -> rusqlite::Result<()> {
661 self.prepare_cached(
662 "INSERT OR REPLACE
663 INTO member (room_id, user_id, membership, stripped, data)
664 VALUES (?, ?, ?, ?, ?)",
665 )?
666 .execute((room_id, user_id, membership, stripped, data))?;
667 Ok(())
668 }
669
670 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
675 if let Some(stripped) = stripped {
676 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
677 .execute((room_id, stripped))?;
678 } else {
679 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
680 }
681 Ok(())
682 }
683
684 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
685 self.prepare_cached(
686 "INSERT OR REPLACE
687 INTO profile (room_id, user_id, data)
688 VALUES (?, ?, ?)",
689 )?
690 .execute((room_id, user_id, data))?;
691 Ok(())
692 }
693
694 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
695 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
696 Ok(())
697 }
698
699 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
700 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
701 .execute((room_id, user_id))?;
702 Ok(())
703 }
704
705 fn set_receipt(
706 &self,
707 room_id: &[u8],
708 user_id: &[u8],
709 receipt_type: &[u8],
710 thread: &[u8],
711 event_id: &[u8],
712 data: &[u8],
713 ) -> rusqlite::Result<()> {
714 self.prepare_cached(
715 "INSERT OR REPLACE
716 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
717 VALUES (?, ?, ?, ?, ?, ?)",
718 )?
719 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
720 Ok(())
721 }
722
723 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
724 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
725 Ok(())
726 }
727
728 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
729 self.prepare_cached(
730 "INSERT OR REPLACE
731 INTO display_name (room_id, name, data)
732 VALUES (?, ?, ?)",
733 )?
734 .execute((room_id, name, data))?;
735 Ok(())
736 }
737
738 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
739 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
740 .execute((room_id, name))?;
741 Ok(())
742 }
743
744 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
745 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
746 Ok(())
747 }
748
749 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
750 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
751 Ok(())
752 }
753
754 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
755 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
756 .execute((room_id,))?;
757 Ok(())
758 }
759}
760
761#[async_trait]
762trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
763 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
764 Ok(self
765 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
766 .await
767 .optional()?)
768 }
769
770 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
771 let keys_length = keys.len();
772
773 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
774 let sql_params = repeat_vars(keys.len());
775 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
776
777 let params = rusqlite::params_from_iter(keys);
778
779 Ok(txn
780 .prepare(&sql)?
781 .query(params)?
782 .mapped(|row| row.get(0))
783 .collect::<Result<_, _>>()?)
784 })
785 .await
786 }
787
788 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
789
790 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
791 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
792 Ok(())
793 }
794
795 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
796 Ok(match room_id {
797 None => {
798 self.prepare("SELECT data FROM room_info", move |mut stmt| {
799 stmt.query_map((), |row| row.get(0))?.collect()
800 })
801 .await?
802 }
803
804 Some(room_id) => {
805 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
806 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
807 })
808 .await?
809 }
810 })
811 }
812
813 async fn get_maybe_stripped_state_events_for_keys(
814 &self,
815 room_id: Key,
816 event_type: Key,
817 state_keys: Vec<Key>,
818 ) -> Result<Vec<(bool, Vec<u8>)>> {
819 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
820 let sql_params = repeat_vars(state_keys.len());
821 let sql = format!(
822 "SELECT stripped, data FROM state_event
823 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
824 );
825
826 let params = rusqlite::params_from_iter(
827 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
828 );
829
830 Ok(txn
831 .prepare(&sql)?
832 .query(params)?
833 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
834 .collect::<Result<_, _>>()?)
835 })
836 .await
837 }
838
839 async fn get_maybe_stripped_state_events(
840 &self,
841 room_id: Key,
842 event_type: Key,
843 ) -> Result<Vec<(bool, Vec<u8>)>> {
844 Ok(self
845 .prepare(
846 "SELECT stripped, data FROM state_event
847 WHERE room_id = ? AND event_type = ?",
848 |mut stmt| {
849 stmt.query((room_id, event_type))?
850 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
851 .collect()
852 },
853 )
854 .await?)
855 }
856
857 async fn get_profiles(
858 &self,
859 room_id: Key,
860 user_ids: Vec<Key>,
861 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
862 let user_ids_length = user_ids.len();
863
864 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
865 let sql_params = repeat_vars(user_ids.len());
866 let sql = format!(
867 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
868 );
869
870 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
871
872 Ok(txn
873 .prepare(&sql)?
874 .query(params)?
875 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
876 .collect::<Result<_, _>>()?)
877 })
878 .await
879 }
880
881 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
882 let res = if memberships.is_empty() {
883 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
884 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
885 })
886 .await?
887 } else {
888 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
889 let sql_params = repeat_vars(memberships.len());
890 let sql = format!(
891 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
892 );
893
894 let params =
895 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
896
897 Ok(txn
898 .prepare(&sql)?
899 .query(params)?
900 .mapped(|row| row.get(0))
901 .collect::<Result<_, _>>()?)
902 })
903 .await?
904 };
905
906 Ok(res)
907 }
908
909 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
910 Ok(self
911 .query_row(
912 "SELECT data FROM global_account_data WHERE event_type = ?",
913 (event_type,),
914 |row| row.get(0),
915 )
916 .await
917 .optional()?)
918 }
919
920 async fn get_room_account_data(
921 &self,
922 room_id: Key,
923 event_type: Key,
924 ) -> Result<Option<Vec<u8>>> {
925 Ok(self
926 .query_row(
927 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
928 (room_id, event_type),
929 |row| row.get(0),
930 )
931 .await
932 .optional()?)
933 }
934
935 async fn get_display_names(
936 &self,
937 room_id: Key,
938 names: Vec<Key>,
939 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
940 let names_length = names.len();
941
942 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
943 let sql_params = repeat_vars(names.len());
944 let sql = format!(
945 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
946 );
947
948 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
949
950 Ok(txn
951 .prepare(&sql)?
952 .query(params)?
953 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
954 .collect::<Result<_, _>>()?)
955 })
956 .await
957 }
958
959 async fn get_user_receipt(
960 &self,
961 room_id: Key,
962 receipt_type: Key,
963 thread: Key,
964 user_id: Key,
965 ) -> Result<Option<Vec<u8>>> {
966 Ok(self
967 .query_row(
968 "SELECT data FROM receipt
969 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
970 (room_id, receipt_type, thread, user_id),
971 |row| row.get(0),
972 )
973 .await
974 .optional()?)
975 }
976
977 async fn get_event_receipts(
978 &self,
979 room_id: Key,
980 receipt_type: Key,
981 thread: Key,
982 event_id: Key,
983 ) -> Result<Vec<Vec<u8>>> {
984 Ok(self
985 .prepare(
986 "SELECT data FROM receipt
987 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
988 |mut stmt| {
989 stmt.query((room_id, receipt_type, thread, event_id))?
990 .mapped(|row| row.get(0))
991 .collect()
992 },
993 )
994 .await?)
995 }
996}
997
998#[async_trait]
999impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1000 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1001 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1002 }
1003}
1004
1005#[async_trait]
1006impl StateStore for SqliteStateStore {
1007 type Error = Error;
1008
1009 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1010 self.acquire()
1011 .await?
1012 .get_kv_blob(self.encode_state_store_data_key(key))
1013 .await?
1014 .map(|data| {
1015 Ok(match key {
1016 StateStoreDataKey::SyncToken => {
1017 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1018 }
1019 StateStoreDataKey::ServerInfo => {
1020 StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1021 }
1022 StateStoreDataKey::Filter(_) => {
1023 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1024 }
1025 StateStoreDataKey::UserAvatarUrl(_) => {
1026 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1027 }
1028 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1029 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1030 }
1031 StateStoreDataKey::UtdHookManagerData => {
1032 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1033 }
1034 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1035 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1036 }
1037 StateStoreDataKey::ComposerDraft(_, _) => {
1038 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1039 }
1040 StateStoreDataKey::SeenKnockRequests(_) => {
1041 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1042 }
1043 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1044 StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1045 self.deserialize_value(&data)?,
1046 )
1047 }
1048 })
1049 })
1050 .transpose()
1051 }
1052
1053 async fn set_kv_data(
1054 &self,
1055 key: StateStoreDataKey<'_>,
1056 value: StateStoreDataValue,
1057 ) -> Result<()> {
1058 let serialized_value = match key {
1059 StateStoreDataKey::SyncToken => self.serialize_value(
1060 &value.into_sync_token().expect("Session data not a sync token"),
1061 )?,
1062 StateStoreDataKey::ServerInfo => self.serialize_value(
1063 &value.into_server_info().expect("Session data not containing server info"),
1064 )?,
1065 StateStoreDataKey::Filter(_) => {
1066 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1067 }
1068 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1069 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1070 )?,
1071 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1072 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1073 )?,
1074 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1075 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1076 )?,
1077 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1078 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1079 }
1080 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1081 &value.into_composer_draft().expect("Session data not a composer draft"),
1082 )?,
1083 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1084 &value
1085 .into_seen_knock_requests()
1086 .expect("Session data is not a set of seen knock request ids"),
1087 )?,
1088 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1089 &value
1090 .into_thread_subscriptions_catchup_tokens()
1091 .expect("Session data is not a list of thread subscription catchup tokens"),
1092 )?,
1093 };
1094
1095 self.acquire()
1096 .await?
1097 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1098 .await
1099 }
1100
1101 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1102 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1103 }
1104
1105 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1106 let changes = changes.to_owned();
1107 let this = self.clone();
1108 self.acquire()
1109 .await?
1110 .with_transaction(move |txn| {
1111 let StateChanges {
1112 sync_token,
1113 account_data,
1114 presence,
1115 profiles,
1116 profiles_to_delete,
1117 state,
1118 room_account_data,
1119 room_infos,
1120 receipts,
1121 redactions,
1122 stripped_state,
1123 ambiguity_maps,
1124 } = changes;
1125
1126 if let Some(sync_token) = sync_token {
1127 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1128 let value = this.serialize_value(&sync_token)?;
1129 txn.set_kv_blob(&key, &value)?;
1130 }
1131
1132 for (event_type, event) in account_data {
1133 let event_type =
1134 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1135 let data = this.serialize_json(&event)?;
1136 txn.set_global_account_data(&event_type, &data)?;
1137 }
1138
1139 for (room_id, events) in room_account_data {
1140 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1141 for (event_type, event) in events {
1142 let event_type =
1143 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1144 let data = this.serialize_json(&event)?;
1145 txn.set_room_account_data(&room_id, &event_type, &data)?;
1146 }
1147 }
1148
1149 for (user_id, event) in presence {
1150 let key = this.encode_presence_key(&user_id);
1151 let value = this.serialize_json(&event)?;
1152 txn.set_kv_blob(&key, &value)?;
1153 }
1154
1155 for (room_id, room_info) in room_infos {
1156 let stripped = room_info.state() == RoomState::Invited;
1157 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1159
1160 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1161 let state = this
1162 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1163 let data = this.serialize_json(&room_info)?;
1164 txn.set_room_info(&room_id, &state, &data)?;
1165 }
1166
1167 for (room_id, user_ids) in profiles_to_delete {
1168 let room_id = this.encode_key(keys::PROFILE, room_id);
1169 for user_id in user_ids {
1170 let user_id = this.encode_key(keys::PROFILE, user_id);
1171 txn.remove_room_profile(&room_id, &user_id)?;
1172 }
1173 }
1174
1175 for (room_id, state_event_types) in state {
1176 let profiles = profiles.get(&room_id);
1177 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1178
1179 for (event_type, state_events) in state_event_types {
1180 let encoded_event_type =
1181 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1182
1183 for (state_key, raw_state_event) in state_events {
1184 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1185 let data = this.serialize_json(&raw_state_event)?;
1186
1187 let event_id: Option<String> =
1188 raw_state_event.get_field("event_id").ok().flatten();
1189 let encoded_event_id =
1190 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1191
1192 txn.set_state_event(
1193 &encoded_room_id,
1194 &encoded_event_type,
1195 &encoded_state_key,
1196 false,
1197 encoded_event_id.as_deref(),
1198 &data,
1199 )?;
1200
1201 if event_type == StateEventType::RoomMember {
1202 let member_event = match raw_state_event
1203 .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1204 {
1205 Ok(ev) => ev,
1206 Err(e) => {
1207 debug!(event_id, "Failed to deserialize member event: {e}");
1208 continue;
1209 }
1210 };
1211
1212 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1213 let user_id = this.encode_key(keys::MEMBER, &state_key);
1214 let membership = this
1215 .encode_key(keys::MEMBER, member_event.membership().as_str());
1216 let data = this.serialize_value(&state_key)?;
1217
1218 txn.set_member(
1219 &encoded_room_id,
1220 &user_id,
1221 &membership,
1222 false,
1223 &data,
1224 )?;
1225
1226 if let Some(profile) =
1227 profiles.and_then(|p| p.get(member_event.state_key()))
1228 {
1229 let room_id = this.encode_key(keys::PROFILE, &room_id);
1230 let user_id = this.encode_key(keys::PROFILE, &state_key);
1231 let data = this.serialize_json(&profile)?;
1232 txn.set_profile(&room_id, &user_id, &data)?;
1233 }
1234 }
1235 }
1236 }
1237 }
1238
1239 for (room_id, stripped_state_event_types) in stripped_state {
1240 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1241
1242 for (event_type, stripped_state_events) in stripped_state_event_types {
1243 let encoded_event_type =
1244 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1245
1246 for (state_key, raw_stripped_state_event) in stripped_state_events {
1247 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1248 let data = this.serialize_json(&raw_stripped_state_event)?;
1249 txn.set_state_event(
1250 &encoded_room_id,
1251 &encoded_event_type,
1252 &encoded_state_key,
1253 true,
1254 None,
1255 &data,
1256 )?;
1257
1258 if event_type == StateEventType::RoomMember {
1259 let member_event = match raw_stripped_state_event
1260 .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1261 ) {
1262 Ok(ev) => ev,
1263 Err(e) => {
1264 debug!("Failed to deserialize stripped member event: {e}");
1265 continue;
1266 }
1267 };
1268
1269 let room_id = this.encode_key(keys::MEMBER, &room_id);
1270 let user_id = this.encode_key(keys::MEMBER, &state_key);
1271 let membership = this.encode_key(
1272 keys::MEMBER,
1273 member_event.content.membership.as_str(),
1274 );
1275 let data = this.serialize_value(&state_key)?;
1276
1277 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1278 }
1279 }
1280 }
1281 }
1282
1283 for (room_id, receipt_event) in receipts {
1284 let room_id = this.encode_key(keys::RECEIPT, room_id);
1285
1286 for (event_id, receipt_types) in receipt_event {
1287 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1288
1289 for (receipt_type, receipt_users) in receipt_types {
1290 let receipt_type =
1291 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1292
1293 for (user_id, receipt) in receipt_users {
1294 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1295 let thread = this.encode_key(
1298 keys::RECEIPT,
1299 rmp_serde::to_vec_named(&receipt.thread)?,
1300 );
1301 let data = this.serialize_json(&ReceiptData {
1302 receipt,
1303 event_id: event_id.clone(),
1304 user_id,
1305 })?;
1306
1307 txn.set_receipt(
1308 &room_id,
1309 &encoded_user_id,
1310 &receipt_type,
1311 &thread,
1312 &encoded_event_id,
1313 &data,
1314 )?;
1315 }
1316 }
1317 }
1318 }
1319
1320 for (room_id, redactions) in redactions {
1321 let make_redaction_rules = || {
1322 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1323 txn.get_room_info(&encoded_room_id)
1324 .ok()
1325 .flatten()
1326 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1327 .map(|info| info.room_version_rules_or_default())
1328 .unwrap_or_else(|| {
1329 warn!(
1330 ?room_id,
1331 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1332 );
1333 ROOM_VERSION_RULES_FALLBACK
1334 }).redaction
1335 };
1336
1337 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1338 let mut redaction_rules = None;
1339
1340 for (event_id, redaction) in redactions {
1341 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1342
1343 if let Some(Ok(raw_event)) = txn
1344 .get_state_event_by_id(&encoded_room_id, &event_id)?
1345 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1346 {
1347 let event = raw_event.deserialize()?;
1348 let redacted = redact(
1349 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1350 redaction_rules.get_or_insert_with(make_redaction_rules),
1351 Some(RedactedBecause::from_raw_event(&redaction)?),
1352 )
1353 .map_err(Error::Redaction)?;
1354 let data = this.serialize_json(&redacted)?;
1355
1356 let event_type =
1357 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1358 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1359
1360 txn.set_state_event(
1361 &encoded_room_id,
1362 &event_type,
1363 &state_key,
1364 false,
1365 Some(&event_id),
1366 &data,
1367 )?;
1368 }
1369 }
1370 }
1371
1372 for (room_id, display_names) in ambiguity_maps {
1373 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1374
1375 for (name, user_ids) in display_names {
1376 let encoded_name = this.encode_key(
1377 keys::DISPLAY_NAME,
1378 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1379 );
1380 let data = this.serialize_json(&user_ids)?;
1381
1382 if user_ids.is_empty() {
1383 txn.remove_display_name(&room_id, &encoded_name)?;
1384
1385 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1400 txn.remove_display_name(&room_id, &raw_name)?;
1401 } else {
1402 txn.set_display_name(&room_id, &encoded_name, &data)?;
1404 }
1405 }
1406 }
1407
1408 Ok::<_, Error>(())
1409 })
1410 .await?;
1411
1412 Ok(())
1413 }
1414
1415 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1416 self.acquire()
1417 .await?
1418 .get_kv_blob(self.encode_presence_key(user_id))
1419 .await?
1420 .map(|data| self.deserialize_json(&data))
1421 .transpose()
1422 }
1423
1424 async fn get_presence_events(
1425 &self,
1426 user_ids: &[OwnedUserId],
1427 ) -> Result<Vec<Raw<PresenceEvent>>> {
1428 if user_ids.is_empty() {
1429 return Ok(Vec::new());
1430 }
1431
1432 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1433 self.acquire()
1434 .await?
1435 .get_kv_blobs(user_ids)
1436 .await?
1437 .into_iter()
1438 .map(|data| self.deserialize_json(&data))
1439 .collect()
1440 }
1441
1442 async fn get_state_event(
1443 &self,
1444 room_id: &RoomId,
1445 event_type: StateEventType,
1446 state_key: &str,
1447 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1448 Ok(self
1449 .get_state_events_for_keys(room_id, event_type, &[state_key])
1450 .await?
1451 .into_iter()
1452 .next())
1453 }
1454
1455 async fn get_state_events(
1456 &self,
1457 room_id: &RoomId,
1458 event_type: StateEventType,
1459 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1460 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1461 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1462 self.acquire()
1463 .await?
1464 .get_maybe_stripped_state_events(room_id, event_type)
1465 .await?
1466 .into_iter()
1467 .map(|(stripped, data)| {
1468 let ev = if stripped {
1469 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1470 } else {
1471 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1472 };
1473
1474 Ok(ev)
1475 })
1476 .collect()
1477 }
1478
1479 async fn get_state_events_for_keys(
1480 &self,
1481 room_id: &RoomId,
1482 event_type: StateEventType,
1483 state_keys: &[&str],
1484 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1485 if state_keys.is_empty() {
1486 return Ok(Vec::new());
1487 }
1488
1489 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1490 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1491 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1492 self.acquire()
1493 .await?
1494 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1495 .await?
1496 .into_iter()
1497 .map(|(stripped, data)| {
1498 let ev = if stripped {
1499 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1500 } else {
1501 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1502 };
1503
1504 Ok(ev)
1505 })
1506 .collect()
1507 }
1508
1509 async fn get_profile(
1510 &self,
1511 room_id: &RoomId,
1512 user_id: &UserId,
1513 ) -> Result<Option<MinimalRoomMemberEvent>> {
1514 let room_id = self.encode_key(keys::PROFILE, room_id);
1515 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1516
1517 self.acquire()
1518 .await?
1519 .get_profiles(room_id, user_ids)
1520 .await?
1521 .into_iter()
1522 .next()
1523 .map(|(_, data)| self.deserialize_json(&data))
1524 .transpose()
1525 }
1526
1527 async fn get_profiles<'a>(
1528 &self,
1529 room_id: &RoomId,
1530 user_ids: &'a [OwnedUserId],
1531 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1532 if user_ids.is_empty() {
1533 return Ok(BTreeMap::new());
1534 }
1535
1536 let room_id = self.encode_key(keys::PROFILE, room_id);
1537 let mut user_ids_map = user_ids
1538 .iter()
1539 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1540 .collect::<BTreeMap<_, _>>();
1541 let user_ids = user_ids_map.keys().cloned().collect();
1542
1543 self.acquire()
1544 .await?
1545 .get_profiles(room_id, user_ids)
1546 .await?
1547 .into_iter()
1548 .map(|(user_id, data)| {
1549 Ok((
1550 user_ids_map
1551 .remove(user_id.as_slice())
1552 .expect("returned user IDs were requested"),
1553 self.deserialize_json(&data)?,
1554 ))
1555 })
1556 .collect()
1557 }
1558
1559 async fn get_user_ids(
1560 &self,
1561 room_id: &RoomId,
1562 membership: RoomMemberships,
1563 ) -> Result<Vec<OwnedUserId>> {
1564 let room_id = self.encode_key(keys::MEMBER, room_id);
1565 let memberships = membership
1566 .as_vec()
1567 .into_iter()
1568 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1569 .collect();
1570 self.acquire()
1571 .await?
1572 .get_user_ids(room_id, memberships)
1573 .await?
1574 .iter()
1575 .map(|data| self.deserialize_value(data))
1576 .collect()
1577 }
1578
1579 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1580 self.acquire()
1581 .await?
1582 .get_room_infos(match room_load_settings {
1583 RoomLoadSettings::All => None,
1584 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1585 })
1586 .await?
1587 .into_iter()
1588 .map(|data| self.deserialize_json(&data))
1589 .collect()
1590 }
1591
1592 async fn get_users_with_display_name(
1593 &self,
1594 room_id: &RoomId,
1595 display_name: &DisplayName,
1596 ) -> Result<BTreeSet<OwnedUserId>> {
1597 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1598 let names = vec![self.encode_key(
1599 keys::DISPLAY_NAME,
1600 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1601 )];
1602
1603 Ok(self
1604 .acquire()
1605 .await?
1606 .get_display_names(room_id, names)
1607 .await?
1608 .into_iter()
1609 .next()
1610 .map(|(_, data)| self.deserialize_json(&data))
1611 .transpose()?
1612 .unwrap_or_default())
1613 }
1614
1615 async fn get_users_with_display_names<'a>(
1616 &self,
1617 room_id: &RoomId,
1618 display_names: &'a [DisplayName],
1619 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1620 let mut result = HashMap::new();
1621
1622 if display_names.is_empty() {
1623 return Ok(result);
1624 }
1625
1626 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1627 let mut names_map = display_names
1628 .iter()
1629 .flat_map(|display_name| {
1630 let raw =
1640 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1641 let normalized = display_name.as_normalized_str().map(|normalized| {
1642 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1643 });
1644
1645 iter::once(raw).chain(normalized)
1646 })
1647 .collect::<BTreeMap<_, _>>();
1648 let names = names_map.keys().cloned().collect();
1649
1650 for (name, data) in
1651 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1652 {
1653 let display_name =
1654 names_map.remove(name.as_slice()).expect("returned display names were requested");
1655 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1656
1657 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1658 }
1659
1660 Ok(result)
1661 }
1662
1663 async fn get_account_data_event(
1664 &self,
1665 event_type: GlobalAccountDataEventType,
1666 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1667 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1668 self.acquire()
1669 .await?
1670 .get_global_account_data(event_type)
1671 .await?
1672 .map(|value| self.deserialize_json(&value))
1673 .transpose()
1674 }
1675
1676 async fn get_room_account_data_event(
1677 &self,
1678 room_id: &RoomId,
1679 event_type: RoomAccountDataEventType,
1680 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1681 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1682 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1683 self.acquire()
1684 .await?
1685 .get_room_account_data(room_id, event_type)
1686 .await?
1687 .map(|value| self.deserialize_json(&value))
1688 .transpose()
1689 }
1690
1691 async fn get_user_room_receipt_event(
1692 &self,
1693 room_id: &RoomId,
1694 receipt_type: ReceiptType,
1695 thread: ReceiptThread,
1696 user_id: &UserId,
1697 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1698 let room_id = self.encode_key(keys::RECEIPT, room_id);
1699 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1700 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1703 let user_id = self.encode_key(keys::RECEIPT, user_id);
1704
1705 self.acquire()
1706 .await?
1707 .get_user_receipt(room_id, receipt_type, thread, user_id)
1708 .await?
1709 .map(|value| {
1710 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1711 })
1712 .transpose()
1713 }
1714
1715 async fn get_event_room_receipt_events(
1716 &self,
1717 room_id: &RoomId,
1718 receipt_type: ReceiptType,
1719 thread: ReceiptThread,
1720 event_id: &EventId,
1721 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1722 let room_id = self.encode_key(keys::RECEIPT, room_id);
1723 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1724 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1727 let event_id = self.encode_key(keys::RECEIPT, event_id);
1728
1729 self.acquire()
1730 .await?
1731 .get_event_receipts(room_id, receipt_type, thread, event_id)
1732 .await?
1733 .iter()
1734 .map(|value| {
1735 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1736 })
1737 .collect()
1738 }
1739
1740 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1741 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1742 }
1743
1744 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1745 let conn = self.acquire().await?;
1746 let key = self.encode_custom_key(key);
1747 conn.set_kv_blob(key, value).await?;
1748 Ok(())
1749 }
1750
1751 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1752 let conn = self.acquire().await?;
1753 let key = self.encode_custom_key(key);
1754 let previous = conn.get_kv_blob(key.clone()).await?;
1755 conn.set_kv_blob(key, value).await?;
1756 Ok(previous)
1757 }
1758
1759 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1760 let conn = self.acquire().await?;
1761 let key = self.encode_custom_key(key);
1762 let previous = conn.get_kv_blob(key.clone()).await?;
1763 if previous.is_some() {
1764 conn.delete_kv_blob(key).await?;
1765 }
1766 Ok(previous)
1767 }
1768
1769 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1770 let this = self.clone();
1771 let room_id = room_id.to_owned();
1772
1773 let conn = self.acquire().await?;
1774
1775 conn.with_transaction(move |txn| -> Result<()> {
1776 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1777 txn.remove_room_info(&room_info_room_id)?;
1778
1779 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1780 txn.remove_room_state_events(&state_event_room_id, None)?;
1781
1782 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1783 txn.remove_room_members(&member_room_id, None)?;
1784
1785 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1786 txn.remove_room_profiles(&profile_room_id)?;
1787
1788 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1789 txn.remove_room_account_data(&room_account_data_room_id)?;
1790
1791 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1792 txn.remove_room_receipts(&receipt_room_id)?;
1793
1794 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1795 txn.remove_room_display_names(&display_name_room_id)?;
1796
1797 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1798 txn.remove_room_send_queue(&send_queue_room_id)?;
1799
1800 let dependent_send_queue_room_id =
1801 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1802 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1803
1804 let thread_subscriptions_room_id =
1805 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1806 txn.execute(
1807 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1808 (thread_subscriptions_room_id,),
1809 )?;
1810
1811 Ok(())
1812 })
1813 .await?;
1814
1815 conn.vacuum().await
1816 }
1817
1818 async fn save_send_queue_request(
1819 &self,
1820 room_id: &RoomId,
1821 transaction_id: OwnedTransactionId,
1822 created_at: MilliSecondsSinceUnixEpoch,
1823 content: QueuedRequestKind,
1824 priority: usize,
1825 ) -> Result<(), Self::Error> {
1826 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1827 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1828
1829 let content = self.serialize_json(&content)?;
1830 let created_at_ts: u64 = created_at.0.into();
1836 self.acquire()
1837 .await?
1838 .with_transaction(move |txn| {
1839 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1840 Ok(())
1841 })
1842 .await
1843 }
1844
1845 async fn update_send_queue_request(
1846 &self,
1847 room_id: &RoomId,
1848 transaction_id: &TransactionId,
1849 content: QueuedRequestKind,
1850 ) -> Result<bool, Self::Error> {
1851 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1852
1853 let content = self.serialize_json(&content)?;
1854 let transaction_id = transaction_id.to_string();
1857
1858 let num_updated = self.acquire()
1859 .await?
1860 .with_transaction(move |txn| {
1861 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1862 })
1863 .await?;
1864
1865 Ok(num_updated > 0)
1866 }
1867
1868 async fn remove_send_queue_request(
1869 &self,
1870 room_id: &RoomId,
1871 transaction_id: &TransactionId,
1872 ) -> Result<bool, Self::Error> {
1873 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1874
1875 let transaction_id = transaction_id.to_string();
1877
1878 let num_deleted = self
1879 .acquire()
1880 .await?
1881 .with_transaction(move |txn| {
1882 txn.prepare_cached(
1883 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1884 )?
1885 .execute((room_id, &transaction_id))
1886 })
1887 .await?;
1888
1889 Ok(num_deleted > 0)
1890 }
1891
1892 async fn load_send_queue_requests(
1893 &self,
1894 room_id: &RoomId,
1895 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1896 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1897
1898 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1902 .acquire()
1903 .await?
1904 .prepare(
1905 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1906 |mut stmt| {
1907 stmt.query((room_id,))?
1908 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1909 .collect()
1910 },
1911 )
1912 .await?;
1913
1914 let mut requests = Vec::with_capacity(res.len());
1915 for entry in res {
1916 let created_at = entry
1917 .4
1918 .and_then(UInt::new)
1919 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1920 requests.push(QueuedRequest {
1921 transaction_id: entry.0.into(),
1922 kind: self.deserialize_json(&entry.1)?,
1923 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1924 priority: entry.3,
1925 created_at,
1926 });
1927 }
1928
1929 Ok(requests)
1930 }
1931
1932 async fn update_send_queue_request_status(
1933 &self,
1934 room_id: &RoomId,
1935 transaction_id: &TransactionId,
1936 error: Option<QueueWedgeError>,
1937 ) -> Result<(), Self::Error> {
1938 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1939
1940 let transaction_id = transaction_id.to_string();
1942
1943 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1945
1946 self.acquire()
1947 .await?
1948 .with_transaction(move |txn| {
1949 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1950 Ok(())
1951 })
1952 .await
1953 }
1954
1955 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1956 let res: Vec<Vec<u8>> = self
1961 .acquire()
1962 .await?
1963 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1964 stmt.query(())?.mapped(|row| row.get(0)).collect()
1965 })
1966 .await?;
1967
1968 Ok(res
1971 .into_iter()
1972 .map(|entry| self.deserialize_value(&entry))
1973 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1974 .into_iter()
1975 .collect())
1976 }
1977
1978 async fn save_dependent_queued_request(
1979 &self,
1980 room_id: &RoomId,
1981 parent_txn_id: &TransactionId,
1982 own_txn_id: ChildTransactionId,
1983 created_at: MilliSecondsSinceUnixEpoch,
1984 content: DependentQueuedRequestKind,
1985 ) -> Result<()> {
1986 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1987 let content = self.serialize_json(&content)?;
1988
1989 let parent_txn_id = parent_txn_id.to_string();
1991 let own_txn_id = own_txn_id.to_string();
1992
1993 let created_at_ts: u64 = created_at.0.into();
1994 self.acquire()
1995 .await?
1996 .with_transaction(move |txn| {
1997 txn.prepare_cached(
1998 r#"INSERT INTO dependent_send_queue_events
1999 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2000 VALUES (?, ?, ?, ?, ?)"#,
2001 )?
2002 .execute((
2003 room_id,
2004 parent_txn_id,
2005 own_txn_id,
2006 content,
2007 created_at_ts,
2008 ))?;
2009 Ok(())
2010 })
2011 .await
2012 }
2013
2014 async fn update_dependent_queued_request(
2015 &self,
2016 room_id: &RoomId,
2017 own_transaction_id: &ChildTransactionId,
2018 new_content: DependentQueuedRequestKind,
2019 ) -> Result<bool> {
2020 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2021 let content = self.serialize_json(&new_content)?;
2022
2023 let own_txn_id = own_transaction_id.to_string();
2025
2026 let num_updated = self
2027 .acquire()
2028 .await?
2029 .with_transaction(move |txn| {
2030 txn.prepare_cached(
2031 r#"UPDATE dependent_send_queue_events
2032 SET content = ?
2033 WHERE own_transaction_id = ?
2034 AND room_id = ?"#,
2035 )?
2036 .execute((content, own_txn_id, room_id))
2037 })
2038 .await?;
2039
2040 if num_updated > 1 {
2041 return Err(Error::InconsistentUpdate);
2042 }
2043
2044 Ok(num_updated == 1)
2045 }
2046
2047 async fn mark_dependent_queued_requests_as_ready(
2048 &self,
2049 room_id: &RoomId,
2050 parent_txn_id: &TransactionId,
2051 parent_key: SentRequestKey,
2052 ) -> Result<usize> {
2053 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2054 let parent_key = self.serialize_value(&parent_key)?;
2055
2056 let parent_txn_id = parent_txn_id.to_string();
2058
2059 self.acquire()
2060 .await?
2061 .with_transaction(move |txn| {
2062 Ok(txn.prepare_cached(
2063 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2064 )?
2065 .execute((parent_key, parent_txn_id, room_id))?)
2066 })
2067 .await
2068 }
2069
2070 async fn remove_dependent_queued_request(
2071 &self,
2072 room_id: &RoomId,
2073 txn_id: &ChildTransactionId,
2074 ) -> Result<bool> {
2075 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2076
2077 let txn_id = txn_id.to_string();
2079
2080 let num_deleted = self
2081 .acquire()
2082 .await?
2083 .with_transaction(move |txn| {
2084 txn.prepare_cached(
2085 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2086 )?
2087 .execute((txn_id, room_id))
2088 })
2089 .await?;
2090
2091 Ok(num_deleted > 0)
2092 }
2093
2094 async fn load_dependent_queued_requests(
2095 &self,
2096 room_id: &RoomId,
2097 ) -> Result<Vec<DependentQueuedRequest>> {
2098 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2099
2100 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2102 .acquire()
2103 .await?
2104 .prepare(
2105 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2106 |mut stmt| {
2107 stmt.query((room_id,))?
2108 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2109 .collect()
2110 },
2111 )
2112 .await?;
2113
2114 let mut dependent_events = Vec::with_capacity(res.len());
2115 for entry in res {
2116 let created_at = entry
2117 .4
2118 .and_then(UInt::new)
2119 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2120 dependent_events.push(DependentQueuedRequest {
2121 own_transaction_id: entry.0.into(),
2122 parent_transaction_id: entry.1.into(),
2123 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2124 kind: self.deserialize_json(&entry.3)?,
2125 created_at,
2126 });
2127 }
2128
2129 Ok(dependent_events)
2130 }
2131
2132 async fn upsert_thread_subscription(
2133 &self,
2134 room_id: &RoomId,
2135 thread_id: &EventId,
2136 mut new: StoredThreadSubscription,
2137 ) -> Result<(), Self::Error> {
2138 if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2139 if previous == new {
2140 trace!("not saving thread subscription because the subscription is the same");
2142 return Ok(());
2143 }
2144 if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2145 trace!("not saving thread subscription because we have a newer bump stamp");
2146 return Ok(());
2147 }
2148 }
2149
2150 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2151 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2152 let status = new.status.as_str();
2153
2154 self.acquire()
2155 .await?
2156 .with_transaction(move |txn| {
2157 txn.prepare_cached(
2159 "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2160 VALUES (?, ?, ?, ?)",
2161 )?
2162 .execute((room_id, thread_id, status, new.bump_stamp))
2163 })
2164 .await?;
2165 Ok(())
2166 }
2167
2168 async fn load_thread_subscription(
2169 &self,
2170 room_id: &RoomId,
2171 thread_id: &EventId,
2172 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2173 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2174 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2175
2176 Ok(self
2177 .acquire()
2178 .await?
2179 .query_row(
2180 "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2181 (room_id, thread_id),
2182 |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2183 )
2184 .await
2185 .optional()?
2186 .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2187 let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2188 Error::InvalidData { details: format!("Invalid thread status: {status}") }
2189 })?;
2190 Ok(StoredThreadSubscription { status, bump_stamp })
2191 })
2192 .transpose()?)
2193 }
2194
2195 async fn remove_thread_subscription(
2196 &self,
2197 room_id: &RoomId,
2198 thread_id: &EventId,
2199 ) -> Result<(), Self::Error> {
2200 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2201 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2202
2203 self.acquire()
2204 .await?
2205 .execute(
2206 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2207 (room_id, thread_id),
2208 )
2209 .await?;
2210
2211 Ok(())
2212 }
2213}
2214
2215#[derive(Debug, Clone, Serialize, Deserialize)]
2216struct ReceiptData {
2217 receipt: Receipt,
2218 event_id: OwnedEventId,
2219 user_id: OwnedUserId,
2220}
2221
2222#[cfg(test)]
2223mod tests {
2224 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2225
2226 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2227 use once_cell::sync::Lazy;
2228 use tempfile::{tempdir, TempDir};
2229
2230 use super::SqliteStateStore;
2231
2232 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2233 static NUM: AtomicU32 = AtomicU32::new(0);
2234
2235 async fn get_store() -> Result<impl StateStore, StoreError> {
2236 let name = NUM.fetch_add(1, SeqCst).to_string();
2237 let tmpdir_path = TMP_DIR.path().join(name);
2238
2239 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2240
2241 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2242 }
2243
2244 statestore_integration_tests!();
2245}
2246
2247#[cfg(test)]
2248mod encrypted_tests {
2249 use std::{
2250 path::PathBuf,
2251 sync::atomic::{AtomicU32, Ordering::SeqCst},
2252 };
2253
2254 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2255 use matrix_sdk_test::async_test;
2256 use once_cell::sync::Lazy;
2257 use tempfile::{tempdir, TempDir};
2258
2259 use super::SqliteStateStore;
2260 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2261
2262 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2263 static NUM: AtomicU32 = AtomicU32::new(0);
2264
2265 fn new_state_store_workspace() -> PathBuf {
2266 let name = NUM.fetch_add(1, SeqCst).to_string();
2267 TMP_DIR.path().join(name)
2268 }
2269
2270 async fn get_store() -> Result<impl StateStore, StoreError> {
2271 let tmpdir_path = new_state_store_workspace();
2272
2273 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2274
2275 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2276 .await
2277 .unwrap())
2278 }
2279
2280 #[async_test]
2281 async fn test_pool_size() {
2282 let tmpdir_path = new_state_store_workspace();
2283 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2284
2285 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2286
2287 assert_eq!(store.pool.status().max_size, 42);
2288 }
2289
2290 #[async_test]
2291 async fn test_cache_size() {
2292 let tmpdir_path = new_state_store_workspace();
2293 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2294
2295 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2296
2297 let conn = store.pool.get().await.unwrap();
2298 let cache_size =
2299 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2300
2301 assert_eq!(cache_size, -(1500 / 1024));
2305 }
2306
2307 #[async_test]
2308 async fn test_journal_size_limit() {
2309 let tmpdir_path = new_state_store_workspace();
2310 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2311
2312 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2313
2314 let conn = store.pool.get().await.unwrap();
2315 let journal_size_limit = conn
2316 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2317 .await
2318 .unwrap();
2319
2320 assert_eq!(journal_size_limit, 1500);
2323 }
2324
2325 statestore_integration_tests!();
2326}
2327
2328#[cfg(test)]
2329mod migration_tests {
2330 use std::{
2331 path::{Path, PathBuf},
2332 sync::{
2333 atomic::{AtomicU32, Ordering::SeqCst},
2334 Arc,
2335 },
2336 };
2337
2338 use as_variant::as_variant;
2339 use deadpool_sqlite::Runtime;
2340 use matrix_sdk_base::{
2341 media::{MediaFormat, MediaRequestParameters},
2342 store::{
2343 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2344 SerializableEventContent,
2345 },
2346 sync::UnreadNotificationsCount,
2347 RoomState, StateStore,
2348 };
2349 use matrix_sdk_test::async_test;
2350 use once_cell::sync::Lazy;
2351 use ruma::{
2352 events::{
2353 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2354 StateEventType,
2355 },
2356 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2357 RoomId, TransactionId, UserId,
2358 };
2359 use rusqlite::Transaction;
2360 use serde::{Deserialize, Serialize};
2361 use serde_json::json;
2362 use tempfile::{tempdir, TempDir};
2363 use tokio::fs;
2364
2365 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2366 use crate::{
2367 error::{Error, Result},
2368 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2369 OpenStoreError,
2370 };
2371
2372 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2373 static NUM: AtomicU32 = AtomicU32::new(0);
2374 const SECRET: &str = "secret";
2375
2376 fn new_path() -> PathBuf {
2377 let name = NUM.fetch_add(1, SeqCst).to_string();
2378 TMP_DIR.path().join(name)
2379 }
2380
2381 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2382 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2383
2384 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2385 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2388 let conn = pool.get().await?;
2389
2390 init(&conn).await?;
2391
2392 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2393 let this = SqliteStateStore { store_cipher, pool };
2394 this.run_migrations(&conn, 1, Some(version)).await?;
2395
2396 Ok(this)
2397 }
2398
2399 fn room_info_v1_json(
2400 room_id: &RoomId,
2401 state: RoomState,
2402 name: Option<&str>,
2403 creator: Option<&UserId>,
2404 ) -> serde_json::Value {
2405 let name_content = match name {
2407 Some(name) => json!({ "name": name }),
2408 None => json!({ "name": null }),
2409 };
2410 let create_content = match creator {
2412 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2413 None => RoomCreateEventContent::new_v11(),
2414 };
2415
2416 json!({
2417 "room_id": room_id,
2418 "room_type": state,
2419 "notification_counts": UnreadNotificationsCount::default(),
2420 "summary": {
2421 "heroes": [],
2422 "joined_member_count": 0,
2423 "invited_member_count": 0,
2424 },
2425 "members_synced": false,
2426 "base_info": {
2427 "dm_targets": [],
2428 "max_power_level": 100,
2429 "name": {
2430 "Original": {
2431 "content": name_content,
2432 },
2433 },
2434 "create": {
2435 "Original": {
2436 "content": create_content,
2437 }
2438 }
2439 },
2440 })
2441 }
2442
2443 #[async_test]
2444 pub async fn test_migrating_v1_to_v2() {
2445 let path = new_path();
2446 {
2448 let db = create_fake_db(&path, 1).await.unwrap();
2449 let conn = db.pool.get().await.unwrap();
2450
2451 let this = db.clone();
2452 conn.with_transaction(move |txn| {
2453 for i in 0..5 {
2454 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2455 let (state, stripped) =
2456 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2457 let info = room_info_v1_json(&room_id, state, None, None);
2458
2459 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2460 let data = this.serialize_json(&info)?;
2461
2462 txn.prepare_cached(
2463 "INSERT INTO room_info (room_id, stripped, data)
2464 VALUES (?, ?, ?)",
2465 )?
2466 .execute((room_id, stripped, data))?;
2467 }
2468
2469 Result::<_, Error>::Ok(())
2470 })
2471 .await
2472 .unwrap();
2473 }
2474
2475 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2477
2478 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2480 }
2481
2482 fn add_room_v2(
2484 this: &SqliteStateStore,
2485 txn: &Transaction<'_>,
2486 room_id: &RoomId,
2487 name: Option<&str>,
2488 create_creator: Option<&UserId>,
2489 create_sender: Option<&UserId>,
2490 ) -> Result<(), Error> {
2491 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2492
2493 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2494 let encoded_state =
2495 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2496 let data = this.serialize_json(&room_info_json)?;
2497
2498 txn.prepare_cached(
2499 "INSERT INTO room_info (room_id, state, data)
2500 VALUES (?, ?, ?)",
2501 )?
2502 .execute((encoded_room_id, encoded_state, data))?;
2503
2504 let Some(create_sender) = create_sender else {
2506 return Ok(());
2507 };
2508
2509 let create_content = match create_creator {
2510 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2511 None => RoomCreateEventContent::new_v11(),
2512 };
2513
2514 let event_id = EventId::new(server_name!("dummy.local"));
2515 let create_event = json!({
2516 "content": create_content,
2517 "event_id": event_id,
2518 "sender": create_sender.to_owned(),
2519 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2520 "state_key": "",
2521 "type": "m.room.create",
2522 "unsigned": {},
2523 });
2524
2525 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2526 let encoded_event_type =
2527 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2528 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2529 let stripped = false;
2530 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2531 let data = this.serialize_json(&create_event)?;
2532
2533 txn.prepare_cached(
2534 "INSERT
2535 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2536 VALUES (?, ?, ?, ?, ?, ?)",
2537 )?
2538 .execute((
2539 encoded_room_id,
2540 encoded_event_type,
2541 encoded_state_key,
2542 stripped,
2543 encoded_event_id,
2544 data,
2545 ))?;
2546
2547 Ok(())
2548 }
2549
2550 #[async_test]
2551 pub async fn test_migrating_v2_to_v3() {
2552 let path = new_path();
2553
2554 let room_a_id = room_id!("!room_a:dummy.local");
2556 let room_a_name = "Room A";
2557 let room_a_creator = user_id!("@creator:dummy.local");
2558 let room_a_create_sender = user_id!("@sender:dummy.local");
2561
2562 let room_b_id = room_id!("!room_b:dummy.local");
2564
2565 let room_c_id = room_id!("!room_c:dummy.local");
2567 let room_c_create_sender = user_id!("@creator:dummy.local");
2568
2569 {
2571 let db = create_fake_db(&path, 2).await.unwrap();
2572 let conn = db.pool.get().await.unwrap();
2573
2574 let this = db.clone();
2575 conn.with_transaction(move |txn| {
2576 add_room_v2(
2577 &this,
2578 txn,
2579 room_a_id,
2580 Some(room_a_name),
2581 Some(room_a_creator),
2582 Some(room_a_create_sender),
2583 )?;
2584 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2585 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2586
2587 Result::<_, Error>::Ok(())
2588 })
2589 .await
2590 .unwrap();
2591 }
2592
2593 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2595
2596 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2598 assert_eq!(room_infos.len(), 3);
2599
2600 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2601 assert_eq!(room_a.name(), Some(room_a_name));
2602 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2603
2604 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2605 assert_eq!(room_b.name(), None);
2606 assert_eq!(room_b.creators(), None);
2607
2608 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2609 assert_eq!(room_c.name(), None);
2610 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2611 }
2612
2613 #[async_test]
2614 pub async fn test_migrating_v7_to_v9() {
2615 let path = new_path();
2616
2617 let room_id = room_id!("!room_a:dummy.local");
2618 let wedged_event_transaction_id = TransactionId::new();
2619 let local_event_transaction_id = TransactionId::new();
2620
2621 {
2623 let db = create_fake_db(&path, 7).await.unwrap();
2624 let conn = db.pool.get().await.unwrap();
2625
2626 let wedge_tx = wedged_event_transaction_id.clone();
2627 let local_tx = local_event_transaction_id.clone();
2628
2629 conn.with_transaction(move |txn| {
2630 add_dependent_send_queue_event_v7(
2631 &db,
2632 txn,
2633 room_id,
2634 &local_tx,
2635 ChildTransactionId::new(),
2636 DependentQueuedRequestKind::RedactEvent,
2637 )?;
2638 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2639 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2640 Result::<_, Error>::Ok(())
2641 })
2642 .await
2643 .unwrap();
2644 }
2645
2646 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2649
2650 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2651 assert!(requests.is_empty());
2652
2653 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2654 assert!(dependent_requests.is_empty());
2655 }
2656
2657 fn add_send_queue_event_v7(
2658 this: &SqliteStateStore,
2659 txn: &Transaction<'_>,
2660 transaction_id: &TransactionId,
2661 room_id: &RoomId,
2662 is_wedged: bool,
2663 ) -> Result<(), Error> {
2664 let content =
2665 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2666
2667 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2668 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2669
2670 let content = this.serialize_json(&content)?;
2671
2672 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2673 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2674
2675 Ok(())
2676 }
2677
2678 fn add_dependent_send_queue_event_v7(
2679 this: &SqliteStateStore,
2680 txn: &Transaction<'_>,
2681 room_id: &RoomId,
2682 parent_txn_id: &TransactionId,
2683 own_txn_id: ChildTransactionId,
2684 content: DependentQueuedRequestKind,
2685 ) -> Result<(), Error> {
2686 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2687
2688 let parent_txn_id = parent_txn_id.to_string();
2689 let own_txn_id = own_txn_id.to_string();
2690 let content = this.serialize_json(&content)?;
2691
2692 txn.prepare_cached(
2693 "INSERT INTO dependent_send_queue_events
2694 (room_id, parent_transaction_id, own_transaction_id, content)
2695 VALUES (?, ?, ?, ?)",
2696 )?
2697 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2698
2699 Ok(())
2700 }
2701
2702 #[derive(Clone, Debug, Serialize, Deserialize)]
2703 pub enum LegacyDependentQueuedRequestKind {
2704 UploadFileWithThumbnail {
2705 content_type: String,
2706 cache_key: MediaRequestParameters,
2707 related_to: OwnedTransactionId,
2708 },
2709 }
2710
2711 #[async_test]
2712 pub async fn test_dependent_queued_request_variant_renaming() {
2713 let path = new_path();
2714 let db = create_fake_db(&path, 7).await.unwrap();
2715
2716 let cache_key = MediaRequestParameters {
2717 format: MediaFormat::File,
2718 source: MediaSource::Plain("https://server.local/foobar".into()),
2719 };
2720 let related_to = TransactionId::new();
2721 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2722 content_type: "image/png".to_owned(),
2723 cache_key,
2724 related_to: related_to.clone(),
2725 };
2726
2727 let data = db
2728 .serialize_json(&request)
2729 .expect("should be able to serialize legacy dependent request");
2730 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2731 "should be able to deserialize dependent request from legacy dependent request",
2732 );
2733
2734 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2735 assert_eq!(de_related_to, related_to);
2736 });
2737 }
2738}