1mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::{Amount, Txid};
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::{Vtxo, VtxoId};
27use ark::lightning::{Invoice, PaymentHash, Preimage};
28use ark::vtxo::Full;
29use bitcoin_ext::BlockDelta;
30
31use crate::WalletProperties;
32use crate::actions::{WalletActionCheckpoint, WalletActionId};
33use crate::exit::ExitTxOrigin;
34use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod};
35use crate::persist::{BarkPersister, RoundStateId, StoredRoundState, Unlocked};
36use crate::persist::models::{
37 LightningReceive, LightningSend, PendingBoard, PendingOffboard, StoredExit,
38};
39use crate::round::RoundState;
40use crate::vtxo::{VtxoState, VtxoStateKind, WalletVtxo};
41
42#[derive(Clone)]
45pub struct SqliteClient {
46 connection_string: PathBuf,
47}
48
49impl SqliteClient {
50 pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
52 let path = db_file.as_ref().to_path_buf();
53
54 debug!("Opening database at {}", path.display());
55 let mut conn = rusqlite::Connection::open(&path)
56 .with_context(|| format!("Error connecting to database {}", path.display()))?;
57
58 let migrations = migrations::MigrationContext::new();
59 migrations.do_all_migrations(&mut conn)?;
60
61 Ok( Self { connection_string: path })
62 }
63
64 fn connect(&self) -> anyhow::Result<Connection> {
65 rusqlite::Connection::open(&self.connection_string)
66 .with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
67 }
68}
69
70#[async_trait]
71impl BarkPersister for SqliteClient {
72 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
73 let mut conn = self.connect()?;
74 let tx = conn.transaction()?;
75
76 query::set_properties(&tx, properties)?;
77
78 tx.commit()?;
79 Ok(())
80 }
81
82 #[cfg(feature = "onchain-bdk")]
83 async fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
84 let mut conn = self.connect()?;
85 Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
86 }
87
88 #[cfg(feature = "onchain-bdk")]
89 async fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
90 let mut conn = self.connect()?;
91 bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
92 Ok(())
93 }
94
95 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
96 let conn = self.connect()?;
97 Ok(query::fetch_properties(&conn)?)
98 }
99
100 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
101 let conn = self.connect()?;
102 query::set_server_pubkey(&conn, &server_pubkey)?;
103 Ok(())
104 }
105
106 async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
107 let conn = self.connect()?;
108 query::set_server_mailbox_pubkey(&conn, &server_mailbox_pubkey)?;
109 Ok(())
110 }
111
112 async fn create_new_movement(&self,
113 status: MovementStatus,
114 subsystem: &MovementSubsystem,
115 time: DateTime<chrono::Local>,
116 ) -> anyhow::Result<MovementId> {
117 let mut conn = self.connect()?;
118 let tx = conn.transaction()?;
119 let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
120 tx.commit()?;
121 Ok(movement_id)
122 }
123
124 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
125 let mut conn = self.connect()?;
126 let tx = conn.transaction()?;
127 query::update_movement(&tx, movement)?;
128 tx.commit()?;
129 Ok(())
130 }
131
132 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
133 let conn = self.connect()?;
134 query::get_movement_by_id(&conn, movement_id)
135 }
136
137 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
138 let conn = self.connect()?;
139 query::get_all_movements(&conn)
140 }
141
142 async fn get_movements_by_payment_method(
143 &self,
144 payment_method: &PaymentMethod,
145 ) -> anyhow::Result<Vec<Movement>> {
146 let conn = self.connect()?;
147 query::get_movements_by_payment_method(&conn, payment_method)
148 }
149
150 async fn store_pending_board(
151 &self,
152 vtxo: &Vtxo<Full>,
153 funding_tx: &bitcoin::Transaction,
154 movement_id: MovementId,
155 ) -> anyhow::Result<()> {
156 let mut conn = self.connect()?;
157 let tx = conn.transaction()?;
158 query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
159 tx.commit()?;
160 Ok(())
161 }
162
163 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
164 let mut conn = self.connect()?;
165 let tx = conn.transaction()?;
166 query::remove_pending_board(&tx, vtxo_id)?;
167 tx.commit()?;
168 Ok(())
169 }
170
171 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
172 let conn = self.connect()?;
173 query::get_all_pending_boards_ids(&conn)
174 }
175
176 async fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
177 let conn = self.connect()?;
178 query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
179 }
180
181 async fn store_round_state_lock_vtxos(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
182 let mut conn = self.connect()?;
183 let tx = conn.transaction()?;
184 for vtxo in round_state.participation().inputs.iter() {
185 query::update_vtxo_state_checked(
186 &*tx,
187 vtxo.id(),
188 VtxoState::Locked {
189 holder: round_state.movement_id
190 .map(|id| crate::vtxo::VtxoLockHolder::Movement { id }),
191 },
192 &[VtxoStateKind::Spendable],
193 )?;
194 }
195 let id = query::store_round_state(&tx, round_state)?;
196 tx.commit()?;
197 Ok(id)
198 }
199
200 async fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
201 let conn = self.connect()?;
202 query::update_round_state(&conn, state)?;
203 Ok(())
204 }
205
206 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
207 let conn = self.connect()?;
208 query::remove_round_state(&conn, round_state.id())?;
209 Ok(())
210 }
211
212 async fn get_round_state_by_id(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
213 let conn = self.connect()?;
214 query::get_round_state_by_id(&conn, id)
215 }
216
217 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
218 let conn = self.connect()?;
219 query::get_pending_round_state_ids(&conn)
220 }
221
222 async fn store_vtxos(
223 &self,
224 vtxos: &[(&Vtxo<Full>, &VtxoState)],
225 ) -> anyhow::Result<()> {
226 let mut conn = self.connect()?;
227 let tx = conn.transaction()?;
228
229 for (vtxo, state) in vtxos {
230 query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
231 }
232 tx.commit()?;
233 Ok(())
234 }
235
236 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
237 let conn = self.connect()?;
238 query::get_wallet_vtxo_by_id(&conn, id)
239 }
240
241 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
242 let conn = self.connect()?;
243 query::get_all_vtxos(&conn)
244 }
245
246 async fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
248 let conn = self.connect()?;
249 query::get_vtxos_by_state(&conn, state)
250 }
251
252 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
253 let conn = self.connect()?;
254 let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
255 let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
256 Ok(result)
257 }
258
259 async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
260 let conn = self.connect()?;
261 query::get_full_vtxo_by_id(&conn, id)
262 }
263
264 async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
265 let conn = self.connect()?;
266 query::get_full_vtxos_by_ids(&conn, ids)
267 }
268
269 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
270 let mut conn = self.connect()?;
271 let tx = conn.transaction().context("Failed to start transaction")?;
272 let result = query::delete_vtxo(&tx, id);
273 tx.commit().context("Failed to commit transaction")?;
274 result
275 }
276
277 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
278 let conn = self.connect()?;
279 query::store_vtxo_key(&conn, index, public_key)
280 }
281
282 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
283 let conn = self.connect()?;
284 query::get_last_vtxo_key_index(&conn)
285 }
286
287 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
288 let conn = self.connect()?;
289 query::get_public_key_idx(&conn, public_key)
290 }
291
292 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
293 let conn = self.connect()?;
294 query::get_mailbox_checkpoint(&conn)
295 }
296
297 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
298 let conn = self.connect()?;
299 query::store_mailbox_checkpoint(&conn, checkpoint)?;
300 Ok(())
301 }
302
303 async fn store_lightning_receive(
305 &self,
306 payment_hash: PaymentHash,
307 preimage: Preimage,
308 invoice: &Bolt11Invoice,
309 htlc_recv_cltv_delta: BlockDelta,
310 ) -> anyhow::Result<()> {
311 let conn = self.connect()?;
312 query::store_lightning_receive(
313 &conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
314 )?;
315 Ok(())
316 }
317
318 async fn store_new_pending_lightning_send(
319 &self,
320 invoice: &Invoice,
321 amount: Amount,
322 fee: Amount,
323 vtxos: &[VtxoId],
324 movement_id: MovementId,
325 ) -> anyhow::Result<LightningSend> {
326 let conn = self.connect()?;
327 query::store_new_pending_lightning_send(&conn, invoice, amount, fee, vtxos, movement_id)
328 }
329
330 async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
331 let conn = self.connect()?;
332 query::get_all_pending_lightning_send(&conn)
333 }
334
335 async fn finish_lightning_send(
336 &self,
337 payment_hash: PaymentHash,
338 preimage: Option<Preimage>,
339 ) -> anyhow::Result<()> {
340 let conn = self.connect()?;
341 query::finish_lightning_send(&conn, payment_hash, preimage)
342 }
343
344 async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
345 let conn = self.connect()?;
346 query::remove_lightning_send(&conn, payment_hash)?;
347 Ok(())
348 }
349
350 async fn get_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<Option<LightningSend>> {
351 let conn = self.connect()?;
352 query::get_lightning_send(&conn, payment_hash)
353 }
354
355 async fn upsert_wallet_action_checkpoint(
356 &self,
357 id: &WalletActionId,
358 checkpoint: &WalletActionCheckpoint,
359 ) -> anyhow::Result<()> {
360 let conn = self.connect()?;
361 query::upsert_wallet_action_checkpoint(&conn, id, checkpoint)
362 }
363
364 async fn get_wallet_action_checkpoint(
365 &self,
366 id: &WalletActionId,
367 ) -> anyhow::Result<Option<WalletActionCheckpoint>> {
368 let conn = self.connect()?;
369 query::get_wallet_action_checkpoint(&conn, id)
370 }
371
372 async fn get_all_wallet_action_checkpoints(
373 &self,
374 ) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
375 let conn = self.connect()?;
376 query::get_all_wallet_action_checkpoints(&conn)
377 }
378
379 async fn remove_wallet_action_checkpoint(
380 &self,
381 id: &WalletActionId,
382 ) -> anyhow::Result<()> {
383 let conn = self.connect()?;
384 query::remove_wallet_action_checkpoint(&conn, id)
385 }
386
387 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
388 let conn = self.connect()?;
389 query::get_all_pending_lightning_receives(&conn)
390 }
391
392 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
393 let conn = self.connect()?;
394 query::set_preimage_revealed(&conn, payment_hash)?;
395 Ok(())
396 }
397
398 async fn update_lightning_receive(
399 &self,
400 payment_hash: PaymentHash,
401 htlc_vtxo_ids: &[VtxoId],
402 movement_id: MovementId,
403 ) -> anyhow::Result<()> {
404 let conn = self.connect()?;
405 query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
406 Ok(())
407 }
408
409 async fn fetch_lightning_receive_by_payment_hash(
411 &self,
412 payment_hash: PaymentHash,
413 ) -> anyhow::Result<Option<LightningReceive>> {
414 let conn = self.connect()?;
415 query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
416 }
417
418 async fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
419 let conn = self.connect()?;
420 query::finish_pending_lightning_receive(&conn, payment_hash)?;
421 Ok(())
422 }
423
424 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
425 let mut conn = self.connect()?;
426 let tx = conn.transaction()?;
427 query::store_exit_vtxo_entry(&tx, exit)?;
428 tx.commit()?;
429 Ok(())
430 }
431
432 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
433 let mut conn = self.connect()?;
434 let tx = conn.transaction()?;
435 query::remove_exit_vtxo_entry(&tx, &id)?;
436 tx.commit()?;
437 Ok(())
438 }
439
440 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
441 let conn = self.connect()?;
442 query::get_exit_vtxo_entries(&conn)
443 }
444
445 async fn store_exit_child_tx(
446 &self,
447 exit_txid: Txid,
448 child_tx: &bitcoin::Transaction,
449 origin: ExitTxOrigin,
450 ) -> anyhow::Result<()> {
451 let mut conn = self.connect()?;
452 let tx = conn.transaction()?;
453 query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
454 tx.commit()?;
455 Ok(())
456 }
457
458 async fn get_exit_child_tx(
459 &self,
460 exit_txid: Txid,
461 ) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
462 let conn = self.connect()?;
463 query::get_exit_child_tx(&conn, exit_txid)
464 }
465
466 async fn update_vtxo_state_checked(
467 &self,
468 vtxo_id: VtxoId,
469 new_state: VtxoState,
470 allowed_old_states: &[VtxoStateKind]
471 ) -> anyhow::Result<WalletVtxo> {
472 let conn = self.connect()?;
473 query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
474 }
475
476 async fn store_pending_offboard(
477 &self,
478 pending: &PendingOffboard,
479 ) -> anyhow::Result<()> {
480 let mut conn = self.connect()?;
481 let tx = conn.transaction()?;
482 query::store_pending_offboard(&tx, pending)?;
483 tx.commit()?;
484 Ok(())
485 }
486
487 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
488 let conn = self.connect()?;
489 query::get_all_pending_offboards(&conn)
490 }
491
492 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
493 let mut conn = self.connect()?;
494 let tx = conn.transaction()?;
495 query::remove_pending_offboard(&tx, movement_id)?;
496 tx.commit()?;
497 Ok(())
498 }
499}
500
501#[cfg(any(test, doc))]
502pub mod helpers {
503 use std::path::PathBuf;
504 use std::str::FromStr;
505
506 use rusqlite::Connection;
507
508 #[cfg(any(test, feature = "rand"))]
515 pub fn in_memory_db() -> (PathBuf, Connection) {
516 use rand::{distr, RngExt};
517
518 let mut rng = rand::rng();
524 let filename: String = (&mut rng).sample_iter(distr::Alphanumeric)
525 .take(16).map(char::from).collect();
526
527 let connection_string = format!("file:{}?mode=memory&cache=shared", filename);
528 let pathbuf = PathBuf::from_str(&connection_string).unwrap();
529
530 let conn = Connection::open(pathbuf.clone()).unwrap();
531 (pathbuf.clone(), conn)
532 }
533}
534
535#[cfg(test)]
536mod test {
537 use ark::ProtocolEncoding;
538 use ark::test_util::VTXO_VECTORS;
539
540 use crate::{persist::sqlite::helpers::in_memory_db, vtxo::VtxoState};
541
542 use super::*;
543
544 #[tokio::test]
545 async fn test_add_and_retrieve_vtxos() {
546 let vtxo_1 = &VTXO_VECTORS.board_vtxo;
547 let vtxo_2 = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
548 let vtxo_3 = &VTXO_VECTORS.round2_vtxo;
549
550 let (cs, conn) = in_memory_db();
551 let db = SqliteClient::open(cs).unwrap();
552
553 db.store_vtxos(&[
554 (vtxo_1, &VtxoState::Spendable), (vtxo_2, &VtxoState::Spendable)
555 ]).await.unwrap();
556
557 let vtxo_1_db = db.get_wallet_vtxo(vtxo_1.id()).await.expect("No error").expect("A vtxo was found");
560 assert_eq!(vtxo_1_db.vtxo, vtxo_1.to_bare());
561
562 let vtxo_1_full = db.get_full_vtxo(vtxo_1.id()).await.unwrap().unwrap();
564 assert_eq!(vtxo_1_full.serialize(), vtxo_1.serialize());
565
566 assert!(db.get_wallet_vtxo(vtxo_3.id()).await.expect("No error").is_none());
568
569 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
571 assert_eq!(vtxos.len(), 2);
572 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_1.to_bare()));
573 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_2.to_bare()));
574 assert!(!vtxos.iter().any(|v| v.vtxo == vtxo_3.to_bare()));
575
576 db.update_vtxo_state_checked(
578 vtxo_1.id(), VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
579 ).await.unwrap();
580
581 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
582 assert_eq!(vtxos.len(), 1);
583
584 db.store_vtxos(&[(vtxo_3, &VtxoState::Spendable)]).await.unwrap();
586
587 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
588 assert_eq!(vtxos.len(), 2);
589 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_2.to_bare()));
590 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_3.to_bare()));
591
592 conn.close().unwrap();
593 }
594
595 #[tokio::test]
596 #[cfg(feature = "onchain-bdk")]
597 async fn test_create_wallet_then_load() {
598 use bdk_wallet::chain::DescriptorExt;
599
600 let (connection_string, conn) = in_memory_db();
601
602 let db = SqliteClient::open(connection_string).unwrap();
603 let network = bitcoin::Network::Testnet;
604
605 let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
606 let xpriv = bitcoin::bip32::Xpriv::new_master(network, &seed).unwrap();
607
608 let desc = format!("tr({}/84'/0'/0'/*)", xpriv);
609
610 let _ = db.initialize_bdk_wallet().await.unwrap();
612 let mut created = bdk_wallet::Wallet::create_single(desc.clone())
613 .network(network)
614 .create_wallet_no_persist()
615 .unwrap();
616 db.store_bdk_wallet_changeset(&created.take_staged().unwrap()).await.unwrap();
617
618 let loaded = {
619 let changeset = db.initialize_bdk_wallet().await.unwrap();
620 bdk_wallet::Wallet::load()
621 .descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
622 .extract_keys()
623 .check_network(network)
624 .load_wallet_no_persist(changeset)
625 .unwrap()
626 };
627
628 assert!(loaded.is_some());
629 assert_eq!(
630 created.public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id(),
631 loaded.unwrap().public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id()
632 );
633
634 conn.close().unwrap();
637 }
638
639 #[tokio::test]
640 async fn differential_bark_persister_suite() {
641 let (cs, _conn) = helpers::in_memory_db();
642 let sqlite = SqliteClient::open(cs).unwrap();
643 let memory = crate::persist::adaptor::StorageAdaptorWrapper::new(
644 crate::persist::adaptor::memory::MemoryStorageAdaptor::new(),
645 );
646 crate::persist::test_suite::run_all(&sqlite, &memory).await;
647 }
648}