1mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::Txid;
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::{Vtxo, VtxoId};
27use ark::lightning::{PaymentHash, Preimage};
28use ark::vtxo::Full;
29use bitcoin_ext::BlockDelta;
30
31use crate::WalletProperties;
32use crate::actions::{WalletActionCheckpoint, WalletActionId};
33use crate::exit::ExitTxOrigin;
34use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod};
35use crate::persist::{BarkPersister, RoundStateId, StoredRoundState, Unlocked};
36use crate::persist::models::{
37 LightningReceive, PaidInvoice, PendingBoard, PendingOffboard, StoredExit,
38};
39use crate::round::RoundState;
40use crate::vtxo::{VtxoState, VtxoStateKind, WalletVtxo};
41
42
43pub const DEFAULT_DB_FILE: &str = "db.sqlite";
45
46#[derive(Debug, Clone)]
49pub struct SqliteClient {
50 connection_string: PathBuf,
51}
52
53impl SqliteClient {
54 pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
56 let path = db_file.as_ref().to_path_buf();
57
58 debug!("Opening database at {}", path.display());
59 let mut conn = rusqlite::Connection::open(&path)
60 .with_context(|| format!("Error connecting to database {}", path.display()))?;
61
62 let migrations = migrations::MigrationContext::new();
63 migrations.do_all_migrations(&mut conn)?;
64
65 Ok( Self { connection_string: path })
66 }
67
68 fn connect(&self) -> anyhow::Result<Connection> {
69 rusqlite::Connection::open(&self.connection_string)
70 .with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
71 }
72}
73
74#[async_trait]
75impl BarkPersister for SqliteClient {
76 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
77 let mut conn = self.connect()?;
78 let tx = conn.transaction()?;
79
80 query::set_properties(&tx, properties)?;
81
82 tx.commit()?;
83 Ok(())
84 }
85
86 #[cfg(feature = "onchain-bdk")]
87 async fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
88 let mut conn = self.connect()?;
89 Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
90 }
91
92 #[cfg(feature = "onchain-bdk")]
93 async fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
94 let mut conn = self.connect()?;
95 bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
96 Ok(())
97 }
98
99 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
100 let conn = self.connect()?;
101 Ok(query::fetch_properties(&conn)?)
102 }
103
104 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
105 let conn = self.connect()?;
106 query::set_server_pubkey(&conn, &server_pubkey)?;
107 Ok(())
108 }
109
110 async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
111 let conn = self.connect()?;
112 query::set_server_mailbox_pubkey(&conn, &server_mailbox_pubkey)?;
113 Ok(())
114 }
115
116 async fn create_new_movement(&self,
117 status: MovementStatus,
118 subsystem: &MovementSubsystem,
119 time: DateTime<chrono::Local>,
120 ) -> anyhow::Result<MovementId> {
121 let mut conn = self.connect()?;
122 let tx = conn.transaction()?;
123 let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
124 tx.commit()?;
125 Ok(movement_id)
126 }
127
128 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
129 let mut conn = self.connect()?;
130 let tx = conn.transaction()?;
131 query::update_movement(&tx, movement)?;
132 tx.commit()?;
133 Ok(())
134 }
135
136 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
137 let conn = self.connect()?;
138 query::get_movement_by_id(&conn, movement_id)
139 }
140
141 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
142 let conn = self.connect()?;
143 query::get_all_movements(&conn)
144 }
145
146 async fn get_movements_by_payment_method(
147 &self,
148 payment_method: &PaymentMethod,
149 ) -> anyhow::Result<Vec<Movement>> {
150 let conn = self.connect()?;
151 query::get_movements_by_payment_method(&conn, payment_method)
152 }
153
154 async fn store_pending_board(
155 &self,
156 vtxo: &Vtxo<Full>,
157 funding_tx: &bitcoin::Transaction,
158 movement_id: MovementId,
159 ) -> anyhow::Result<()> {
160 let mut conn = self.connect()?;
161 let tx = conn.transaction()?;
162 query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
163 tx.commit()?;
164 Ok(())
165 }
166
167 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
168 let mut conn = self.connect()?;
169 let tx = conn.transaction()?;
170 query::remove_pending_board(&tx, vtxo_id)?;
171 tx.commit()?;
172 Ok(())
173 }
174
175 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
176 let conn = self.connect()?;
177 query::get_all_pending_boards_ids(&conn)
178 }
179
180 async fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
181 let conn = self.connect()?;
182 query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
183 }
184
185 async fn store_round_state(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
186 let conn = self.connect()?;
187 query::store_round_state(&conn, round_state)
188 }
189
190 async fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
191 let conn = self.connect()?;
192 query::update_round_state(&conn, state)?;
193 Ok(())
194 }
195
196 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
197 let conn = self.connect()?;
198 query::remove_round_state(&conn, round_state.id())?;
199 Ok(())
200 }
201
202 async fn get_round_state_by_id(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
203 let conn = self.connect()?;
204 query::get_round_state_by_id(&conn, id)
205 }
206
207 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
208 let conn = self.connect()?;
209 query::get_pending_round_state_ids(&conn)
210 }
211
212 async fn store_vtxos(
213 &self,
214 vtxos: &[(&Vtxo<Full>, &VtxoState)],
215 ) -> anyhow::Result<()> {
216 let mut conn = self.connect()?;
217 let tx = conn.transaction()?;
218
219 for (vtxo, state) in vtxos {
220 query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
221 }
222 tx.commit()?;
223 Ok(())
224 }
225
226 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
227 let conn = self.connect()?;
228 query::get_wallet_vtxo_by_id(&conn, id)
229 }
230
231 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
232 let conn = self.connect()?;
233 query::get_all_vtxos(&conn)
234 }
235
236 async fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
238 let conn = self.connect()?;
239 query::get_vtxos_by_state(&conn, state)
240 }
241
242 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
243 let conn = self.connect()?;
244 let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
245 let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
246 Ok(result)
247 }
248
249 async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
250 let conn = self.connect()?;
251 query::get_full_vtxo_by_id(&conn, id)
252 }
253
254 async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
255 let conn = self.connect()?;
256 query::get_full_vtxos_by_ids(&conn, ids)
257 }
258
259 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
260 let mut conn = self.connect()?;
261 let tx = conn.transaction().context("Failed to start transaction")?;
262 let result = query::delete_vtxo(&tx, id);
263 tx.commit().context("Failed to commit transaction")?;
264 result
265 }
266
267 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
268 let conn = self.connect()?;
269 query::store_vtxo_key(&conn, index, public_key)
270 }
271
272 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
273 let conn = self.connect()?;
274 query::get_last_vtxo_key_index(&conn)
275 }
276
277 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
278 let conn = self.connect()?;
279 query::get_public_key_idx(&conn, public_key)
280 }
281
282 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
283 let conn = self.connect()?;
284 query::get_mailbox_checkpoint(&conn)
285 }
286
287 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
288 let conn = self.connect()?;
289 query::store_mailbox_checkpoint(&conn, checkpoint)?;
290 Ok(())
291 }
292
293 async fn store_lightning_receive(
295 &self,
296 payment_hash: PaymentHash,
297 preimage: Preimage,
298 invoice: &Bolt11Invoice,
299 htlc_recv_cltv_delta: BlockDelta,
300 ) -> anyhow::Result<()> {
301 let conn = self.connect()?;
302 query::store_lightning_receive(
303 &conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
304 )?;
305 Ok(())
306 }
307
308 async fn upsert_wallet_action_checkpoint(
309 &self,
310 id: &WalletActionId,
311 checkpoint: &WalletActionCheckpoint,
312 ) -> anyhow::Result<()> {
313 let conn = self.connect()?;
314 query::upsert_wallet_action_checkpoint(&conn, id, checkpoint)
315 }
316
317 async fn get_wallet_action_checkpoint(
318 &self,
319 id: &WalletActionId,
320 ) -> anyhow::Result<Option<WalletActionCheckpoint>> {
321 let conn = self.connect()?;
322 query::get_wallet_action_checkpoint(&conn, id)
323 }
324
325 async fn get_all_wallet_action_checkpoints(
326 &self,
327 ) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
328 let conn = self.connect()?;
329 query::get_all_wallet_action_checkpoints(&conn)
330 }
331
332 async fn remove_wallet_action_checkpoint(
333 &self,
334 id: &WalletActionId,
335 ) -> anyhow::Result<()> {
336 let conn = self.connect()?;
337 query::remove_wallet_action_checkpoint(&conn, id)
338 }
339
340 async fn record_paid_invoice(
341 &self,
342 payment_hash: PaymentHash,
343 preimage: Preimage,
344 ) -> anyhow::Result<()> {
345 let conn = self.connect()?;
346 query::record_paid_invoice(&conn, payment_hash, preimage)
347 }
348
349 async fn get_paid_invoice(
350 &self,
351 payment_hash: PaymentHash,
352 ) -> anyhow::Result<Option<PaidInvoice>> {
353 let conn = self.connect()?;
354 query::get_paid_invoice(&conn, payment_hash)
355 }
356
357 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
358 let conn = self.connect()?;
359 query::get_all_pending_lightning_receives(&conn)
360 }
361
362 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
363 let conn = self.connect()?;
364 query::set_preimage_revealed(&conn, payment_hash)?;
365 Ok(())
366 }
367
368 async fn update_lightning_receive(
369 &self,
370 payment_hash: PaymentHash,
371 htlc_vtxo_ids: &[VtxoId],
372 movement_id: MovementId,
373 ) -> anyhow::Result<()> {
374 let conn = self.connect()?;
375 query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
376 Ok(())
377 }
378
379 async fn fetch_lightning_receive_by_payment_hash(
381 &self,
382 payment_hash: PaymentHash,
383 ) -> anyhow::Result<Option<LightningReceive>> {
384 let conn = self.connect()?;
385 query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
386 }
387
388 async fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
389 let conn = self.connect()?;
390 query::finish_pending_lightning_receive(&conn, payment_hash)?;
391 Ok(())
392 }
393
394 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
395 let mut conn = self.connect()?;
396 let tx = conn.transaction()?;
397 query::store_exit_vtxo_entry(&tx, exit)?;
398 tx.commit()?;
399 Ok(())
400 }
401
402 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
403 let mut conn = self.connect()?;
404 let tx = conn.transaction()?;
405 query::remove_exit_vtxo_entry(&tx, &id)?;
406 tx.commit()?;
407 Ok(())
408 }
409
410 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
411 let conn = self.connect()?;
412 query::get_exit_vtxo_entries(&conn)
413 }
414
415 async fn store_exit_child_tx(
416 &self,
417 exit_txid: Txid,
418 child_tx: &bitcoin::Transaction,
419 origin: ExitTxOrigin,
420 ) -> anyhow::Result<()> {
421 let mut conn = self.connect()?;
422 let tx = conn.transaction()?;
423 query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
424 tx.commit()?;
425 Ok(())
426 }
427
428 async fn get_exit_child_tx(
429 &self,
430 exit_txid: Txid,
431 ) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
432 let conn = self.connect()?;
433 query::get_exit_child_tx(&conn, exit_txid)
434 }
435
436 async fn update_vtxo_state_checked(
437 &self,
438 vtxo_id: VtxoId,
439 new_state: VtxoState,
440 allowed_old_states: &[VtxoStateKind]
441 ) -> anyhow::Result<WalletVtxo> {
442 let conn = self.connect()?;
443 query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
444 }
445
446 async fn update_vtxo_states_checked(
447 &self,
448 vtxo_ids: &[VtxoId],
449 new_state: VtxoState,
450 allowed_old_states: &[VtxoStateKind],
451 ) -> anyhow::Result<()> {
452 let mut conn = self.connect()?;
453 let tx = conn.transaction()?;
454 query::update_vtxo_states_checked(&tx, vtxo_ids, new_state, allowed_old_states)?;
455 tx.commit()?;
456 Ok(())
457 }
458
459 async fn store_pending_offboard(
460 &self,
461 pending: &PendingOffboard,
462 ) -> anyhow::Result<()> {
463 let mut conn = self.connect()?;
464 let tx = conn.transaction()?;
465 query::store_pending_offboard(&tx, pending)?;
466 tx.commit()?;
467 Ok(())
468 }
469
470 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
471 let conn = self.connect()?;
472 query::get_all_pending_offboards(&conn)
473 }
474
475 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
476 let mut conn = self.connect()?;
477 let tx = conn.transaction()?;
478 query::remove_pending_offboard(&tx, movement_id)?;
479 tx.commit()?;
480 Ok(())
481 }
482}
483
484#[cfg(any(test, doc))]
485pub mod helpers {
486 use std::path::PathBuf;
487 use std::str::FromStr;
488
489 use rusqlite::Connection;
490
491 #[cfg(any(test, feature = "rand"))]
498 pub fn in_memory_db() -> (PathBuf, Connection) {
499 use rand::{distr, RngExt};
500
501 let mut rng = rand::rng();
507 let filename: String = (&mut rng).sample_iter(distr::Alphanumeric)
508 .take(16).map(char::from).collect();
509
510 let connection_string = format!("file:{}?mode=memory&cache=shared", filename);
511 let pathbuf = PathBuf::from_str(&connection_string).unwrap();
512
513 let conn = Connection::open(pathbuf.clone()).unwrap();
514 (pathbuf.clone(), conn)
515 }
516}
517
518#[cfg(test)]
519mod test {
520 use ark::ProtocolEncoding;
521 use ark::test_util::VTXO_VECTORS;
522
523 use crate::{persist::sqlite::helpers::in_memory_db, vtxo::VtxoState};
524
525 use super::*;
526
527 #[tokio::test]
528 async fn test_add_and_retrieve_vtxos() {
529 let vtxo_1 = &VTXO_VECTORS.board_vtxo;
530 let vtxo_2 = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
531 let vtxo_3 = &VTXO_VECTORS.round2_vtxo;
532
533 let (cs, conn) = in_memory_db();
534 let db = SqliteClient::open(cs).unwrap();
535
536 db.store_vtxos(&[
537 (vtxo_1, &VtxoState::Spendable), (vtxo_2, &VtxoState::Spendable)
538 ]).await.unwrap();
539
540 let vtxo_1_db = db.get_wallet_vtxo(vtxo_1.id()).await.expect("No error").expect("A vtxo was found");
543 assert_eq!(vtxo_1_db.vtxo, vtxo_1.to_bare());
544
545 let vtxo_1_full = db.get_full_vtxo(vtxo_1.id()).await.unwrap().unwrap();
547 assert_eq!(vtxo_1_full.serialize(), vtxo_1.serialize());
548
549 assert!(db.get_wallet_vtxo(vtxo_3.id()).await.expect("No error").is_none());
551
552 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
554 assert_eq!(vtxos.len(), 2);
555 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_1.to_bare()));
556 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_2.to_bare()));
557 assert!(!vtxos.iter().any(|v| v.vtxo == vtxo_3.to_bare()));
558
559 db.update_vtxo_state_checked(
561 vtxo_1.id(), VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
562 ).await.unwrap();
563
564 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
565 assert_eq!(vtxos.len(), 1);
566
567 db.store_vtxos(&[(vtxo_3, &VtxoState::Spendable)]).await.unwrap();
569
570 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
571 assert_eq!(vtxos.len(), 2);
572 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_2.to_bare()));
573 assert!(vtxos.iter().any(|v| v.vtxo == vtxo_3.to_bare()));
574
575 conn.close().unwrap();
576 }
577
578 #[tokio::test]
579 #[cfg(feature = "onchain-bdk")]
580 async fn test_create_wallet_then_load() {
581 use bdk_wallet::chain::DescriptorExt;
582
583 let (connection_string, conn) = in_memory_db();
584
585 let db = SqliteClient::open(connection_string).unwrap();
586 let network = bitcoin::Network::Testnet;
587
588 let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
589 let xpriv = bitcoin::bip32::Xpriv::new_master(network, &seed).unwrap();
590
591 let desc = format!("tr({}/84'/0'/0'/*)", xpriv);
592
593 let _ = db.initialize_bdk_wallet().await.unwrap();
595 let mut created = bdk_wallet::Wallet::create_single(desc.clone())
596 .network(network)
597 .create_wallet_no_persist()
598 .unwrap();
599 db.store_bdk_wallet_changeset(&created.take_staged().unwrap()).await.unwrap();
600
601 let loaded = {
602 let changeset = db.initialize_bdk_wallet().await.unwrap();
603 bdk_wallet::Wallet::load()
604 .descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
605 .extract_keys()
606 .check_network(network)
607 .load_wallet_no_persist(changeset)
608 .unwrap()
609 };
610
611 assert!(loaded.is_some());
612 assert_eq!(
613 created.public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id(),
614 loaded.unwrap().public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id()
615 );
616
617 conn.close().unwrap();
620 }
621
622 #[tokio::test]
623 async fn differential_bark_persister_suite() {
624 let (cs, _conn) = helpers::in_memory_db();
625 let sqlite = SqliteClient::open(cs).unwrap();
626 let memory = crate::persist::adaptor::StorageAdaptorWrapper::new(
627 crate::persist::adaptor::memory::MemoryStorageAdaptor::new(),
628 );
629 crate::persist::test_suite::run_all(&sqlite, &memory).await;
630 }
631}