whatsapp_rust/store/
sqlite_store.rs

1use crate::store::schema::*;
2use crate::store::traits::*;
3use async_trait::async_trait;
4use diesel::prelude::*;
5use diesel::r2d2::{ConnectionManager, Pool};
6use diesel::sql_query;
7use diesel::sqlite::SqliteConnection;
8use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
9use prost::Message;
10use wacore::appstate::hash::HashState;
11use wacore::libsignal;
12use wacore::libsignal::protocol::{Direction, KeyPair, PrivateKey, PublicKey};
13use wacore::store::error::{Result, StoreError};
14use wacore::store::traits::AppStateMutationMAC;
15use waproto::whatsapp::{self as wa, PreKeyRecordStructure, SignedPreKeyRecordStructure};
16
17use wacore::store::Device as CoreDevice;
18
19const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
20
21type SqlitePool = Pool<ConnectionManager<SqliteConnection>>;
22type SignalStoreError = Box<dyn std::error::Error + Send + Sync>;
23type DeviceRow = (
24    i32,             // id (new primary key)
25    String,          // lid
26    String,          // pn
27    i32,             // registration_id
28    Vec<u8>,         // noise_key
29    Vec<u8>,         // identity_key
30    Vec<u8>,         // signed_pre_key
31    i32,             // signed_pre_key_id
32    Vec<u8>,         // signed_pre_key_signature
33    Vec<u8>,         // adv_secret_key
34    Option<Vec<u8>>, // account
35    String,          // push_name
36    i32,             // app_version_primary
37    i32,             // app_version_secondary
38    i64,             // app_version_tertiary
39    i64,             // app_version_last_fetched_ms
40);
41
42#[derive(Clone)]
43pub struct SqliteStore {
44    pub(crate) pool: SqlitePool,
45}
46
47impl SqliteStore {
48    pub async fn new(database_url: &str) -> std::result::Result<Self, StoreError> {
49        let manager = ConnectionManager::<SqliteConnection>::new(database_url);
50        let pool = Pool::builder()
51            .build(manager)
52            .map_err(|e| StoreError::Connection(e.to_string()))?;
53
54        let pool_clone = pool.clone();
55        tokio::task::spawn_blocking(move || -> std::result::Result<(), StoreError> {
56            let mut conn = pool_clone
57                .get()
58                .map_err(|e| StoreError::Connection(e.to_string()))?;
59            conn.run_pending_migrations(MIGRATIONS)
60                .map_err(|e| StoreError::Migration(e.to_string()))?;
61            let _ = diesel::sql_query("PRAGMA journal_mode=WAL;").execute(&mut conn);
62            let _ = diesel::sql_query("PRAGMA synchronous=NORMAL;").execute(&mut conn);
63            let _ = diesel::sql_query("PRAGMA busy_timeout = 15000;").execute(&mut conn);
64            Ok(())
65        })
66        .await
67        .map_err(|e| StoreError::Database(e.to_string()))??;
68
69        Ok(Self { pool })
70    }
71
72    pub fn begin_transaction(
73        &self,
74    ) -> Result<diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>> {
75        let mut conn = self.get_connection()?;
76        diesel::sql_query("BEGIN DEFERRED TRANSACTION;")
77            .execute(&mut conn)
78            .map_err(|e| StoreError::Database(e.to_string()))?;
79        Ok(conn)
80    }
81    pub fn commit_transaction(
82        &self,
83        conn: &mut diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>,
84    ) -> Result<()> {
85        diesel::sql_query("COMMIT;")
86            .execute(conn)
87            .map_err(|e| StoreError::Database(e.to_string()))?;
88        Ok(())
89    }
90    pub fn rollback_transaction(
91        &self,
92        conn: &mut diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>,
93    ) -> Result<()> {
94        diesel::sql_query("ROLLBACK;")
95            .execute(conn)
96            .map_err(|e| StoreError::Database(e.to_string()))?;
97        Ok(())
98    }
99
100    pub(crate) fn get_connection(
101        &self,
102    ) -> std::result::Result<
103        diesel::r2d2::PooledConnection<ConnectionManager<SqliteConnection>>,
104        StoreError,
105    > {
106        self.pool
107            .get()
108            .map_err(|e| StoreError::Connection(e.to_string()))
109    }
110
111    fn serialize_keypair(&self, key_pair: &KeyPair) -> Result<Vec<u8>> {
112        let mut bytes = Vec::with_capacity(64);
113        bytes.extend_from_slice(&key_pair.private_key.serialize());
114        bytes.extend_from_slice(key_pair.public_key.public_key_bytes());
115        Ok(bytes)
116    }
117
118    fn deserialize_keypair(&self, bytes: &[u8]) -> Result<KeyPair> {
119        if bytes.len() != 64 {
120            return Err(StoreError::Serialization(format!(
121                "Invalid KeyPair length: {}",
122                bytes.len()
123            )));
124        }
125
126        let private_key = PrivateKey::deserialize(&bytes[0..32])
127            .map_err(|e| StoreError::Serialization(e.to_string()))?;
128        let public_key = PublicKey::from_djb_public_key_bytes(&bytes[32..64])
129            .map_err(|e| StoreError::Serialization(e.to_string()))?;
130
131        Ok(KeyPair::new(public_key, private_key))
132    }
133
134    pub async fn save_device_data(&self, device_data: &CoreDevice) -> Result<()> {
135        let pool = self.pool.clone();
136        let noise_key_data = self.serialize_keypair(&device_data.noise_key)?;
137        let identity_key_data = self.serialize_keypair(&device_data.identity_key)?;
138        let signed_pre_key_data = self.serialize_keypair(&device_data.signed_pre_key)?;
139        let account_data = device_data
140            .account
141            .as_ref()
142            .map(|account| account.encode_to_vec());
143        let registration_id = device_data.registration_id as i32;
144        let signed_pre_key_id = device_data.signed_pre_key_id as i32;
145        let signed_pre_key_signature: Vec<u8> = device_data.signed_pre_key_signature.to_vec();
146        let adv_secret_key: Vec<u8> = device_data.adv_secret_key.to_vec();
147        let push_name = device_data.push_name.clone();
148        let app_version_primary = device_data.app_version_primary as i32;
149        let app_version_secondary = device_data.app_version_secondary as i32;
150        let app_version_tertiary = device_data.app_version_tertiary as i64;
151        let app_version_last_fetched_ms = device_data.app_version_last_fetched_ms;
152        let new_lid = device_data
153            .lid
154            .as_ref()
155            .map(|j| j.to_string())
156            .unwrap_or_default();
157        let new_pn = device_data
158            .pn
159            .as_ref()
160            .map(|j| j.to_string())
161            .unwrap_or_default();
162
163        tokio::task::spawn_blocking(move || -> Result<()> {
164            let mut conn = pool
165                .get()
166                .map_err(|e| StoreError::Connection(e.to_string()))?;
167
168            // In single-device mode, find the first device or default to ID 1.
169            let device_id: i32 = device::table
170                .select(device::id)
171                .first::<i32>(&mut conn)
172                .optional()
173                .map_err(|e| StoreError::Database(e.to_string()))?
174                .unwrap_or(1);
175
176            diesel::insert_into(device::table)
177                .values((
178                    device::id.eq(device_id),
179                    device::lid.eq(&new_lid),
180                    device::pn.eq(&new_pn),
181                    device::registration_id.eq(registration_id),
182                    device::noise_key.eq(&noise_key_data),
183                    device::identity_key.eq(&identity_key_data),
184                    device::signed_pre_key.eq(&signed_pre_key_data),
185                    device::signed_pre_key_id.eq(signed_pre_key_id),
186                    device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
187                    device::adv_secret_key.eq(&adv_secret_key[..]),
188                    device::account.eq(account_data.clone()),
189                    device::push_name.eq(&push_name),
190                    device::app_version_primary.eq(app_version_primary),
191                    device::app_version_secondary.eq(app_version_secondary),
192                    device::app_version_tertiary.eq(app_version_tertiary),
193                    device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
194                ))
195                .on_conflict(device::id)
196                .do_update()
197                .set((
198                    device::lid.eq(&new_lid),
199                    device::pn.eq(&new_pn),
200                    device::registration_id.eq(registration_id),
201                    device::noise_key.eq(&noise_key_data),
202                    device::identity_key.eq(&identity_key_data),
203                    device::signed_pre_key.eq(&signed_pre_key_data),
204                    device::signed_pre_key_id.eq(signed_pre_key_id),
205                    device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
206                    device::adv_secret_key.eq(&adv_secret_key[..]),
207                    device::account.eq(account_data.clone()),
208                    device::push_name.eq(&push_name),
209                    device::app_version_primary.eq(app_version_primary),
210                    device::app_version_secondary.eq(app_version_secondary),
211                    device::app_version_tertiary.eq(app_version_tertiary),
212                    device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
213                ))
214                .execute(&mut conn)
215                .map_err(|e| StoreError::Database(e.to_string()))?;
216
217            Ok(())
218        })
219        .await
220        .map_err(|e| StoreError::Database(e.to_string()))??;
221
222        Ok(())
223    }
224
225    /// Save device data for a specific device ID (multi-account mode)
226    pub async fn save_device_data_for_device(
227        &self,
228        device_id: i32,
229        device_data: &CoreDevice,
230    ) -> Result<()> {
231        let pool = self.pool.clone();
232        let noise_key_data = self.serialize_keypair(&device_data.noise_key)?;
233        let identity_key_data = self.serialize_keypair(&device_data.identity_key)?;
234        let signed_pre_key_data = self.serialize_keypair(&device_data.signed_pre_key)?;
235        let account_data = device_data
236            .account
237            .as_ref()
238            .map(|account| account.encode_to_vec());
239        let registration_id = device_data.registration_id as i32;
240        let signed_pre_key_id = device_data.signed_pre_key_id as i32;
241        let signed_pre_key_signature: Vec<u8> = device_data.signed_pre_key_signature.to_vec();
242        let adv_secret_key: Vec<u8> = device_data.adv_secret_key.to_vec();
243        let push_name = device_data.push_name.clone();
244        let app_version_primary = device_data.app_version_primary as i32;
245        let app_version_secondary = device_data.app_version_secondary as i32;
246        let app_version_tertiary = device_data.app_version_tertiary as i64;
247        let app_version_last_fetched_ms = device_data.app_version_last_fetched_ms;
248        let new_lid = device_data
249            .lid
250            .as_ref()
251            .map(|j| j.to_string())
252            .unwrap_or_default();
253        let new_pn = device_data
254            .pn
255            .as_ref()
256            .map(|j| j.to_string())
257            .unwrap_or_default();
258
259        tokio::task::spawn_blocking(move || -> Result<()> {
260            let mut conn = pool
261                .get()
262                .map_err(|e| StoreError::Connection(e.to_string()))?;
263
264            diesel::insert_into(device::table)
265                .values((
266                    device::id.eq(device_id),
267                    device::lid.eq(&new_lid),
268                    device::pn.eq(&new_pn),
269                    device::registration_id.eq(registration_id),
270                    device::noise_key.eq(&noise_key_data),
271                    device::identity_key.eq(&identity_key_data),
272                    device::signed_pre_key.eq(&signed_pre_key_data),
273                    device::signed_pre_key_id.eq(signed_pre_key_id),
274                    device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
275                    device::adv_secret_key.eq(&adv_secret_key[..]),
276                    device::account.eq(account_data.clone()),
277                    device::push_name.eq(&push_name),
278                    device::app_version_primary.eq(app_version_primary),
279                    device::app_version_secondary.eq(app_version_secondary),
280                    device::app_version_tertiary.eq(app_version_tertiary),
281                    device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
282                ))
283                .on_conflict(device::id)
284                .do_update()
285                .set((
286                    device::lid.eq(&new_lid),
287                    device::pn.eq(&new_pn),
288                    device::registration_id.eq(registration_id),
289                    device::noise_key.eq(&noise_key_data),
290                    device::identity_key.eq(&identity_key_data),
291                    device::signed_pre_key.eq(&signed_pre_key_data),
292                    device::signed_pre_key_id.eq(signed_pre_key_id),
293                    device::signed_pre_key_signature.eq(&signed_pre_key_signature[..]),
294                    device::adv_secret_key.eq(&adv_secret_key[..]),
295                    device::account.eq(account_data.clone()),
296                    device::push_name.eq(&push_name),
297                    device::app_version_primary.eq(app_version_primary),
298                    device::app_version_secondary.eq(app_version_secondary),
299                    device::app_version_tertiary.eq(app_version_tertiary),
300                    device::app_version_last_fetched_ms.eq(app_version_last_fetched_ms),
301                ))
302                .execute(&mut conn)
303                .map_err(|e| StoreError::Database(e.to_string()))?;
304
305            Ok(())
306        })
307        .await
308        .map_err(|e| StoreError::Database(e.to_string()))??;
309
310        Ok(())
311    }
312
313    pub async fn load_device_data(&self) -> Result<Option<CoreDevice>> {
314        let pool = self.pool.clone();
315        let row = tokio::task::spawn_blocking(move || -> Result<Option<DeviceRow>> {
316            let mut conn = pool
317                .get()
318                .map_err(|e| StoreError::Connection(e.to_string()))?;
319            let result = device::table
320                .first::<DeviceRow>(&mut conn)
321                .optional()
322                .map_err(|e| StoreError::Database(e.to_string()))?;
323            Ok(result)
324        })
325        .await
326        .map_err(|e| StoreError::Database(e.to_string()))??;
327
328        if let Some((
329            _device_id, // We don't use this in the CoreDevice (id is just for DB organization)
330            lid_str,
331            pn_str,
332            registration_id,
333            noise_key_data,
334            identity_key_data,
335            signed_pre_key_data,
336            signed_pre_key_id,
337            signed_pre_key_signature_data,
338            adv_secret_key_data,
339            account_data,
340            push_name,
341            app_version_primary,
342            app_version_secondary,
343            app_version_tertiary,
344            app_version_last_fetched_ms,
345        )) = row
346        {
347            let id = if !pn_str.is_empty() {
348                pn_str.parse().ok()
349            } else {
350                None
351            };
352            let lid = if !lid_str.is_empty() {
353                lid_str.parse().ok()
354            } else {
355                None
356            };
357
358            let noise_key = self.deserialize_keypair(&noise_key_data)?;
359            let identity_key = self.deserialize_keypair(&identity_key_data)?;
360            let signed_pre_key = self.deserialize_keypair(&signed_pre_key_data)?;
361
362            let mut signed_pre_key_signature = [0u8; 64];
363            if signed_pre_key_signature_data.len() == 64 {
364                signed_pre_key_signature.copy_from_slice(&signed_pre_key_signature_data);
365            } else {
366                return Err(StoreError::Serialization(
367                    "Invalid signature length".to_string(),
368                ));
369            }
370
371            let mut adv_secret_key = [0u8; 32];
372            if adv_secret_key_data.len() == 32 {
373                adv_secret_key.copy_from_slice(&adv_secret_key_data);
374            } else {
375                return Err(StoreError::Serialization(
376                    "Invalid secret key length".to_string(),
377                ));
378            }
379
380            let account = if let Some(account_data) = account_data {
381                Some(
382                    wa::AdvSignedDeviceIdentity::decode(account_data.as_slice())
383                        .map_err(|e| StoreError::Serialization(e.to_string()))?,
384                )
385            } else {
386                None
387            };
388
389            Ok(Some(CoreDevice {
390                pn: id,
391                lid,
392                registration_id: registration_id as u32,
393                noise_key,
394                identity_key,
395                signed_pre_key,
396                signed_pre_key_id: signed_pre_key_id as u32,
397                signed_pre_key_signature,
398                adv_secret_key,
399                account,
400                push_name,
401                app_version_primary: app_version_primary as u32,
402                app_version_secondary: app_version_secondary as u32,
403                app_version_tertiary: app_version_tertiary.try_into().unwrap_or(0u32),
404                app_version_last_fetched_ms,
405                device_props: {
406                    use wacore::store::device::DEVICE_PROPS;
407                    DEVICE_PROPS.clone()
408                },
409            }))
410        } else {
411            Ok(None)
412        }
413    }
414
415    /// Create a new device entry in the database and return its ID
416    pub async fn create_new_device(&self) -> Result<i32> {
417        use crate::store::schema::device;
418
419        let pool = self.pool.clone();
420        tokio::task::spawn_blocking(move || -> Result<i32> {
421            let mut conn = pool
422                .get()
423                .map_err(|e| StoreError::Connection(e.to_string()))?;
424
425            // Create a new CoreDevice with default values
426            let new_device = wacore::store::Device::new();
427
428            // Serialize the device data
429            let noise_key_data = {
430                let mut bytes = Vec::with_capacity(64);
431                bytes.extend_from_slice(&new_device.noise_key.private_key.serialize());
432                bytes.extend_from_slice(new_device.noise_key.public_key.public_key_bytes());
433                bytes
434            };
435            let identity_key_data = {
436                let mut bytes = Vec::with_capacity(64);
437                bytes.extend_from_slice(&new_device.identity_key.private_key.serialize());
438                bytes.extend_from_slice(new_device.identity_key.public_key.public_key_bytes());
439                bytes
440            };
441            let signed_pre_key_data = {
442                let mut bytes = Vec::with_capacity(64);
443                bytes.extend_from_slice(&new_device.signed_pre_key.private_key.serialize());
444                bytes.extend_from_slice(new_device.signed_pre_key.public_key.public_key_bytes());
445                bytes
446            };
447
448            // Insert the new device
449            diesel::insert_into(device::table)
450                .values((
451                    device::lid.eq(""), // Empty initially, will be set during pairing
452                    device::pn.eq(""),  // Empty initially, will be set during pairing
453                    device::registration_id.eq(new_device.registration_id as i32),
454                    device::noise_key.eq(&noise_key_data),
455                    device::identity_key.eq(&identity_key_data),
456                    device::signed_pre_key.eq(&signed_pre_key_data),
457                    device::signed_pre_key_id.eq(new_device.signed_pre_key_id as i32),
458                    device::signed_pre_key_signature.eq(&new_device.signed_pre_key_signature[..]),
459                    device::adv_secret_key.eq(&new_device.adv_secret_key[..]),
460                    device::account.eq(None::<Vec<u8>>),
461                    device::push_name.eq(&new_device.push_name),
462                    device::app_version_primary.eq(new_device.app_version_primary as i32),
463                    device::app_version_secondary.eq(new_device.app_version_secondary as i32),
464                    device::app_version_tertiary.eq(new_device.app_version_tertiary as i64),
465                    device::app_version_last_fetched_ms.eq(new_device.app_version_last_fetched_ms),
466                ))
467                .execute(&mut conn)
468                .map_err(|e| StoreError::Database(e.to_string()))?;
469
470            // Get the last inserted row ID
471            use diesel::sql_types::Integer;
472
473            #[derive(QueryableByName)]
474            struct LastInsertedId {
475                #[diesel(sql_type = Integer)]
476                last_insert_rowid: i32,
477            }
478
479            let device_id: i32 = sql_query("SELECT last_insert_rowid() as last_insert_rowid")
480                .get_result::<LastInsertedId>(&mut conn)
481                .map_err(|e| StoreError::Database(e.to_string()))?
482                .last_insert_rowid;
483
484            Ok(device_id)
485        })
486        .await
487        .map_err(|e| StoreError::Database(e.to_string()))?
488    }
489
490    /// Check if a device with the given ID exists
491    pub async fn device_exists(&self, device_id: i32) -> Result<bool> {
492        use crate::store::schema::device;
493
494        let pool = self.pool.clone();
495        tokio::task::spawn_blocking(move || -> Result<bool> {
496            let mut conn = pool
497                .get()
498                .map_err(|e| StoreError::Connection(e.to_string()))?;
499
500            let count: i64 = device::table
501                .filter(device::id.eq(device_id))
502                .count()
503                .get_result(&mut conn)
504                .map_err(|e| StoreError::Database(e.to_string()))?;
505
506            Ok(count > 0)
507        })
508        .await
509        .map_err(|e| StoreError::Database(e.to_string()))?
510    }
511
512    /// List all device IDs in the database
513    pub async fn list_device_ids(&self) -> Result<Vec<i32>> {
514        use crate::store::schema::device;
515
516        let pool = self.pool.clone();
517        tokio::task::spawn_blocking(move || -> Result<Vec<i32>> {
518            let mut conn = pool
519                .get()
520                .map_err(|e| StoreError::Connection(e.to_string()))?;
521
522            let ids: Vec<i32> = device::table
523                .select(device::id)
524                .load(&mut conn)
525                .map_err(|e| StoreError::Database(e.to_string()))?;
526
527            Ok(ids)
528        })
529        .await
530        .map_err(|e| StoreError::Database(e.to_string()))?
531    }
532
533    /// Delete a device and all its associated data
534    pub async fn delete_device(&self, device_id: i32) -> Result<()> {
535        use crate::store::schema::*;
536
537        let pool = self.pool.clone();
538        tokio::task::spawn_blocking(move || -> Result<()> {
539            let mut conn = pool
540                .get()
541                .map_err(|e| StoreError::Connection(e.to_string()))?;
542
543            // Start a transaction to ensure all deletes succeed or fail together
544            conn.transaction::<_, diesel::result::Error, _>(|conn| {
545                // Delete all associated data first (foreign key children)
546                diesel::delete(identities::table.filter(identities::device_id.eq(device_id)))
547                    .execute(conn)?;
548
549                diesel::delete(sessions::table.filter(sessions::device_id.eq(device_id)))
550                    .execute(conn)?;
551
552                diesel::delete(prekeys::table.filter(prekeys::device_id.eq(device_id)))
553                    .execute(conn)?;
554
555                diesel::delete(
556                    signed_prekeys::table.filter(signed_prekeys::device_id.eq(device_id)),
557                )
558                .execute(conn)?;
559
560                diesel::delete(sender_keys::table.filter(sender_keys::device_id.eq(device_id)))
561                    .execute(conn)?;
562
563                diesel::delete(
564                    app_state_keys::table.filter(app_state_keys::device_id.eq(device_id)),
565                )
566                .execute(conn)?;
567
568                diesel::delete(
569                    app_state_versions::table.filter(app_state_versions::device_id.eq(device_id)),
570                )
571                .execute(conn)?;
572
573                diesel::delete(
574                    app_state_mutation_macs::table
575                        .filter(app_state_mutation_macs::device_id.eq(device_id)),
576                )
577                .execute(conn)?;
578
579                // Finally delete the device itself
580                let deleted_rows =
581                    diesel::delete(device::table.filter(device::id.eq(device_id))).execute(conn)?;
582
583                if deleted_rows == 0 {
584                    return Err(diesel::result::Error::NotFound);
585                }
586
587                Ok(())
588            })
589            .map_err(|e| match e {
590                diesel::result::Error::NotFound => StoreError::DeviceNotFound(device_id),
591                _ => StoreError::Database(e.to_string()),
592            })?;
593
594            Ok(())
595        })
596        .await
597        .map_err(|e| StoreError::Database(e.to_string()))?
598    }
599
600    /// Load device data for a specific device ID
601    pub async fn load_device_data_for_device(&self, device_id: i32) -> Result<Option<CoreDevice>> {
602        use crate::store::schema::device;
603
604        let pool = self.pool.clone();
605        let row = tokio::task::spawn_blocking(move || -> Result<Option<DeviceRow>> {
606            let mut conn = pool
607                .get()
608                .map_err(|e| StoreError::Connection(e.to_string()))?;
609            let result = device::table
610                .filter(device::id.eq(device_id))
611                .first::<DeviceRow>(&mut conn)
612                .optional()
613                .map_err(|e| StoreError::Database(e.to_string()))?;
614            Ok(result)
615        })
616        .await
617        .map_err(|e| StoreError::Database(e.to_string()))??;
618
619        if let Some((
620            _device_id, // We already know the device_id
621            lid_str,
622            pn_str,
623            registration_id,
624            noise_key_data,
625            identity_key_data,
626            signed_pre_key_data,
627            signed_pre_key_id,
628            signed_pre_key_signature_data,
629            adv_secret_key_data,
630            account_data,
631            push_name,
632            app_version_primary,
633            app_version_secondary,
634            app_version_tertiary,
635            app_version_last_fetched_ms,
636        )) = row
637        {
638            // Same parsing logic as load_device_data
639            let id = if !pn_str.is_empty() {
640                pn_str.parse().ok()
641            } else {
642                None
643            };
644            let lid = if !lid_str.is_empty() {
645                lid_str.parse().ok()
646            } else {
647                None
648            };
649
650            let noise_key = self.deserialize_keypair(&noise_key_data)?;
651            let identity_key = self.deserialize_keypair(&identity_key_data)?;
652            let signed_pre_key = self.deserialize_keypair(&signed_pre_key_data)?;
653
654            let signed_pre_key_signature: [u8; 64] =
655                signed_pre_key_signature_data.try_into().map_err(|_| {
656                    StoreError::Serialization("Invalid signed_pre_key_signature length".to_string())
657                })?;
658
659            let adv_secret_key: [u8; 32] = adv_secret_key_data.try_into().map_err(|_| {
660                StoreError::Serialization("Invalid adv_secret_key length".to_string())
661            })?;
662
663            let account = account_data
664                .map(|data| {
665                    wa::AdvSignedDeviceIdentity::decode(&data[..])
666                        .map_err(|e| StoreError::Serialization(e.to_string()))
667                })
668                .transpose()?;
669
670            Ok(Some(CoreDevice {
671                pn: id,
672                lid,
673                registration_id: registration_id as u32,
674                noise_key,
675                identity_key,
676                signed_pre_key,
677                signed_pre_key_id: signed_pre_key_id as u32,
678                signed_pre_key_signature,
679                adv_secret_key,
680                account,
681                push_name,
682                app_version_primary: app_version_primary as u32,
683                app_version_secondary: app_version_secondary as u32,
684                app_version_tertiary: app_version_tertiary.try_into().unwrap_or(0u32),
685                app_version_last_fetched_ms,
686                device_props: {
687                    use wacore::store::device::DEVICE_PROPS;
688                    DEVICE_PROPS.clone()
689                },
690            }))
691        } else {
692            Ok(None)
693        }
694    }
695
696    // ---- Device-parameterized helpers (to deduplicate code) ----
697    pub async fn put_identity_for_device(
698        &self,
699        address: &str,
700        key: [u8; 32],
701        device_id: i32,
702    ) -> Result<()> {
703        let pool = self.pool.clone();
704        let address = address.to_string();
705        tokio::task::spawn_blocking(move || -> Result<()> {
706            let mut conn = pool
707                .get()
708                .map_err(|e| StoreError::Connection(e.to_string()))?;
709            diesel::insert_into(identities::table)
710                .values((
711                    identities::address.eq(address),
712                    identities::key.eq(&key[..]),
713                    identities::device_id.eq(device_id),
714                ))
715                .on_conflict((identities::address, identities::device_id))
716                .do_update()
717                .set(identities::key.eq(&key[..]))
718                .execute(&mut conn)
719                .map_err(|e| StoreError::Database(e.to_string()))?;
720            Ok(())
721        })
722        .await
723        .map_err(|e| StoreError::Database(e.to_string()))??;
724        Ok(())
725    }
726
727    pub async fn delete_identity_for_device(&self, address: &str, device_id: i32) -> Result<()> {
728        let pool = self.pool.clone();
729        let address = address.to_string();
730        tokio::task::spawn_blocking(move || -> Result<()> {
731            let mut conn = pool
732                .get()
733                .map_err(|e| StoreError::Connection(e.to_string()))?;
734            diesel::delete(
735                identities::table
736                    .filter(identities::address.eq(address))
737                    .filter(identities::device_id.eq(device_id)),
738            )
739            .execute(&mut conn)
740            .map_err(|e| StoreError::Database(e.to_string()))?;
741            Ok(())
742        })
743        .await
744        .map_err(|e| StoreError::Database(e.to_string()))??;
745        Ok(())
746    }
747
748    pub async fn load_identity_for_device(
749        &self,
750        address: &str,
751        device_id: i32,
752    ) -> Result<Option<Vec<u8>>> {
753        let pool = self.pool.clone();
754        let address = address.to_string();
755        tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
756            let mut conn = pool
757                .get()
758                .map_err(|e| StoreError::Connection(e.to_string()))?;
759            let res: Option<Vec<u8>> = identities::table
760                .select(identities::key)
761                .filter(identities::address.eq(address))
762                .filter(identities::device_id.eq(device_id))
763                .first(&mut conn)
764                .optional()
765                .map_err(|e| StoreError::Database(e.to_string()))?;
766            Ok(res)
767        })
768        .await
769        .map_err(|e| StoreError::Database(e.to_string()))?
770    }
771
772    pub async fn get_session_for_device(
773        &self,
774        address: &str,
775        device_id: i32,
776    ) -> Result<Option<Vec<u8>>> {
777        let pool = self.pool.clone();
778        let address = address.to_string();
779        tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
780            let mut conn = pool
781                .get()
782                .map_err(|e| StoreError::Connection(e.to_string()))?;
783            let res: Option<Vec<u8>> = sessions::table
784                .select(sessions::record)
785                .filter(sessions::address.eq(address))
786                .filter(sessions::device_id.eq(device_id))
787                .first(&mut conn)
788                .optional()
789                .map_err(|e| StoreError::Database(e.to_string()))?;
790            Ok(res)
791        })
792        .await
793        .map_err(|e| StoreError::Database(e.to_string()))?
794    }
795
796    pub async fn put_session_for_device(
797        &self,
798        address: &str,
799        session: &[u8],
800        device_id: i32,
801    ) -> Result<()> {
802        let pool = self.pool.clone();
803        let address = address.to_string();
804        let session = session.to_vec();
805        tokio::task::spawn_blocking(move || -> Result<()> {
806            let mut conn = pool
807                .get()
808                .map_err(|e| StoreError::Connection(e.to_string()))?;
809            diesel::insert_into(sessions::table)
810                .values((
811                    sessions::address.eq(address),
812                    sessions::record.eq(&session),
813                    sessions::device_id.eq(device_id),
814                ))
815                .on_conflict((sessions::address, sessions::device_id))
816                .do_update()
817                .set(sessions::record.eq(&session))
818                .execute(&mut conn)
819                .map_err(|e| StoreError::Database(e.to_string()))?;
820            Ok(())
821        })
822        .await
823        .map_err(|e| StoreError::Database(e.to_string()))??;
824        Ok(())
825    }
826
827    pub async fn delete_session_for_device(&self, address: &str, device_id: i32) -> Result<()> {
828        let pool = self.pool.clone();
829        let address = address.to_string();
830        tokio::task::spawn_blocking(move || -> Result<()> {
831            let mut conn = pool
832                .get()
833                .map_err(|e| StoreError::Connection(e.to_string()))?;
834            diesel::delete(
835                sessions::table
836                    .filter(sessions::address.eq(address))
837                    .filter(sessions::device_id.eq(device_id)),
838            )
839            .execute(&mut conn)
840            .map_err(|e| StoreError::Database(e.to_string()))?;
841            Ok(())
842        })
843        .await
844        .map_err(|e| StoreError::Database(e.to_string()))??;
845        Ok(())
846    }
847
848    pub async fn has_session_for_device(&self, address: &str, device_id: i32) -> Result<bool> {
849        Ok(self
850            .get_session_for_device(address, device_id)
851            .await?
852            .is_some())
853    }
854
855    pub async fn put_sender_key_for_device(
856        &self,
857        address: &str,
858        record: &[u8],
859        device_id: i32,
860    ) -> Result<()> {
861        let pool = self.pool.clone();
862        let address = address.to_string();
863        let record_vec = record.to_vec();
864        tokio::task::spawn_blocking(move || -> Result<()> {
865            let mut conn = pool
866                .get()
867                .map_err(|e| StoreError::Connection(e.to_string()))?;
868            diesel::insert_into(sender_keys::table)
869                .values((
870                    sender_keys::address.eq(address),
871                    sender_keys::record.eq(&record_vec),
872                    sender_keys::device_id.eq(device_id),
873                ))
874                .on_conflict((sender_keys::address, sender_keys::device_id))
875                .do_update()
876                .set(sender_keys::record.eq(&record_vec))
877                .execute(&mut conn)
878                .map_err(|e| StoreError::Database(e.to_string()))?;
879            Ok(())
880        })
881        .await
882        .map_err(|e| StoreError::Database(e.to_string()))??;
883        Ok(())
884    }
885
886    pub async fn get_sender_key_for_device(
887        &self,
888        address: &str,
889        device_id: i32,
890    ) -> Result<Option<Vec<u8>>> {
891        let pool = self.pool.clone();
892        let address = address.to_string();
893        tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
894            let mut conn = pool
895                .get()
896                .map_err(|e| StoreError::Connection(e.to_string()))?;
897            let res: Option<Vec<u8>> = sender_keys::table
898                .select(sender_keys::record)
899                .filter(sender_keys::address.eq(address))
900                .filter(sender_keys::device_id.eq(device_id))
901                .first(&mut conn)
902                .optional()
903                .map_err(|e| StoreError::Database(e.to_string()))?;
904            Ok(res)
905        })
906        .await
907        .map_err(|e| StoreError::Database(e.to_string()))?
908    }
909
910    pub async fn delete_sender_key_for_device(&self, address: &str, device_id: i32) -> Result<()> {
911        let pool = self.pool.clone();
912        let address = address.to_string();
913        tokio::task::spawn_blocking(move || -> Result<()> {
914            let mut conn = pool
915                .get()
916                .map_err(|e| StoreError::Connection(e.to_string()))?;
917            diesel::delete(
918                sender_keys::table
919                    .filter(sender_keys::address.eq(address))
920                    .filter(sender_keys::device_id.eq(device_id)),
921            )
922            .execute(&mut conn)
923            .map_err(|e| StoreError::Database(e.to_string()))?;
924            Ok(())
925        })
926        .await
927        .map_err(|e| StoreError::Database(e.to_string()))??;
928        Ok(())
929    }
930
931    pub async fn get_app_state_sync_key_for_device(
932        &self,
933        key_id: &[u8],
934        device_id: i32,
935    ) -> Result<Option<AppStateSyncKey>> {
936        let pool = self.pool.clone();
937        let key_id = key_id.to_vec();
938        let res: Option<Vec<u8>> =
939            tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
940                let mut conn = pool
941                    .get()
942                    .map_err(|e| StoreError::Connection(e.to_string()))?;
943                let res: Option<Vec<u8>> = app_state_keys::table
944                    .select(app_state_keys::key_data)
945                    .filter(app_state_keys::key_id.eq(&key_id))
946                    .filter(app_state_keys::device_id.eq(device_id))
947                    .first(&mut conn)
948                    .optional()
949                    .map_err(|e| StoreError::Database(e.to_string()))?;
950                Ok(res)
951            })
952            .await
953            .map_err(|e| StoreError::Database(e.to_string()))??;
954
955        if let Some(data) = res {
956            let (key, _) = bincode::serde::decode_from_slice(&data, bincode::config::standard())
957                .map_err(|e| StoreError::Serialization(e.to_string()))?;
958            Ok(Some(key))
959        } else {
960            Ok(None)
961        }
962    }
963
964    pub async fn set_app_state_sync_key_for_device(
965        &self,
966        key_id: &[u8],
967        key: AppStateSyncKey,
968        device_id: i32,
969    ) -> Result<()> {
970        let pool = self.pool.clone();
971        let key_id = key_id.to_vec();
972        let data = bincode::serde::encode_to_vec(&key, bincode::config::standard())
973            .map_err(|e| StoreError::Serialization(e.to_string()))?;
974        tokio::task::spawn_blocking(move || -> Result<()> {
975            let mut conn = pool
976                .get()
977                .map_err(|e| StoreError::Connection(e.to_string()))?;
978            diesel::insert_into(app_state_keys::table)
979                .values((
980                    app_state_keys::key_id.eq(&key_id),
981                    app_state_keys::key_data.eq(&data),
982                    app_state_keys::device_id.eq(device_id),
983                ))
984                .on_conflict((app_state_keys::key_id, app_state_keys::device_id))
985                .do_update()
986                .set(app_state_keys::key_data.eq(&data))
987                .execute(&mut conn)
988                .map_err(|e| StoreError::Database(e.to_string()))?;
989            Ok(())
990        })
991        .await
992        .map_err(|e| StoreError::Database(e.to_string()))??;
993        Ok(())
994    }
995
996    pub async fn get_app_state_version_for_device(
997        &self,
998        name: &str,
999        device_id: i32,
1000    ) -> Result<HashState> {
1001        let pool = self.pool.clone();
1002        let name = name.to_string();
1003        let res: Option<Vec<u8>> =
1004            tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1005                let mut conn = pool
1006                    .get()
1007                    .map_err(|e| StoreError::Connection(e.to_string()))?;
1008                let res: Option<Vec<u8>> = app_state_versions::table
1009                    .select(app_state_versions::state_data)
1010                    .filter(app_state_versions::name.eq(name))
1011                    .filter(app_state_versions::device_id.eq(device_id))
1012                    .first(&mut conn)
1013                    .optional()
1014                    .map_err(|e| StoreError::Database(e.to_string()))?;
1015                Ok(res)
1016            })
1017            .await
1018            .map_err(|e| StoreError::Database(e.to_string()))??;
1019
1020        if let Some(data) = res {
1021            let (state, _) = bincode::serde::decode_from_slice(&data, bincode::config::standard())
1022                .map_err(|e| StoreError::Serialization(e.to_string()))?;
1023            Ok(state)
1024        } else {
1025            Ok(HashState::default())
1026        }
1027    }
1028
1029    pub async fn set_app_state_version_for_device(
1030        &self,
1031        name: &str,
1032        state: HashState,
1033        device_id: i32,
1034    ) -> Result<()> {
1035        let pool = self.pool.clone();
1036        let name = name.to_string();
1037        let data = bincode::serde::encode_to_vec(&state, bincode::config::standard())
1038            .map_err(|e| StoreError::Serialization(e.to_string()))?;
1039        tokio::task::spawn_blocking(move || -> Result<()> {
1040            let mut conn = pool
1041                .get()
1042                .map_err(|e| StoreError::Connection(e.to_string()))?;
1043            diesel::insert_into(app_state_versions::table)
1044                .values((
1045                    app_state_versions::name.eq(&name),
1046                    app_state_versions::state_data.eq(&data),
1047                    app_state_versions::device_id.eq(device_id),
1048                ))
1049                .on_conflict((app_state_versions::name, app_state_versions::device_id))
1050                .do_update()
1051                .set(app_state_versions::state_data.eq(&data))
1052                .execute(&mut conn)
1053                .map_err(|e| StoreError::Database(e.to_string()))?;
1054            Ok(())
1055        })
1056        .await
1057        .map_err(|e| StoreError::Database(e.to_string()))??;
1058        Ok(())
1059    }
1060
1061    pub async fn put_app_state_mutation_macs_for_device(
1062        &self,
1063        name: &str,
1064        version: u64,
1065        mutations: &[AppStateMutationMAC],
1066        device_id: i32,
1067    ) -> Result<()> {
1068        if mutations.is_empty() {
1069            return Ok(());
1070        }
1071        let pool = self.pool.clone();
1072        let name = name.to_string();
1073        let mutations: Vec<AppStateMutationMAC> = mutations.to_vec();
1074        tokio::task::spawn_blocking(move || -> Result<()> {
1075            let mut conn = pool
1076                .get()
1077                .map_err(|e| StoreError::Connection(e.to_string()))?;
1078            for m in mutations {
1079                diesel::insert_into(app_state_mutation_macs::table)
1080                    .values((
1081                        app_state_mutation_macs::name.eq(&name),
1082                        app_state_mutation_macs::version.eq(version as i64),
1083                        app_state_mutation_macs::index_mac.eq(&m.index_mac),
1084                        app_state_mutation_macs::value_mac.eq(&m.value_mac),
1085                        app_state_mutation_macs::device_id.eq(device_id),
1086                    ))
1087                    .on_conflict((
1088                        app_state_mutation_macs::name,
1089                        app_state_mutation_macs::index_mac,
1090                        app_state_mutation_macs::device_id,
1091                    ))
1092                    .do_update()
1093                    .set((
1094                        app_state_mutation_macs::version.eq(version as i64),
1095                        app_state_mutation_macs::value_mac.eq(&m.value_mac),
1096                    ))
1097                    .execute(&mut conn)
1098                    .map_err(|e| StoreError::Database(e.to_string()))?;
1099            }
1100            Ok(())
1101        })
1102        .await
1103        .map_err(|e| StoreError::Database(e.to_string()))??;
1104        Ok(())
1105    }
1106
1107    pub async fn delete_app_state_mutation_macs_for_device(
1108        &self,
1109        name: &str,
1110        index_macs: &[Vec<u8>],
1111        device_id: i32,
1112    ) -> Result<()> {
1113        if index_macs.is_empty() {
1114            return Ok(());
1115        }
1116        let pool = self.pool.clone();
1117        let name = name.to_string();
1118        let index_macs: Vec<Vec<u8>> = index_macs.to_vec();
1119        tokio::task::spawn_blocking(move || -> Result<()> {
1120            let mut conn = pool
1121                .get()
1122                .map_err(|e| StoreError::Connection(e.to_string()))?;
1123            for idx in index_macs {
1124                diesel::delete(
1125                    app_state_mutation_macs::table.filter(
1126                        app_state_mutation_macs::name
1127                            .eq(&name)
1128                            .and(app_state_mutation_macs::index_mac.eq(&idx))
1129                            .and(app_state_mutation_macs::device_id.eq(device_id)),
1130                    ),
1131                )
1132                .execute(&mut conn)
1133                .map_err(|e| StoreError::Database(e.to_string()))?;
1134            }
1135            Ok(())
1136        })
1137        .await
1138        .map_err(|e| StoreError::Database(e.to_string()))??;
1139        Ok(())
1140    }
1141
1142    pub async fn get_app_state_mutation_mac_for_device(
1143        &self,
1144        name: &str,
1145        index_mac: &[u8],
1146        device_id: i32,
1147    ) -> Result<Option<Vec<u8>>> {
1148        let pool = self.pool.clone();
1149        let name = name.to_string();
1150        let index_mac = index_mac.to_vec();
1151        tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1152            let mut conn = pool
1153                .get()
1154                .map_err(|e| StoreError::Connection(e.to_string()))?;
1155            let res: Option<Vec<u8>> = app_state_mutation_macs::table
1156                .select(app_state_mutation_macs::value_mac)
1157                .filter(app_state_mutation_macs::name.eq(&name))
1158                .filter(app_state_mutation_macs::index_mac.eq(&index_mac))
1159                .filter(app_state_mutation_macs::device_id.eq(device_id))
1160                .first(&mut conn)
1161                .optional()
1162                .map_err(|e| StoreError::Database(e.to_string()))?;
1163            Ok(res)
1164        })
1165        .await
1166        .map_err(|e| StoreError::Database(e.to_string()))?
1167    }
1168}
1169
1170#[async_trait]
1171impl IdentityStore for SqliteStore {
1172    async fn put_identity(&self, address: &str, key: [u8; 32]) -> Result<()> {
1173        self.put_identity_for_device(address, key, 1).await
1174    }
1175
1176    async fn delete_identity(&self, address: &str) -> Result<()> {
1177        self.delete_identity_for_device(address, 1).await
1178    }
1179
1180    async fn is_trusted_identity(
1181        &self,
1182        address: &str,
1183        key: &[u8; 32],
1184        _direction: Direction,
1185    ) -> Result<bool> {
1186        match self.load_identity(address).await? {
1187            Some(stored_key) => Ok(stored_key.as_slice() == key),
1188            None => Ok(true),
1189        }
1190    }
1191
1192    async fn load_identity(&self, address: &str) -> Result<Option<Vec<u8>>> {
1193        self.load_identity_for_device(address, 1).await
1194    }
1195}
1196
1197#[async_trait]
1198impl SessionStore for SqliteStore {
1199    async fn get_session(&self, address: &str) -> Result<Option<Vec<u8>>> {
1200        self.get_session_for_device(address, 1).await
1201    }
1202
1203    async fn put_session(&self, address: &str, session: &[u8]) -> Result<()> {
1204        self.put_session_for_device(address, session, 1).await
1205    }
1206
1207    async fn delete_session(&self, address: &str) -> Result<()> {
1208        self.delete_session_for_device(address, 1).await
1209    }
1210
1211    async fn has_session(&self, address: &str) -> Result<bool> {
1212        self.has_session_for_device(address, 1).await
1213    }
1214}
1215
1216#[async_trait]
1217impl libsignal::store::PreKeyStore for SqliteStore {
1218    async fn load_prekey(
1219        &self,
1220        prekey_id: u32,
1221    ) -> std::result::Result<Option<PreKeyRecordStructure>, SignalStoreError> {
1222        let pool = self.pool.clone();
1223        let result: Option<Vec<u8>> =
1224            tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1225                let mut conn = pool
1226                    .get()
1227                    .map_err(|e| StoreError::Connection(e.to_string()))?;
1228                let res: Option<Vec<u8>> = prekeys::table
1229                    .select(prekeys::key)
1230                    .filter(prekeys::id.eq(prekey_id as i32))
1231                    .filter(prekeys::device_id.eq(1))
1232                    .first(&mut conn)
1233                    .optional()
1234                    .map_err(|e| StoreError::Database(e.to_string()))?;
1235                Ok(res)
1236            })
1237            .await
1238            .map_err(|e| StoreError::Database(e.to_string()))??;
1239
1240        if let Some(key_data) = result {
1241            if let Ok(private_key) = PrivateKey::deserialize(&key_data) {
1242                if let Ok(public_key) = private_key.public_key() {
1243                    let key_pair = KeyPair::new(public_key, private_key);
1244                    let record = wacore::libsignal::store::record_helpers::new_pre_key_record(
1245                        prekey_id, &key_pair,
1246                    );
1247                    Ok(Some(record))
1248                } else {
1249                    Ok(None)
1250                }
1251            } else {
1252                Ok(None)
1253            }
1254        } else {
1255            Ok(None)
1256        }
1257    }
1258
1259    async fn store_prekey(
1260        &self,
1261        prekey_id: u32,
1262        record: PreKeyRecordStructure,
1263        uploaded: bool,
1264    ) -> std::result::Result<(), SignalStoreError> {
1265        let pool = self.pool.clone();
1266        let private_key_bytes = record.private_key.unwrap_or_default();
1267        tokio::task::spawn_blocking(move || -> Result<()> {
1268            let mut conn = pool
1269                .get()
1270                .map_err(|e| StoreError::Connection(e.to_string()))?;
1271            diesel::insert_into(prekeys::table)
1272                .values((
1273                    prekeys::id.eq(prekey_id as i32),
1274                    prekeys::key.eq(&private_key_bytes),
1275                    prekeys::uploaded.eq(uploaded),
1276                    prekeys::device_id.eq(1),
1277                ))
1278                .on_conflict((prekeys::id, prekeys::device_id))
1279                .do_update()
1280                .set((
1281                    prekeys::key.eq(&private_key_bytes),
1282                    prekeys::uploaded.eq(uploaded),
1283                ))
1284                .execute(&mut conn)
1285                .map_err(|e| StoreError::Database(e.to_string()))?;
1286            Ok(())
1287        })
1288        .await
1289        .map_err(|e| StoreError::Database(e.to_string()))??;
1290        Ok(())
1291    }
1292
1293    async fn contains_prekey(&self, prekey_id: u32) -> std::result::Result<bool, SignalStoreError> {
1294        let pool = self.pool.clone();
1295        let count: i64 = tokio::task::spawn_blocking(move || -> Result<i64> {
1296            let mut conn = pool
1297                .get()
1298                .map_err(|e| StoreError::Connection(e.to_string()))?;
1299            let cnt: i64 = prekeys::table
1300                .filter(prekeys::id.eq(prekey_id as i32))
1301                .filter(prekeys::device_id.eq(1))
1302                .count()
1303                .get_result(&mut conn)
1304                .map_err(|e| StoreError::Database(e.to_string()))?;
1305            Ok(cnt)
1306        })
1307        .await
1308        .map_err(|e| StoreError::Database(e.to_string()))??;
1309        Ok(count > 0)
1310    }
1311
1312    async fn remove_prekey(&self, prekey_id: u32) -> std::result::Result<(), SignalStoreError> {
1313        let pool = self.pool.clone();
1314        tokio::task::spawn_blocking(move || -> Result<()> {
1315            let mut conn = pool
1316                .get()
1317                .map_err(|e| StoreError::Connection(e.to_string()))?;
1318            diesel::delete(
1319                prekeys::table
1320                    .filter(prekeys::id.eq(prekey_id as i32))
1321                    .filter(prekeys::device_id.eq(1)),
1322            )
1323            .execute(&mut conn)
1324            .map_err(|e| StoreError::Database(e.to_string()))?;
1325            Ok(())
1326        })
1327        .await
1328        .map_err(|e| StoreError::Database(e.to_string()))??;
1329        Ok(())
1330    }
1331}
1332
1333#[async_trait]
1334impl SenderKeyStoreHelper for SqliteStore {
1335    async fn put_sender_key(&self, address: &str, record: &[u8]) -> Result<()> {
1336        self.put_sender_key_for_device(address, record, 1).await
1337    }
1338
1339    async fn get_sender_key(&self, address: &str) -> Result<Option<Vec<u8>>> {
1340        self.get_sender_key_for_device(address, 1).await
1341    }
1342
1343    async fn delete_sender_key(&self, address: &str) -> Result<()> {
1344        self.delete_sender_key_for_device(address, 1).await
1345    }
1346}
1347
1348#[async_trait]
1349impl libsignal::store::SignedPreKeyStore for SqliteStore {
1350    async fn load_signed_prekey(
1351        &self,
1352        signed_prekey_id: u32,
1353    ) -> std::result::Result<Option<SignedPreKeyRecordStructure>, SignalStoreError> {
1354        let pool = self.pool.clone();
1355        let result: Option<Vec<u8>> =
1356            tokio::task::spawn_blocking(move || -> Result<Option<Vec<u8>>> {
1357                let mut conn = pool
1358                    .get()
1359                    .map_err(|e| StoreError::Connection(e.to_string()))?;
1360                let res: Option<Vec<u8>> = signed_prekeys::table
1361                    .select(signed_prekeys::record)
1362                    .filter(signed_prekeys::id.eq(signed_prekey_id as i32))
1363                    .filter(signed_prekeys::device_id.eq(1))
1364                    .first(&mut conn)
1365                    .optional()
1366                    .map_err(|e| StoreError::Database(e.to_string()))?;
1367                Ok(res)
1368            })
1369            .await
1370            .map_err(|e| StoreError::Database(e.to_string()))??;
1371
1372        if let Some(data) = result {
1373            let record = SignedPreKeyRecordStructure::decode(data.as_slice())
1374                .map_err(|e| StoreError::Serialization(e.to_string()))?;
1375            Ok(Some(record))
1376        } else {
1377            Ok(None)
1378        }
1379    }
1380
1381    async fn load_signed_prekeys(
1382        &self,
1383    ) -> std::result::Result<Vec<SignedPreKeyRecordStructure>, SignalStoreError> {
1384        let mut conn = self.get_connection()?;
1385
1386        let results: Vec<Vec<u8>> = signed_prekeys::table
1387            .select(signed_prekeys::record)
1388            .filter(signed_prekeys::device_id.eq(1))
1389            .load(&mut conn)
1390            .map_err(|e| StoreError::Database(e.to_string()))?;
1391
1392        let mut records = Vec::new();
1393        for data in results {
1394            let record = SignedPreKeyRecordStructure::decode(data.as_slice())
1395                .map_err(|e| StoreError::Serialization(e.to_string()))?;
1396            records.push(record);
1397        }
1398
1399        Ok(records)
1400    }
1401
1402    async fn store_signed_prekey(
1403        &self,
1404        signed_prekey_id: u32,
1405        record: SignedPreKeyRecordStructure,
1406    ) -> std::result::Result<(), SignalStoreError> {
1407        let pool = self.pool.clone();
1408        let data = record.encode_to_vec();
1409        tokio::task::spawn_blocking(move || -> Result<()> {
1410            let mut conn = pool
1411                .get()
1412                .map_err(|e| StoreError::Connection(e.to_string()))?;
1413            diesel::insert_into(signed_prekeys::table)
1414                .values((
1415                    signed_prekeys::id.eq(signed_prekey_id as i32),
1416                    signed_prekeys::record.eq(&data),
1417                    signed_prekeys::device_id.eq(1),
1418                ))
1419                .on_conflict((signed_prekeys::id, signed_prekeys::device_id))
1420                .do_update()
1421                .set(signed_prekeys::record.eq(&data))
1422                .execute(&mut conn)
1423                .map_err(|e| StoreError::Database(e.to_string()))?;
1424            Ok(())
1425        })
1426        .await
1427        .map_err(|e| StoreError::Database(e.to_string()))??;
1428        Ok(())
1429    }
1430
1431    async fn contains_signed_prekey(
1432        &self,
1433        signed_prekey_id: u32,
1434    ) -> std::result::Result<bool, SignalStoreError> {
1435        let pool = self.pool.clone();
1436        let count: i64 = tokio::task::spawn_blocking(move || -> Result<i64> {
1437            let mut conn = pool
1438                .get()
1439                .map_err(|e| StoreError::Connection(e.to_string()))?;
1440            let cnt: i64 = signed_prekeys::table
1441                .filter(signed_prekeys::id.eq(signed_prekey_id as i32))
1442                .filter(signed_prekeys::device_id.eq(1))
1443                .count()
1444                .get_result(&mut conn)
1445                .map_err(|e| StoreError::Database(e.to_string()))?;
1446            Ok(cnt)
1447        })
1448        .await
1449        .map_err(|e| StoreError::Database(e.to_string()))??;
1450        Ok(count > 0)
1451    }
1452
1453    async fn remove_signed_prekey(
1454        &self,
1455        signed_prekey_id: u32,
1456    ) -> std::result::Result<(), SignalStoreError> {
1457        let pool = self.pool.clone();
1458        tokio::task::spawn_blocking(move || -> Result<()> {
1459            let mut conn = pool
1460                .get()
1461                .map_err(|e| StoreError::Connection(e.to_string()))?;
1462            diesel::delete(
1463                signed_prekeys::table
1464                    .filter(signed_prekeys::id.eq(signed_prekey_id as i32))
1465                    .filter(signed_prekeys::device_id.eq(1)),
1466            )
1467            .execute(&mut conn)
1468            .map_err(|e| StoreError::Database(e.to_string()))?;
1469            Ok(())
1470        })
1471        .await
1472        .map_err(|e| StoreError::Database(e.to_string()))??;
1473        Ok(())
1474    }
1475}
1476
1477#[async_trait]
1478impl AppStateKeyStore for SqliteStore {
1479    async fn get_app_state_sync_key(&self, key_id: &[u8]) -> Result<Option<AppStateSyncKey>> {
1480        self.get_app_state_sync_key_for_device(key_id, 1).await
1481    }
1482
1483    async fn set_app_state_sync_key(&self, key_id: &[u8], key: AppStateSyncKey) -> Result<()> {
1484        self.set_app_state_sync_key_for_device(key_id, key, 1).await
1485    }
1486}
1487
1488#[async_trait]
1489impl AppStateStore for SqliteStore {
1490    async fn get_app_state_version(&self, name: &str) -> Result<HashState> {
1491        self.get_app_state_version_for_device(name, 1).await
1492    }
1493
1494    async fn set_app_state_version(&self, name: &str, state: HashState) -> Result<()> {
1495        self.set_app_state_version_for_device(name, state, 1).await
1496    }
1497
1498    async fn put_app_state_mutation_macs(
1499        &self,
1500        name: &str,
1501        version: u64,
1502        mutations: &[AppStateMutationMAC],
1503    ) -> Result<()> {
1504        self.put_app_state_mutation_macs_for_device(name, version, mutations, 1)
1505            .await
1506    }
1507
1508    async fn delete_app_state_mutation_macs(
1509        &self,
1510        name: &str,
1511        index_macs: &[Vec<u8>],
1512    ) -> Result<()> {
1513        self.delete_app_state_mutation_macs_for_device(name, index_macs, 1)
1514            .await
1515    }
1516
1517    async fn get_app_state_mutation_mac(
1518        &self,
1519        name: &str,
1520        index_mac: &[u8],
1521    ) -> Result<Option<Vec<u8>>> {
1522        self.get_app_state_mutation_mac_for_device(name, index_mac, 1)
1523            .await
1524    }
1525}
1526
1527#[async_trait]
1528impl wacore::store::traits::DevicePersistence for SqliteStore {
1529    async fn save_device_data(
1530        &self,
1531        device_data: &wacore::store::Device,
1532    ) -> wacore::store::error::Result<()> {
1533        // Single-device mode always targets device_id = 1
1534        self.save_device_data_for_device(1, device_data).await
1535    }
1536
1537    async fn save_device_data_for_device(
1538        &self,
1539        device_id: i32,
1540        device_data: &wacore::store::Device,
1541    ) -> wacore::store::error::Result<()> {
1542        SqliteStore::save_device_data_for_device(self, device_id, device_data).await
1543    }
1544
1545    async fn load_device_data(
1546        &self,
1547    ) -> wacore::store::error::Result<Option<wacore::store::Device>> {
1548        // Single-device mode always targets device_id = 1
1549        self.load_device_data_for_device(1).await
1550    }
1551
1552    async fn load_device_data_for_device(
1553        &self,
1554        device_id: i32,
1555    ) -> wacore::store::error::Result<Option<wacore::store::Device>> {
1556        SqliteStore::load_device_data_for_device(self, device_id).await
1557    }
1558
1559    async fn device_exists(&self, device_id: i32) -> wacore::store::error::Result<bool> {
1560        SqliteStore::device_exists(self, device_id).await
1561    }
1562
1563    async fn create_new_device(&self) -> wacore::store::error::Result<i32> {
1564        SqliteStore::create_new_device(self).await
1565    }
1566}