1use crate::store::schema::*;
2use crate::store::traits::*;
3use async_trait::async_trait;
4use diesel::prelude::*;
5use diesel::r2d2::{ConnectionManager, Pool};
6use diesel::sql_query;
7use diesel::sqlite::SqliteConnection;
8use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
9use prost::Message;
10use wacore::appstate::hash::HashState;
11use wacore::libsignal;
12use wacore::libsignal::protocol::{Direction, KeyPair, PrivateKey, PublicKey};
13use wacore::store::error::{Result, StoreError};
14use wacore::store::traits::AppStateMutationMAC;
15use waproto::whatsapp::{self as wa, PreKeyRecordStructure, SignedPreKeyRecordStructure};
16
17use wacore::store::Device as CoreDevice;
18
19const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
20
21type SqlitePool = Pool<ConnectionManager<SqliteConnection>>;
22type SignalStoreError = Box<dyn std::error::Error + Send + Sync>;
23type DeviceRow = (
24 i32, String, String, i32, Vec<u8>, Vec<u8>, Vec<u8>, i32, Vec<u8>, Vec<u8>, Option<Vec<u8>>, String, i32, i32, i64, i64, );
41
42#[derive(Clone)]
43pub struct SqliteStore {
44 pub(crate) pool: SqlitePool,
45}
46
47impl SqliteStore {
48 pub async fn new(database_url: &str) -> std::result::Result<Self, StoreError> {
49 let manager = ConnectionManager::<SqliteConnection>::new(database_url);
50 let pool = Pool::builder()
51 .build(manager)
52 .map_err(|e| StoreError::Connection(e.to_string()))?;
53
54 let pool_clone = pool.clone();
55 tokio::task::spawn_blocking(move || -> std::result::Result<(), StoreError> {
56 let mut conn = pool_clone
57 .get()
58 .map_err(|e| StoreError::Connection(e.to_string()))?;
59 conn.run_pending_migrations(MIGRATIONS)
60 .map_err(|e| StoreError::Migration(e.to_string()))?;
61 let _ = diesel::sql_query("PRAGMA journal_mode=WAL;").execute(&mut conn);
62 let _ = diesel::sql_query("PRAGMA synchronous=NORMAL;").execute(&mut conn);
63 let _ = diesel::sql_query("PRAGMA busy_timeout = 15000;").execute(&mut conn);
64 Ok(())
65 })
66 .await
67 .map_err(|e| StoreError::Database(e.to_string()))??;
68
69 Ok(Self { pool })
70 }
71
72 pub fn begin_transaction(
73 &self,
74 ) -> Result<diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>> {
75 let mut conn = self.get_connection()?;
76 diesel::sql_query("BEGIN DEFERRED TRANSACTION;")
77 .execute(&mut conn)
78 .map_err(|e| StoreError::Database(e.to_string()))?;
79 Ok(conn)
80 }
81 pub fn commit_transaction(
82 &self,
83 conn: &mut diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>,
84 ) -> Result<()> {
85 diesel::sql_query("COMMIT;")
86 .execute(conn)
87 .map_err(|e| StoreError::Database(e.to_string()))?;
88 Ok(())
89 }
90 pub fn rollback_transaction(
91 &self,
92 conn: &mut diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>,
93 ) -> Result<()> {
94 diesel::sql_query("ROLLBACK;")
95 .execute(conn)
96 .map_err(|e| StoreError::Database(e.to_string()))?;
97 Ok(())
98 }
99
100 pub(crate) fn get_connection(
101 &self,
102 ) -> std::result::Result<
103 diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>,
104 StoreError,
105 > {
106 self.pool
107 .get()
108 .map_err(|e| StoreError::Connection(e.to_string()))
109 }
110
111 fn serialize_keypair(&self, key_pair: &KeyPair) -> Result<Vec<u8>> {
112 let mut bytes = Vec::with_capacity(64);
113 bytes.extend_from_slice(&key_pair.private_key.serialize());
114 bytes.extend_from_slice(key_pair.public_key.public_key_bytes());
115 Ok(bytes)
116 }
117
118 fn deserialize_keypair(&self, bytes: &[u8]) -> Result<KeyPair> {
119 if bytes.len() != 64 {
120 return Err(StoreError::Serialization(format!(
121 "Invalid KeyPair length: {}",
122 bytes.len()
123 )));
124 }
125
126 let private_key = PrivateKey::deserialize(&bytes[0..32])
127 .map_err(|e| StoreError::Serialization(e.to_string()))?;
128 let public_key = PublicKey::from_djb_public_key_bytes(&bytes[32..64])
129 .map_err(|e| StoreError::Serialization(e.to_string()))?;
130
131 Ok(KeyPair::new(public_key, private_key))
132 }
133
134 pub async fn save_device_data(&self, device_data: &CoreDevice) -> Result<()> {
135 let pool = self.pool.clone();
136 let noise_key_data = self.serialize_keypair(&device_data.noise_key)?;
137 let identity_key_data = self.serialize_keypair(&device_data.identity_key)?;
138 let signed_pre_key_data = self.serialize_keypair(&device_data.signed_pre_key)?;
139 let account_data = device_data
140 .account
141 .as_ref()
142 .map(|account| account.encode_to_vec());
143 let registration_id = device_data.registration_id as i32;
144 let signed_pre_key_id = device_data.signed_pre_key_id as i32;
145 let signed_pre_key_signature: Vec<u8> = device_data.signed_pre_key_signature.to_vec();
146 let adv_secret_key: Vec<u8> = device_data.adv_secret_key.to_vec();
147 let push_name = device_data.push_name.clone();
148 let app_version_primary = device_data.app_version_primary as i32;
149 let app_version_secondary = device_data.app_version_secondary as i32;
150 let app_version_tertiary = device_data.app_version_tertiary as i64;
151 let app_version_last_fetched_ms = device_data.app_version_last_fetched_ms;
152 let new_lid = device_data
153 .lid
154 .as_ref()
155 .map(|j| j.to_string())
156 .unwrap_or_default();
157 let new_pn = device_data
158 .pn
159 .as_ref()
160 .map(|j| j.to_string())
161 .unwrap_or_default();
162
163 tokio::task::spawn_blocking(move || -> Result<()> {
164 let mut conn = pool
165 .get()
166 .map_err(|e| StoreError::Connection(e.to_string()))?;
167
168 let device_id: i32 = device::table
170 .select(device::id)
171 .first::<i32>(&mut conn)
172 .optional()
173 .map_err(|e| StoreError::Database(e.to_string()))?
174 .unwrap_or(1);
175
176 diesel::insert_into(device::table)
177 .values((
178 device::id.eq(device_id),
179 device::lid.eq(&new_lid),
180 device::pn.eq(&new_pn),
181 device::registration_id.eq(registration_id),
182 device::noise_key.eq(&noise_key_data),
183 device::identity_key.eq(&identity_key_data),
184 device::signed_pre_key.eq(&signed_pre_key_data),
185 device::signed_pre_key_id.eq(signed_pre_key_id),
186 device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
187 device::adv_secret_key.eq(&adv_secret_key[..]),
188 device::account.eq(account_data.clone()),
189 device::push_name.eq(&push_name),
190 device::app_version_primary.eq(app_version_primary),
191 device::app_version_secondary.eq(app_version_secondary),
192 device::app_version_tertiary.eq(app_version_tertiary),
193 device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
194 ))
195 .on_conflict(device::id)
196 .do_update()
197 .set((
198 device::lid.eq(&new_lid),
199 device::pn.eq(&new_pn),
200 device::registration_id.eq(registration_id),
201 device::noise_key.eq(&noise_key_data),
202 device::identity_key.eq(&identity_key_data),
203 device::signed_pre_key.eq(&signed_pre_key_data),
204 device::signed_pre_key_id.eq(signed_pre_key_id),
205 device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
206 device::adv_secret_key.eq(&adv_secret_key[..]),
207 device::account.eq(account_data.clone()),
208 device::push_name.eq(&push_name),
209 device::app_version_primary.eq(app_version_primary),
210 device::app_version_secondary.eq(app_version_secondary),
211 device::app_version_tertiary.eq(app_version_tertiary),
212 device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
213 ))
214 .execute(&mut conn)
215 .map_err(|e| StoreError::Database(e.to_string()))?;
216
217 Ok(())
218 })
219 .await
220 .map_err(|e| StoreError::Database(e.to_string()))??;
221
222 Ok(())
223 }
224
225 pub async fn save_device_data_for_device(
227 &self,
228 device_id: i32,
229 device_data: &CoreDevice,
230 ) -> Result<()> {
231 let pool = self.pool.clone();
232 let noise_key_data = self.serialize_keypair(&device_data.noise_key)?;
233 let identity_key_data = self.serialize_keypair(&device_data.identity_key)?;
234 let signed_pre_key_data = self.serialize_keypair(&device_data.signed_pre_key)?;
235 let account_data = device_data
236 .account
237 .as_ref()
238 .map(|account| account.encode_to_vec());
239 let registration_id = device_data.registration_id as i32;
240 let signed_pre_key_id = device_data.signed_pre_key_id as i32;
241 let signed_pre_key_signature: Vec<u8> = device_data.signed_pre_key_signature.to_vec();
242 let adv_secret_key: Vec<u8> = device_data.adv_secret_key.to_vec();
243 let push_name = device_data.push_name.clone();
244 let app_version_primary = device_data.app_version_primary as i32;
245 let app_version_secondary = device_data.app_version_secondary as i32;
246 let app_version_tertiary = device_data.app_version_tertiary as i64;
247 let app_version_last_fetched_ms = device_data.app_version_last_fetched_ms;
248 let new_lid = device_data
249 .lid
250 .as_ref()
251 .map(|j| j.to_string())
252 .unwrap_or_default();
253 let new_pn = device_data
254 .pn
255 .as_ref()
256 .map(|j| j.to_string())
257 .unwrap_or_default();
258
259 tokio::task::spawn_blocking(move || -> Result<()> {
260 let mut conn = pool
261 .get()
262 .map_err(|e| StoreError::Connection(e.to_string()))?;
263
264 diesel::insert_into(device::table)
265 .values((
266 device::id.eq(device_id),
267 device::lid.eq(&new_lid),
268 device::pn.eq(&new_pn),
269 device::registration_id.eq(registration_id),
270 device::noise_key.eq(&noise_key_data),
271 device::identity_key.eq(&identity_key_data),
272 device::signed_pre_key.eq(&signed_pre_key_data),
273 device::signed_pre_key_id.eq(signed_pre_key_id),
274 device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
275 device::adv_secret_key.eq(&adv_secret_key[..]),
276 device::account.eq(account_data.clone()),
277 device::push_name.eq(&push_name),
278 device::app_version_primary.eq(app_version_primary),
279 device::app_version_secondary.eq(app_version_secondary),
280 device::app_version_tertiary.eq(app_version_tertiary),
281 device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
282 ))
283 .on_conflict(device::id)
284 .do_update()
285 .set((
286 device::lid.eq(&new_lid),
287 device::pn.eq(&new_pn),
288 device::registration_id.eq(registration_id),
289 device::noise_key.eq(&noise_key_data),
290 device::identity_key.eq(&identity_key_data),
291 device::signed_pre_key.eq(&signed_pre_key_data),
292 device::signed_pre_key_id.eq(signed_pre_key_id),
293 device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
294 device::adv_secret_key.eq(&adv_secret_key[..]),
295 device::account.eq(account_data.clone()),
296 device::push_name.eq(&push_name),
297 device::app_version_primary.eq(app_version_primary),
298 device::app_version_secondary.eq(app_version_secondary),
299 device::app_version_tertiary.eq(app_version_tertiary),
300 device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
301 ))
302 .execute(&mut conn)
303 .map_err(|e| StoreError::Database(e.to_string()))?;
304
305 Ok(())
306 })
307 .await
308 .map_err(|e| StoreError::Database(e.to_string()))??;
309
310 Ok(())
311 }
312
313 pub async fn load_device_data(&self) -> Result<Option<CoreDevice>> {
314 let pool = self.pool.clone();
315 let row = tokio::task::spawn_blocking(move || -> Result<Option<DeviceRow>> {
316 let mut conn = pool
317 .get()
318 .map_err(|e| StoreError::Connection(e.to_string()))?;
319 let result = device::table
320 .first::<DeviceRow>(&mut conn)
321 .optional()
322 .map_err(|e| StoreError::Database(e.to_string()))?;
323 Ok(result)
324 })
325 .await
326 .map_err(|e| StoreError::Database(e.to_string()))??;
327
328 if let Some((
329 _device_id, lid_str,
331 pn_str,
332 registration_id,
333 noise_key_data,
334 identity_key_data,
335 signed_pre_key_data,
336 signed_pre_key_id,
337 signed_pre_key_signature_data,
338 adv_secret_key_data,
339 account_data,
340 push_name,
341 app_version_primary,
342 app_version_secondary,
343 app_version_tertiary,
344 app_version_last_fetched_ms,
345 )) = row
346 {
347 let id = if !pn_str.is_empty() {
348 pn_str.parse().ok()
349 } else {
350 None
351 };
352 let lid = if !lid_str.is_empty() {
353 lid_str.parse().ok()
354 } else {
355 None
356 };
357
358 let noise_key = self.deserialize_keypair(&noise_key_data)?;
359 let identity_key = self.deserialize_keypair(&identity_key_data)?;
360 let signed_pre_key = self.deserialize_keypair(&signed_pre_key_data)?;
361
362 let mut signed_pre_key_signature = [0u8; 64];
363 if signed_pre_key_signature_data.len() == 64 {
364 signed_pre_key_signature.copy_from_slice(&signed_pre_key_signature_data);
365 } else {
366 return Err(StoreError::Serialization(
367 "Invalid signature length".to_string(),
368 ));
369 }
370
371 let mut adv_secret_key = [0u8; 32];
372 if adv_secret_key_data.len() == 32 {
373 adv_secret_key.copy_from_slice(&adv_secret_key_data);
374 } else {
375 return Err(StoreError::Serialization(
376 "Invalid secret key length".to_string(),
377 ));
378 }
379
380 let account = if let Some(account_data) = account_data {
381 Some(
382 wa::AdvSignedDeviceIdentity::decode(account_data.as_slice())
383 .map_err(|e| StoreError::Serialization(e.to_string()))?,
384 )
385 } else {
386 None
387 };
388
389 Ok(Some(CoreDevice {
390 pn: id,
391 lid,
392 registration_id: registration_id as u32,
393 noise_key,
394 identity_key,
395 signed_pre_key,
396 signed_pre_key_id: signed_pre_key_id as u32,
397 signed_pre_key_signature,
398 adv_secret_key,
399 account,
400 push_name,
401 app_version_primary: app_version_primary as u32,
402 app_version_secondary: app_version_secondary as u32,
403 app_version_tertiary: app_version_tertiary.try_into().unwrap_or(0u32),
404 app_version_last_fetched_ms,
405 device_props: {
406 use wacore::store::device::DEVICE_PROPS;
407 DEVICE_PROPS.clone()
408 },
409 }))
410 } else {
411 Ok(None)
412 }
413 }
414
415 pub async fn create_new_device(&self) -> Result<i32> {
417 use crate::store::schema::device;
418
419 let pool = self.pool.clone();
420 tokio::task::spawn_blocking(move || -> Result<i32> {
421 let mut conn = pool
422 .get()
423 .map_err(|e| StoreError::Connection(e.to_string()))?;
424
425 let new_device = wacore::store::Device::new();
427
428 let noise_key_data = {
430 let mut bytes = Vec::with_capacity(64);
431 bytes.extend_from_slice(&new_device.noise_key.private_key.serialize());
432 bytes.extend_from_slice(new_device.noise_key.public_key.public_key_bytes());
433 bytes
434 };
435 let identity_key_data = {
436 let mut bytes = Vec::with_capacity(64);
437 bytes.extend_from_slice(&new_device.identity_key.private_key.serialize());
438 bytes.extend_from_slice(new_device.identity_key.public_key.public_key_bytes());
439 bytes
440 };
441 let signed_pre_key_data = {
442 let mut bytes = Vec::with_capacity(64);
443 bytes.extend_from_slice(&new_device.signed_pre_key.private_key.serialize());
444 bytes.extend_from_slice(new_device.signed_pre_key.public_key.public_key_bytes());
445 bytes
446 };
447
448 diesel::insert_into(device::table)
450 .values((
451 device::lid.eq(""), device::pn.eq(""), device::registration_id.eq(new_device.registration_id as i32),
454 device::noise_key.eq(&noise_key_data),
455 device::identity_key.eq(&identity_key_data),
456 device::signed_pre_key.eq(&signed_pre_key_data),
457 device::signed_pre_key_id.eq(new_device.signed_pre_key_id as i32),
458 device::signed_pre_key_signature.eq(&new_device.signed_pre_key_signature[..]),
459 device::adv_secret_key.eq(&new_device.adv_secret_key[..]),
460 device::account.eq(None::<Vec<u8>>),
461 device::push_name.eq(&new_device.push_name),
462 device::app_version_primary.eq(new_device.app_version_primary as i32),
463 device::app_version_secondary.eq(new_device.app_version_secondary as i32),
464 device::app_version_tertiary.eq(new_device.app_version_tertiary as i64),
465 device::app_version_last_fetched_ms.eq(new_device.app_version_last_fetched_ms),
466 ))
467 .execute(&mut conn)
468 .map_err(|e| StoreError::Database(e.to_string()))?;
469
470 use diesel::sql_types::Integer;
472
473 #[derive(QueryableByName)]
474 struct LastInsertedId {
475 #[diesel(sql_type = Integer)]
476 last_insert_rowid: i32,
477 }
478
479 let device_id: i32 = sql_query("SELECT last_insert_rowid() as last_insert_rowid")
480 .get_result::<LastInsertedId>(&mut conn)
481 .map_err(|e| StoreError::Database(e.to_string()))?
482 .last_insert_rowid;
483
484 Ok(device_id)
485 })
486 .await
487 .map_err(|e| StoreError::Database(e.to_string()))?
488 }
489
490 pub async fn device_exists(&self, device_id: i32) -> Result<bool> {
492 use crate::store::schema::device;
493
494 let pool = self.pool.clone();
495 tokio::task::spawn_blocking(move || -> Result<bool> {
496 let mut conn = pool
497 .get()
498 .map_err(|e| StoreError::Connection(e.to_string()))?;
499
500 let count: i64 = device::table
501 .filter(device::id.eq(device_id))
502 .count()
503 .get_result(&mut conn)
504 .map_err(|e| StoreError::Database(e.to_string()))?;
505
506 Ok(count > 0)
507 })
508 .await
509 .map_err(|e| StoreError::Database(e.to_string()))?
510 }
511
512 pub async fn list_device_ids(&self) -> Result<Vec<i32>> {
514 use crate::store::schema::device;
515
516 let pool = self.pool.clone();
517 tokio::task::spawn_blocking(move || -> Result<Vec<i32>> {
518 let mut conn = pool
519 .get()
520 .map_err(|e| StoreError::Connection(e.to_string()))?;
521
522 let ids: Vec<i32> = device::table
523 .select(device::id)
524 .load(&mut conn)
525 .map_err(|e| StoreError::Database(e.to_string()))?;
526
527 Ok(ids)
528 })
529 .await
530 .map_err(|e| StoreError::Database(e.to_string()))?
531 }
532
533 pub async fn delete_device(&self, device_id: i32) -> Result<()> {
535 use crate::store::schema::*;
536
537 let pool = self.pool.clone();
538 tokio::task::spawn_blocking(move || -> Result<()> {
539 let mut conn = pool
540 .get()
541 .map_err(|e| StoreError::Connection(e.to_string()))?;
542
543 conn.transaction::<_, diesel::result::Error, _>(|conn| {
545 diesel::delete(identities::table.filter(identities::device_id.eq(device_id)))
547 .execute(conn)?;
548
549 diesel::delete(sessions::table.filter(sessions::device_id.eq(device_id)))
550 .execute(conn)?;
551
552 diesel::delete(prekeys::table.filter(prekeys::device_id.eq(device_id)))
553 .execute(conn)?;
554
555 diesel::delete(
556 signed_prekeys::table.filter(signed_prekeys::device_id.eq(device_id)),
557 )
558 .execute(conn)?;
559
560 diesel::delete(sender_keys::table.filter(sender_keys::device_id.eq(device_id)))
561 .execute(conn)?;
562
563 diesel::delete(
564 app_state_keys::table.filter(app_state_keys::device_id.eq(device_id)),
565 )
566 .execute(conn)?;
567
568 diesel::delete(
569 app_state_versions::table.filter(app_state_versions::device_id.eq(device_id)),
570 )
571 .execute(conn)?;
572
573 diesel::delete(
574 app_state_mutation_macs::table
575 .filter(app_state_mutation_macs::device_id.eq(device_id)),
576 )
577 .execute(conn)?;
578
579 let deleted_rows =
581 diesel::delete(device::table.filter(device::id.eq(device_id))).execute(conn)?;
582
583 if deleted_rows == 0 {
584 return Err(diesel::result::Error::NotFound);
585 }
586
587 Ok(())
588 })
589 .map_err(|e| match e {
590 diesel::result::Error::NotFound => StoreError::DeviceNotFound(device_id),
591 _ => StoreError::Database(e.to_string()),
592 })?;
593
594 Ok(())
595 })
596 .await
597 .map_err(|e| StoreError::Database(e.to_string()))?
598 }
599
600 pub async fn load_device_data_for_device(&self, device_id: i32) -> Result<Option<CoreDevice>> {
602 use crate::store::schema::device;
603
604 let pool = self.pool.clone();
605 let row = tokio::task::spawn_blocking(move || -> Result<Option<DeviceRow>> {
606 let mut conn = pool
607 .get()
608 .map_err(|e| StoreError::Connection(e.to_string()))?;
609 let result = device::table
610 .filter(device::id.eq(device_id))
611 .first::<DeviceRow>(&mut conn)
612 .optional()
613 .map_err(|e| StoreError::Database(e.to_string()))?;
614 Ok(result)
615 })
616 .await
617 .map_err(|e| StoreError::Database(e.to_string()))??;
618
619 if let Some((
620 _device_id, lid_str,
622 pn_str,
623 registration_id,
624 noise_key_data,
625 identity_key_data,
626 signed_pre_key_data,
627 signed_pre_key_id,
628 signed_pre_key_signature_data,
629 adv_secret_key_data,
630 account_data,
631 push_name,
632 app_version_primary,
633 app_version_secondary,
634 app_version_tertiary,
635 app_version_last_fetched_ms,
636 )) = row
637 {
638 let id = if !pn_str.is_empty() {
640 pn_str.parse().ok()
641 } else {
642 None
643 };
644 let lid = if !lid_str.is_empty() {
645 lid_str.parse().ok()
646 } else {
647 None
648 };
649
650 let noise_key = self.deserialize_keypair(&noise_key_data)?;
651 let identity_key = self.deserialize_keypair(&identity_key_data)?;
652 let signed_pre_key = self.deserialize_keypair(&signed_pre_key_data)?;
653
654 let signed_pre_key_signature: [u8; 64] =
655 signed_pre_key_signature_data.try_into().map_err(|_| {
656 StoreError::Serialization("Invalid signed_pre_key_signature length".to_string())
657 })?;
658
659 let adv_secret_key: [u8; 32] = adv_secret_key_data.try_into().map_err(|_| {
660 StoreError::Serialization("Invalid adv_secret_key length".to_string())
661 })?;
662
663 let account = account_data
664 .map(|data| {
665 wa::AdvSignedDeviceIdentity::decode(&data[..])
666 .map_err(|e| StoreError::Serialization(e.to_string()))
667 })
668 .transpose()?;
669
670 Ok(Some(CoreDevice {
671 pn: id,
672 lid,
673 registration_id: registration_id as u32,
674 noise_key,
675 identity_key,
676 signed_pre_key,
677 signed_pre_key_id: signed_pre_key_id as u32,
678 signed_pre_key_signature,
679 adv_secret_key,
680 account,
681 push_name,
682 app_version_primary: app_version_primary as u32,
683 app_version_secondary: app_version_secondary as u32,
684 app_version_tertiary: app_version_tertiary.try_into().unwrap_or(0u32),
685 app_version_last_fetched_ms,
686 device_props: {
687 use wacore::store::device::DEVICE_PROPS;
688 DEVICE_PROPS.clone()
689 },
690 }))
691 } else {
692 Ok(None)
693 }
694 }
695
696 pub async fn put_identity_for_device(
698 &self,
699 address: &str,
700 key: [u8; 32],
701 device_id: i32,
702 ) -> Result<()> {
703 let pool = self.pool.clone();
704 let address = address.to_string();
705 tokio::task::spawn_blocking(move || -> Result<()> {
706 let mut conn = pool
707 .get()
708 .map_err(|e| StoreError::Connection(e.to_string()))?;
709 diesel::insert_into(identities::table)
710 .values((
711 identities::address.eq(address),
712 identities::key.eq(&key[..]),
713 identities::device_id.eq(device_id),
714 ))
715 .on_conflict((identities::address, identities::device_id))
716 .do_update()
717 .set(identities::key.eq(&key[..]))
718 .execute(&mut conn)
719 .map_err(|e| StoreError::Database(e.to_string()))?;
720 Ok(())
721 })
722 .await
723 .map_err(|e| StoreError::Database(e.to_string()))??;
724 Ok(())
725 }
726
727 pub async fn delete_identity_for_device(&self, address: &str, device_id: i32) -> Result<()> {
728 let pool = self.pool.clone();
729 let address = address.to_string();
730 tokio::task::spawn_blocking(move || -> Result<()> {
731 let mut conn = pool
732 .get()
733 .map_err(|e| StoreError::Connection(e.to_string()))?;
734 diesel::delete(
735 identities::table
736 .filter(identities::address.eq(address))
737 .filter(identities::device_id.eq(device_id)),
738 )
739 .execute(&mut conn)
740 .map_err(|e| StoreError::Database(e.to_string()))?;
741 Ok(())
742 })
743 .await
744 .map_err(|e| StoreError::Database(e.to_string()))??;
745 Ok(())
746 }
747
748 pub async fn load_identity_for_device(
749 &self,
750 address: &str,
751 device_id: i32,
752 ) -> Result<Option<Vec<u8>>> {
753 let pool = self.pool.clone();
754 let address = address.to_string();
755 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
756 let mut conn = pool
757 .get()
758 .map_err(|e| StoreError::Connection(e.to_string()))?;
759 let res: Option<Vec<u8>> = identities::table
760 .select(identities::key)
761 .filter(identities::address.eq(address))
762 .filter(identities::device_id.eq(device_id))
763 .first(&mut conn)
764 .optional()
765 .map_err(|e| StoreError::Database(e.to_string()))?;
766 Ok(res)
767 })
768 .await
769 .map_err(|e| StoreError::Database(e.to_string()))?
770 }
771
772 pub async fn get_session_for_device(
773 &self,
774 address: &str,
775 device_id: i32,
776 ) -> Result<Option<Vec<u8>>> {
777 let pool = self.pool.clone();
778 let address = address.to_string();
779 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
780 let mut conn = pool
781 .get()
782 .map_err(|e| StoreError::Connection(e.to_string()))?;
783 let res: Option<Vec<u8>> = sessions::table
784 .select(sessions::record)
785 .filter(sessions::address.eq(address))
786 .filter(sessions::device_id.eq(device_id))
787 .first(&mut conn)
788 .optional()
789 .map_err(|e| StoreError::Database(e.to_string()))?;
790 Ok(res)
791 })
792 .await
793 .map_err(|e| StoreError::Database(e.to_string()))?
794 }
795
796 pub async fn put_session_for_device(
797 &self,
798 address: &str,
799 session: &[u8],
800 device_id: i32,
801 ) -> Result<()> {
802 let pool = self.pool.clone();
803 let address = address.to_string();
804 let session = session.to_vec();
805 tokio::task::spawn_blocking(move || -> Result<()> {
806 let mut conn = pool
807 .get()
808 .map_err(|e| StoreError::Connection(e.to_string()))?;
809 diesel::insert_into(sessions::table)
810 .values((
811 sessions::address.eq(address),
812 sessions::record.eq(&session),
813 sessions::device_id.eq(device_id),
814 ))
815 .on_conflict((sessions::address, sessions::device_id))
816 .do_update()
817 .set(sessions::record.eq(&session))
818 .execute(&mut conn)
819 .map_err(|e| StoreError::Database(e.to_string()))?;
820 Ok(())
821 })
822 .await
823 .map_err(|e| StoreError::Database(e.to_string()))??;
824 Ok(())
825 }
826
827 pub async fn delete_session_for_device(&self, address: &str, device_id: i32) -> Result<()> {
828 let pool = self.pool.clone();
829 let address = address.to_string();
830 tokio::task::spawn_blocking(move || -> Result<()> {
831 let mut conn = pool
832 .get()
833 .map_err(|e| StoreError::Connection(e.to_string()))?;
834 diesel::delete(
835 sessions::table
836 .filter(sessions::address.eq(address))
837 .filter(sessions::device_id.eq(device_id)),
838 )
839 .execute(&mut conn)
840 .map_err(|e| StoreError::Database(e.to_string()))?;
841 Ok(())
842 })
843 .await
844 .map_err(|e| StoreError::Database(e.to_string()))??;
845 Ok(())
846 }
847
848 pub async fn has_session_for_device(&self, address: &str, device_id: i32) -> Result<bool> {
849 Ok(self
850 .get_session_for_device(address, device_id)
851 .await?
852 .is_some())
853 }
854
855 pub async fn put_sender_key_for_device(
856 &self,
857 address: &str,
858 record: &[u8],
859 device_id: i32,
860 ) -> Result<()> {
861 let pool = self.pool.clone();
862 let address = address.to_string();
863 let record_vec = record.to_vec();
864 tokio::task::spawn_blocking(move || -> Result<()> {
865 let mut conn = pool
866 .get()
867 .map_err(|e| StoreError::Connection(e.to_string()))?;
868 diesel::insert_into(sender_keys::table)
869 .values((
870 sender_keys::address.eq(address),
871 sender_keys::record.eq(&record_vec),
872 sender_keys::device_id.eq(device_id),
873 ))
874 .on_conflict((sender_keys::address, sender_keys::device_id))
875 .do_update()
876 .set(sender_keys::record.eq(&record_vec))
877 .execute(&mut conn)
878 .map_err(|e| StoreError::Database(e.to_string()))?;
879 Ok(())
880 })
881 .await
882 .map_err(|e| StoreError::Database(e.to_string()))??;
883 Ok(())
884 }
885
886 pub async fn get_sender_key_for_device(
887 &self,
888 address: &str,
889 device_id: i32,
890 ) -> Result<Option<Vec<u8>>> {
891 let pool = self.pool.clone();
892 let address = address.to_string();
893 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
894 let mut conn = pool
895 .get()
896 .map_err(|e| StoreError::Connection(e.to_string()))?;
897 let res: Option<Vec<u8>> = sender_keys::table
898 .select(sender_keys::record)
899 .filter(sender_keys::address.eq(address))
900 .filter(sender_keys::device_id.eq(device_id))
901 .first(&mut conn)
902 .optional()
903 .map_err(|e| StoreError::Database(e.to_string()))?;
904 Ok(res)
905 })
906 .await
907 .map_err(|e| StoreError::Database(e.to_string()))?
908 }
909
910 pub async fn delete_sender_key_for_device(&self, address: &str, device_id: i32) -> Result<()> {
911 let pool = self.pool.clone();
912 let address = address.to_string();
913 tokio::task::spawn_blocking(move || -> Result<()> {
914 let mut conn = pool
915 .get()
916 .map_err(|e| StoreError::Connection(e.to_string()))?;
917 diesel::delete(
918 sender_keys::table
919 .filter(sender_keys::address.eq(address))
920 .filter(sender_keys::device_id.eq(device_id)),
921 )
922 .execute(&mut conn)
923 .map_err(|e| StoreError::Database(e.to_string()))?;
924 Ok(())
925 })
926 .await
927 .map_err(|e| StoreError::Database(e.to_string()))??;
928 Ok(())
929 }
930
931 pub async fn get_app_state_sync_key_for_device(
932 &self,
933 key_id: &[u8],
934 device_id: i32,
935 ) -> Result<Option<AppStateSyncKey>> {
936 let pool = self.pool.clone();
937 let key_id = key_id.to_vec();
938 let res: Option<Vec<u8>> =
939 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
940 let mut conn = pool
941 .get()
942 .map_err(|e| StoreError::Connection(e.to_string()))?;
943 let res: Option<Vec<u8>> = app_state_keys::table
944 .select(app_state_keys::key_data)
945 .filter(app_state_keys::key_id.eq(&key_id))
946 .filter(app_state_keys::device_id.eq(device_id))
947 .first(&mut conn)
948 .optional()
949 .map_err(|e| StoreError::Database(e.to_string()))?;
950 Ok(res)
951 })
952 .await
953 .map_err(|e| StoreError::Database(e.to_string()))??;
954
955 if let Some(data) = res {
956 let (key, _) = bincode::serde::decode_from_slice(&data, bincode::config::standard())
957 .map_err(|e| StoreError::Serialization(e.to_string()))?;
958 Ok(Some(key))
959 } else {
960 Ok(None)
961 }
962 }
963
964 pub async fn set_app_state_sync_key_for_device(
965 &self,
966 key_id: &[u8],
967 key: AppStateSyncKey,
968 device_id: i32,
969 ) -> Result<()> {
970 let pool = self.pool.clone();
971 let key_id = key_id.to_vec();
972 let data = bincode::serde::encode_to_vec(&key, bincode::config::standard())
973 .map_err(|e| StoreError::Serialization(e.to_string()))?;
974 tokio::task::spawn_blocking(move || -> Result<()> {
975 let mut conn = pool
976 .get()
977 .map_err(|e| StoreError::Connection(e.to_string()))?;
978 diesel::insert_into(app_state_keys::table)
979 .values((
980 app_state_keys::key_id.eq(&key_id),
981 app_state_keys::key_data.eq(&data),
982 app_state_keys::device_id.eq(device_id),
983 ))
984 .on_conflict((app_state_keys::key_id, app_state_keys::device_id))
985 .do_update()
986 .set(app_state_keys::key_data.eq(&data))
987 .execute(&mut conn)
988 .map_err(|e| StoreError::Database(e.to_string()))?;
989 Ok(())
990 })
991 .await
992 .map_err(|e| StoreError::Database(e.to_string()))??;
993 Ok(())
994 }
995
996 pub async fn get_app_state_version_for_device(
997 &self,
998 name: &str,
999 device_id: i32,
1000 ) -> Result<HashState> {
1001 let pool = self.pool.clone();
1002 let name = name.to_string();
1003 let res: Option<Vec<u8>> =
1004 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1005 let mut conn = pool
1006 .get()
1007 .map_err(|e| StoreError::Connection(e.to_string()))?;
1008 let res: Option<Vec<u8>> = app_state_versions::table
1009 .select(app_state_versions::state_data)
1010 .filter(app_state_versions::name.eq(name))
1011 .filter(app_state_versions::device_id.eq(device_id))
1012 .first(&mut conn)
1013 .optional()
1014 .map_err(|e| StoreError::Database(e.to_string()))?;
1015 Ok(res)
1016 })
1017 .await
1018 .map_err(|e| StoreError::Database(e.to_string()))??;
1019
1020 if let Some(data) = res {
1021 let (state, _) = bincode::serde::decode_from_slice(&data, bincode::config::standard())
1022 .map_err(|e| StoreError::Serialization(e.to_string()))?;
1023 Ok(state)
1024 } else {
1025 Ok(HashState::default())
1026 }
1027 }
1028
1029 pub async fn set_app_state_version_for_device(
1030 &self,
1031 name: &str,
1032 state: HashState,
1033 device_id: i32,
1034 ) -> Result<()> {
1035 let pool = self.pool.clone();
1036 let name = name.to_string();
1037 let data = bincode::serde::encode_to_vec(&state, bincode::config::standard())
1038 .map_err(|e| StoreError::Serialization(e.to_string()))?;
1039 tokio::task::spawn_blocking(move || -> Result<()> {
1040 let mut conn = pool
1041 .get()
1042 .map_err(|e| StoreError::Connection(e.to_string()))?;
1043 diesel::insert_into(app_state_versions::table)
1044 .values((
1045 app_state_versions::name.eq(&name),
1046 app_state_versions::state_data.eq(&data),
1047 app_state_versions::device_id.eq(device_id),
1048 ))
1049 .on_conflict((app_state_versions::name, app_state_versions::device_id))
1050 .do_update()
1051 .set(app_state_versions::state_data.eq(&data))
1052 .execute(&mut conn)
1053 .map_err(|e| StoreError::Database(e.to_string()))?;
1054 Ok(())
1055 })
1056 .await
1057 .map_err(|e| StoreError::Database(e.to_string()))??;
1058 Ok(())
1059 }
1060
1061 pub async fn put_app_state_mutation_macs_for_device(
1062 &self,
1063 name: &str,
1064 version: u64,
1065 mutations: &[AppStateMutationMAC],
1066 device_id: i32,
1067 ) -> Result<()> {
1068 if mutations.is_empty() {
1069 return Ok(());
1070 }
1071 let pool = self.pool.clone();
1072 let name = name.to_string();
1073 let mutations: Vec<AppStateMutationMAC> = mutations.to_vec();
1074 tokio::task::spawn_blocking(move || -> Result<()> {
1075 let mut conn = pool
1076 .get()
1077 .map_err(|e| StoreError::Connection(e.to_string()))?;
1078 for m in mutations {
1079 diesel::insert_into(app_state_mutation_macs::table)
1080 .values((
1081 app_state_mutation_macs::name.eq(&name),
1082 app_state_mutation_macs::version.eq(version as i64),
1083 app_state_mutation_macs::index_mac.eq(&m.index_mac),
1084 app_state_mutation_macs::value_mac.eq(&m.value_mac),
1085 app_state_mutation_macs::device_id.eq(device_id),
1086 ))
1087 .on_conflict((
1088 app_state_mutation_macs::name,
1089 app_state_mutation_macs::index_mac,
1090 app_state_mutation_macs::device_id,
1091 ))
1092 .do_update()
1093 .set((
1094 app_state_mutation_macs::version.eq(version as i64),
1095 app_state_mutation_macs::value_mac.eq(&m.value_mac),
1096 ))
1097 .execute(&mut conn)
1098 .map_err(|e| StoreError::Database(e.to_string()))?;
1099 }
1100 Ok(())
1101 })
1102 .await
1103 .map_err(|e| StoreError::Database(e.to_string()))??;
1104 Ok(())
1105 }
1106
1107 pub async fn delete_app_state_mutation_macs_for_device(
1108 &self,
1109 name: &str,
1110 index_macs: &[Vec<u8>],
1111 device_id: i32,
1112 ) -> Result<()> {
1113 if index_macs.is_empty() {
1114 return Ok(());
1115 }
1116 let pool = self.pool.clone();
1117 let name = name.to_string();
1118 let index_macs: Vec<Vec<u8>> = index_macs.to_vec();
1119 tokio::task::spawn_blocking(move || -> Result<()> {
1120 let mut conn = pool
1121 .get()
1122 .map_err(|e| StoreError::Connection(e.to_string()))?;
1123 for idx in index_macs {
1124 diesel::delete(
1125 app_state_mutation_macs::table.filter(
1126 app_state_mutation_macs::name
1127 .eq(&name)
1128 .and(app_state_mutation_macs::index_mac.eq(&idx))
1129 .and(app_state_mutation_macs::device_id.eq(device_id)),
1130 ),
1131 )
1132 .execute(&mut conn)
1133 .map_err(|e| StoreError::Database(e.to_string()))?;
1134 }
1135 Ok(())
1136 })
1137 .await
1138 .map_err(|e| StoreError::Database(e.to_string()))??;
1139 Ok(())
1140 }
1141
1142 pub async fn get_app_state_mutation_mac_for_device(
1143 &self,
1144 name: &str,
1145 index_mac: &[u8],
1146 device_id: i32,
1147 ) -> Result<Option<Vec<u8>>> {
1148 let pool = self.pool.clone();
1149 let name = name.to_string();
1150 let index_mac = index_mac.to_vec();
1151 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1152 let mut conn = pool
1153 .get()
1154 .map_err(|e| StoreError::Connection(e.to_string()))?;
1155 let res: Option<Vec<u8>> = app_state_mutation_macs::table
1156 .select(app_state_mutation_macs::value_mac)
1157 .filter(app_state_mutation_macs::name.eq(&name))
1158 .filter(app_state_mutation_macs::index_mac.eq(&index_mac))
1159 .filter(app_state_mutation_macs::device_id.eq(device_id))
1160 .first(&mut conn)
1161 .optional()
1162 .map_err(|e| StoreError::Database(e.to_string()))?;
1163 Ok(res)
1164 })
1165 .await
1166 .map_err(|e| StoreError::Database(e.to_string()))?
1167 }
1168}
1169
1170#[async_trait]
1171impl IdentityStore for SqliteStore {
1172 async fn put_identity(&self, address: &str, key: [u8; 32]) -> Result<()> {
1173 self.put_identity_for_device(address, key, 1).await
1174 }
1175
1176 async fn delete_identity(&self, address: &str) -> Result<()> {
1177 self.delete_identity_for_device(address, 1).await
1178 }
1179
1180 async fn is_trusted_identity(
1181 &self,
1182 address: &str,
1183 key: &[u8; 32],
1184 _direction: Direction,
1185 ) -> Result<bool> {
1186 match self.load_identity(address).await? {
1187 Some(stored_key) => Ok(stored_key.as_slice() == key),
1188 None => Ok(true),
1189 }
1190 }
1191
1192 async fn load_identity(&self, address: &str) -> Result<Option<Vec<u8>>> {
1193 self.load_identity_for_device(address, 1).await
1194 }
1195}
1196
1197#[async_trait]
1198impl SessionStore for SqliteStore {
1199 async fn get_session(&self, address: &str) -> Result<Option<Vec<u8>>> {
1200 self.get_session_for_device(address, 1).await
1201 }
1202
1203 async fn put_session(&self, address: &str, session: &[u8]) -> Result<()> {
1204 self.put_session_for_device(address, session, 1).await
1205 }
1206
1207 async fn delete_session(&self, address: &str) -> Result<()> {
1208 self.delete_session_for_device(address, 1).await
1209 }
1210
1211 async fn has_session(&self, address: &str) -> Result<bool> {
1212 self.has_session_for_device(address, 1).await
1213 }
1214}
1215
1216#[async_trait]
1217impl libsignal::store::PreKeyStore for SqliteStore {
1218 async fn load_prekey(
1219 &self,
1220 prekey_id: u32,
1221 ) -> std::result::Result<Option<PreKeyRecordStructure>, SignalStoreError> {
1222 let pool = self.pool.clone();
1223 let result: Option<Vec<u8>> =
1224 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1225 let mut conn = pool
1226 .get()
1227 .map_err(|e| StoreError::Connection(e.to_string()))?;
1228 let res: Option<Vec<u8>> = prekeys::table
1229 .select(prekeys::key)
1230 .filter(prekeys::id.eq(prekey_id as i32))
1231 .filter(prekeys::device_id.eq(1))
1232 .first(&mut conn)
1233 .optional()
1234 .map_err(|e| StoreError::Database(e.to_string()))?;
1235 Ok(res)
1236 })
1237 .await
1238 .map_err(|e| StoreError::Database(e.to_string()))??;
1239
1240 if let Some(key_data) = result {
1241 if let Ok(private_key) = PrivateKey::deserialize(&key_data) {
1242 if let Ok(public_key) = private_key.public_key() {
1243 let key_pair = KeyPair::new(public_key, private_key);
1244 let record = wacore::libsignal::store::record_helpers::new_pre_key_record(
1245 prekey_id, &key_pair,
1246 );
1247 Ok(Some(record))
1248 } else {
1249 Ok(None)
1250 }
1251 } else {
1252 Ok(None)
1253 }
1254 } else {
1255 Ok(None)
1256 }
1257 }
1258
1259 async fn store_prekey(
1260 &self,
1261 prekey_id: u32,
1262 record: PreKeyRecordStructure,
1263 uploaded: bool,
1264 ) -> std::result::Result<(), SignalStoreError> {
1265 let pool = self.pool.clone();
1266 let private_key_bytes = record.private_key.unwrap_or_default();
1267 tokio::task::spawn_blocking(move || -> Result<()> {
1268 let mut conn = pool
1269 .get()
1270 .map_err(|e| StoreError::Connection(e.to_string()))?;
1271 diesel::insert_into(prekeys::table)
1272 .values((
1273 prekeys::id.eq(prekey_id as i32),
1274 prekeys::key.eq(&private_key_bytes),
1275 prekeys::uploaded.eq(uploaded),
1276 prekeys::device_id.eq(1),
1277 ))
1278 .on_conflict((prekeys::id, prekeys::device_id))
1279 .do_update()
1280 .set((
1281 prekeys::key.eq(&private_key_bytes),
1282 prekeys::uploaded.eq(uploaded),
1283 ))
1284 .execute(&mut conn)
1285 .map_err(|e| StoreError::Database(e.to_string()))?;
1286 Ok(())
1287 })
1288 .await
1289 .map_err(|e| StoreError::Database(e.to_string()))??;
1290 Ok(())
1291 }
1292
1293 async fn contains_prekey(&self, prekey_id: u32) -> std::result::Result<bool, SignalStoreError> {
1294 let pool = self.pool.clone();
1295 let count: i64 = tokio::task::spawn_blocking(move || -> Result<i64> {
1296 let mut conn = pool
1297 .get()
1298 .map_err(|e| StoreError::Connection(e.to_string()))?;
1299 let cnt: i64 = prekeys::table
1300 .filter(prekeys::id.eq(prekey_id as i32))
1301 .filter(prekeys::device_id.eq(1))
1302 .count()
1303 .get_result(&mut conn)
1304 .map_err(|e| StoreError::Database(e.to_string()))?;
1305 Ok(cnt)
1306 })
1307 .await
1308 .map_err(|e| StoreError::Database(e.to_string()))??;
1309 Ok(count > 0)
1310 }
1311
1312 async fn remove_prekey(&self, prekey_id: u32) -> std::result::Result<(), SignalStoreError> {
1313 let pool = self.pool.clone();
1314 tokio::task::spawn_blocking(move || -> Result<()> {
1315 let mut conn = pool
1316 .get()
1317 .map_err(|e| StoreError::Connection(e.to_string()))?;
1318 diesel::delete(
1319 prekeys::table
1320 .filter(prekeys::id.eq(prekey_id as i32))
1321 .filter(prekeys::device_id.eq(1)),
1322 )
1323 .execute(&mut conn)
1324 .map_err(|e| StoreError::Database(e.to_string()))?;
1325 Ok(())
1326 })
1327 .await
1328 .map_err(|e| StoreError::Database(e.to_string()))??;
1329 Ok(())
1330 }
1331}
1332
1333#[async_trait]
1334impl SenderKeyStoreHelper for SqliteStore {
1335 async fn put_sender_key(&self, address: &str, record: &[u8]) -> Result<()> {
1336 self.put_sender_key_for_device(address, record, 1).await
1337 }
1338
1339 async fn get_sender_key(&self, address: &str) -> Result<Option<Vec<u8>>> {
1340 self.get_sender_key_for_device(address, 1).await
1341 }
1342
1343 async fn delete_sender_key(&self, address: &str) -> Result<()> {
1344 self.delete_sender_key_for_device(address, 1).await
1345 }
1346}
1347
1348#[async_trait]
1349impl libsignal::store::SignedPreKeyStore for SqliteStore {
1350 async fn load_signed_prekey(
1351 &self,
1352 signed_prekey_id: u32,
1353 ) -> std::result::Result<Option<SignedPreKeyRecordStructure>, SignalStoreError> {
1354 let pool = self.pool.clone();
1355 let result: Option<Vec<u8>> =
1356 tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1357 let mut conn = pool
1358 .get()
1359 .map_err(|e| StoreError::Connection(e.to_string()))?;
1360 let res: Option<Vec<u8>> = signed_prekeys::table
1361 .select(signed_prekeys::record)
1362 .filter(signed_prekeys::id.eq(signed_prekey_id as i32))
1363 .filter(signed_prekeys::device_id.eq(1))
1364 .first(&mut conn)
1365 .optional()
1366 .map_err(|e| StoreError::Database(e.to_string()))?;
1367 Ok(res)
1368 })
1369 .await
1370 .map_err(|e| StoreError::Database(e.to_string()))??;
1371
1372 if let Some(data) = result {
1373 let record = SignedPreKeyRecordStructure::decode(data.as_slice())
1374 .map_err(|e| StoreError::Serialization(e.to_string()))?;
1375 Ok(Some(record))
1376 } else {
1377 Ok(None)
1378 }
1379 }
1380
1381 async fn load_signed_prekeys(
1382 &self,
1383 ) -> std::result::Result<Vec<SignedPreKeyRecordStructure>, SignalStoreError> {
1384 let mut conn = self.get_connection()?;
1385
1386 let results: Vec<Vec<u8>> = signed_prekeys::table
1387 .select(signed_prekeys::record)
1388 .filter(signed_prekeys::device_id.eq(1))
1389 .load(&mut conn)
1390 .map_err(|e| StoreError::Database(e.to_string()))?;
1391
1392 let mut records = Vec::new();
1393 for data in results {
1394 let record = SignedPreKeyRecordStructure::decode(data.as_slice())
1395 .map_err(|e| StoreError::Serialization(e.to_string()))?;
1396 records.push(record);
1397 }
1398
1399 Ok(records)
1400 }
1401
1402 async fn store_signed_prekey(
1403 &self,
1404 signed_prekey_id: u32,
1405 record: SignedPreKeyRecordStructure,
1406 ) -> std::result::Result<(), SignalStoreError> {
1407 let pool = self.pool.clone();
1408 let data = record.encode_to_vec();
1409 tokio::task::spawn_blocking(move || -> Result<()> {
1410 let mut conn = pool
1411 .get()
1412 .map_err(|e| StoreError::Connection(e.to_string()))?;
1413 diesel::insert_into(signed_prekeys::table)
1414 .values((
1415 signed_prekeys::id.eq(signed_prekey_id as i32),
1416 signed_prekeys::record.eq(&data),
1417 signed_prekeys::device_id.eq(1),
1418 ))
1419 .on_conflict((signed_prekeys::id, signed_prekeys::device_id))
1420 .do_update()
1421 .set(signed_prekeys::record.eq(&data))
1422 .execute(&mut conn)
1423 .map_err(|e| StoreError::Database(e.to_string()))?;
1424 Ok(())
1425 })
1426 .await
1427 .map_err(|e| StoreError::Database(e.to_string()))??;
1428 Ok(())
1429 }
1430
1431 async fn contains_signed_prekey(
1432 &self,
1433 signed_prekey_id: u32,
1434 ) -> std::result::Result<bool, SignalStoreError> {
1435 let pool = self.pool.clone();
1436 let count: i64 = tokio::task::spawn_blocking(move || -> Result<i64> {
1437 let mut conn = pool
1438 .get()
1439 .map_err(|e| StoreError::Connection(e.to_string()))?;
1440 let cnt: i64 = signed_prekeys::table
1441 .filter(signed_prekeys::id.eq(signed_prekey_id as i32))
1442 .filter(signed_prekeys::device_id.eq(1))
1443 .count()
1444 .get_result(&mut conn)
1445 .map_err(|e| StoreError::Database(e.to_string()))?;
1446 Ok(cnt)
1447 })
1448 .await
1449 .map_err(|e| StoreError::Database(e.to_string()))??;
1450 Ok(count > 0)
1451 }
1452
1453 async fn remove_signed_prekey(
1454 &self,
1455 signed_prekey_id: u32,
1456 ) -> std::result::Result<(), SignalStoreError> {
1457 let pool = self.pool.clone();
1458 tokio::task::spawn_blocking(move || -> Result<()> {
1459 let mut conn = pool
1460 .get()
1461 .map_err(|e| StoreError::Connection(e.to_string()))?;
1462 diesel::delete(
1463 signed_prekeys::table
1464 .filter(signed_prekeys::id.eq(signed_prekey_id as i32))
1465 .filter(signed_prekeys::device_id.eq(1)),
1466 )
1467 .execute(&mut conn)
1468 .map_err(|e| StoreError::Database(e.to_string()))?;
1469 Ok(())
1470 })
1471 .await
1472 .map_err(|e| StoreError::Database(e.to_string()))??;
1473 Ok(())
1474 }
1475}
1476
1477#[async_trait]
1478impl AppStateKeyStore for SqliteStore {
1479 async fn get_app_state_sync_key(&self, key_id: &[u8]) -> Result<Option<AppStateSyncKey>> {
1480 self.get_app_state_sync_key_for_device(key_id, 1).await
1481 }
1482
1483 async fn set_app_state_sync_key(&self, key_id: &[u8], key: AppStateSyncKey) -> Result<()> {
1484 self.set_app_state_sync_key_for_device(key_id, key, 1).await
1485 }
1486}
1487
1488#[async_trait]
1489impl AppStateStore for SqliteStore {
1490 async fn get_app_state_version(&self, name: &str) -> Result<HashState> {
1491 self.get_app_state_version_for_device(name, 1).await
1492 }
1493
1494 async fn set_app_state_version(&self, name: &str, state: HashState) -> Result<()> {
1495 self.set_app_state_version_for_device(name, state, 1).await
1496 }
1497
1498 async fn put_app_state_mutation_macs(
1499 &self,
1500 name: &str,
1501 version: u64,
1502 mutations: &[AppStateMutationMAC],
1503 ) -> Result<()> {
1504 self.put_app_state_mutation_macs_for_device(name, version, mutations, 1)
1505 .await
1506 }
1507
1508 async fn delete_app_state_mutation_macs(
1509 &self,
1510 name: &str,
1511 index_macs: &[Vec<u8>],
1512 ) -> Result<()> {
1513 self.delete_app_state_mutation_macs_for_device(name, index_macs, 1)
1514 .await
1515 }
1516
1517 async fn get_app_state_mutation_mac(
1518 &self,
1519 name: &str,
1520 index_mac: &[u8],
1521 ) -> Result<Option<Vec<u8>>> {
1522 self.get_app_state_mutation_mac_for_device(name, index_mac, 1)
1523 .await
1524 }
1525}
1526
1527#[async_trait]
1528impl wacore::store::traits::DevicePersistence for SqliteStore {
1529 async fn save_device_data(
1530 &self,
1531 device_data: &wacore::store::Device,
1532 ) -> wacore::store::error::Result<()> {
1533 self.save_device_data_for_device(1, device_data).await
1535 }
1536
1537 async fn save_device_data_for_device(
1538 &self,
1539 device_id: i32,
1540 device_data: &wacore::store::Device,
1541 ) -> wacore::store::error::Result<()> {
1542 SqliteStore::save_device_data_for_device(self, device_id, device_data).await
1543 }
1544
1545 async fn load_device_data(
1546 &self,
1547 ) -> wacore::store::error::Result<Option<wacore::store::Device>> {
1548 self.load_device_data_for_device(1).await
1550 }
1551
1552 async fn load_device_data_for_device(
1553 &self,
1554 device_id: i32,
1555 ) -> wacore::store::error::Result<Option<wacore::store::Device>> {
1556 SqliteStore::load_device_data_for_device(self, device_id).await
1557 }
1558
1559 async fn device_exists(&self, device_id: i32) -> wacore::store::error::Result<bool> {
1560 SqliteStore::device_exists(self, device_id).await
1561 }
1562
1563 async fn create_new_device(&self) -> wacore::store::error::Result<i32> {
1564 SqliteStore::create_new_device(self).await
1565 }
1566}