Skip to main content

bark/persist/sqlite/
mod.rs

1//! SQLite persistence backend for Bark.
2//!
3//! This module provides a concrete implementation of the `BarkPersister` trait
4//! backed by a local SQLite database. It encapsulates schema creation and
5//! migrations, typed query helpers, and conversions between in-memory models
6//! and their stored representations. Operations are performed using explicit
7//! connections and transactions to ensure atomic updates across related tables,
8//! covering wallet properties, movements, vtxos and their states, round
9//! lifecycle data, Lightning receives, exit tracking, and sync metadata.
10
11mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::{Amount, Txid};
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::{Vtxo, VtxoId};
27use ark::lightning::{Invoice, PaymentHash, Preimage};
28use ark::vtxo::Full;
29use bitcoin_ext::BlockDelta;
30
31use crate::WalletProperties;
32use crate::exit::ExitTxOrigin;
33use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod};
34use crate::persist::{BarkPersister, RoundStateId, StoredRoundState, Unlocked};
35use crate::persist::models::{
36	LightningReceive, LightningSend, PendingBoard, PendingOffboard, StoredExit,
37};
38use crate::round::RoundState;
39use crate::vtxo::{VtxoState, VtxoStateKind, WalletVtxo};
40
41/// An implementation of the BarkPersister using rusqlite. Changes are persisted using the given
42/// [PathBuf].
43#[derive(Clone)]
44pub struct SqliteClient {
45	connection_string: PathBuf,
46}
47
48impl SqliteClient {
49	/// Open a new [SqliteClient] with the given file path
50	pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
51		let path = db_file.as_ref().to_path_buf();
52
53		debug!("Opening database at {}", path.display());
54		let mut conn = rusqlite::Connection::open(&path)
55			.with_context(|| format!("Error connecting to database {}", path.display()))?;
56
57		let migrations = migrations::MigrationContext::new();
58		migrations.do_all_migrations(&mut conn)?;
59
60		Ok( Self { connection_string: path })
61	}
62
63	fn connect(&self) -> anyhow::Result<Connection> {
64		rusqlite::Connection::open(&self.connection_string)
65			.with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
66	}
67}
68
69#[async_trait]
70impl BarkPersister for SqliteClient {
71	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
72		let mut conn = self.connect()?;
73		let tx = conn.transaction()?;
74
75		query::set_properties(&tx, properties)?;
76
77		tx.commit()?;
78		Ok(())
79	}
80
81	#[cfg(feature = "onchain-bdk")]
82	async fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
83	    let mut conn = self.connect()?;
84		Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
85	}
86
87	#[cfg(feature = "onchain-bdk")]
88	async fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
89	    let mut conn = self.connect()?;
90		bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
91		Ok(())
92	}
93
94	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
95		let conn = self.connect()?;
96		Ok(query::fetch_properties(&conn)?)
97	}
98
99	async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
100		let conn = self.connect()?;
101		query::set_server_pubkey(&conn, &server_pubkey)?;
102		Ok(())
103	}
104
105	async fn create_new_movement(&self,
106		status: MovementStatus,
107		subsystem: &MovementSubsystem,
108		time: DateTime<chrono::Local>,
109	) -> anyhow::Result<MovementId> {
110		let mut conn = self.connect()?;
111		let tx = conn.transaction()?;
112		let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
113		tx.commit()?;
114		Ok(movement_id)
115	}
116
117	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
118		let mut conn = self.connect()?;
119		let tx = conn.transaction()?;
120		query::update_movement(&tx, movement)?;
121		tx.commit()?;
122		Ok(())
123	}
124
125	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
126		let conn = self.connect()?;
127		query::get_movement_by_id(&conn, movement_id)
128	}
129
130	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
131		let conn = self.connect()?;
132		query::get_all_movements(&conn)
133	}
134
135	async fn get_movements_by_payment_method(
136		&self,
137		payment_method: &PaymentMethod,
138	) -> anyhow::Result<Vec<Movement>> {
139		let conn = self.connect()?;
140		query::get_movements_by_payment_method(&conn, payment_method)
141	}
142
143	async fn store_pending_board(
144		&self,
145		vtxo: &Vtxo<Full>,
146		funding_tx: &bitcoin::Transaction,
147		movement_id: MovementId,
148	) -> anyhow::Result<()> {
149		let mut conn = self.connect()?;
150		let tx = conn.transaction()?;
151		query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
152		tx.commit()?;
153		Ok(())
154	}
155
156	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
157		let mut conn = self.connect()?;
158		let tx = conn.transaction()?;
159		query::remove_pending_board(&tx, vtxo_id)?;
160		tx.commit()?;
161		Ok(())
162	}
163
164	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
165		let conn = self.connect()?;
166		query::get_all_pending_boards_ids(&conn)
167	}
168
169	async fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
170		let conn = self.connect()?;
171		query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
172	}
173
174	async fn store_round_state_lock_vtxos(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
175		let mut conn = self.connect()?;
176		let tx = conn.transaction()?;
177		for vtxo in round_state.participation().inputs.iter() {
178			query::update_vtxo_state_checked(
179				&*tx,
180				vtxo.id(),
181				VtxoState::Locked { movement_id: round_state.movement_id },
182				&[VtxoStateKind::Spendable],
183			)?;
184		}
185		let id = query::store_round_state(&tx, round_state)?;
186		tx.commit()?;
187		Ok(id)
188	}
189
190	async fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
191		let conn = self.connect()?;
192		query::update_round_state(&conn, state)?;
193		Ok(())
194	}
195
196	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
197		let conn = self.connect()?;
198		query::remove_round_state(&conn, round_state.id())?;
199		Ok(())
200	}
201
202	async fn get_round_state_by_id(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
203		let conn = self.connect()?;
204		query::get_round_state_by_id(&conn, id)
205	}
206
207	async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
208		let conn = self.connect()?;
209		query::get_pending_round_state_ids(&conn)
210	}
211
212	async fn store_vtxos(
213		&self,
214		vtxos: &[(&Vtxo<Full>, &VtxoState)],
215	) -> anyhow::Result<()> {
216		let mut conn = self.connect()?;
217		let tx = conn.transaction()?;
218
219		for (vtxo, state) in vtxos {
220			query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
221		}
222		tx.commit()?;
223		Ok(())
224	}
225
226	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
227		let conn = self.connect()?;
228		query::get_wallet_vtxo_by_id(&conn, id)
229	}
230
231	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
232		let conn = self.connect()?;
233		query::get_all_vtxos(&conn)
234	}
235
236	/// Get all VTXOs that are in one of the provided states
237	async fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
238		let conn = self.connect()?;
239		query::get_vtxos_by_state(&conn, state)
240	}
241
242	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
243		let conn = self.connect()?;
244		let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
245		let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
246		Ok(result)
247	}
248
249	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
250		let mut conn = self.connect()?;
251		let tx = conn.transaction().context("Failed to start transaction")?;
252		let result = query::delete_vtxo(&tx, id);
253		tx.commit().context("Failed to commit transaction")?;
254		result
255	}
256
257	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
258		let conn = self.connect()?;
259		query::store_vtxo_key(&conn, index, public_key)
260	}
261
262	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
263		let conn = self.connect()?;
264		query::get_last_vtxo_key_index(&conn)
265	}
266
267	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
268		let conn = self.connect()?;
269		query::get_public_key_idx(&conn, public_key)
270	}
271
272	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
273		let conn = self.connect()?;
274		query::get_mailbox_checkpoint(&conn)
275	}
276
277	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
278		let conn = self.connect()?;
279		query::store_mailbox_checkpoint(&conn, checkpoint)?;
280		Ok(())
281	}
282
283	/// Store a lightning receive
284	async fn store_lightning_receive(
285		&self,
286		payment_hash: PaymentHash,
287		preimage: Preimage,
288		invoice: &Bolt11Invoice,
289		htlc_recv_cltv_delta: BlockDelta,
290	) -> anyhow::Result<()> {
291		let conn = self.connect()?;
292		query::store_lightning_receive(
293			&conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
294		)?;
295		Ok(())
296	}
297
298	async fn store_new_pending_lightning_send(
299		&self,
300		invoice: &Invoice,
301		amount: Amount,
302		fee: Amount,
303		vtxos: &[VtxoId],
304		movement_id: MovementId,
305	) -> anyhow::Result<LightningSend> {
306		let conn = self.connect()?;
307		query::store_new_pending_lightning_send(&conn, invoice, amount, fee, vtxos, movement_id)
308	}
309
310	async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
311		let conn = self.connect()?;
312		query::get_all_pending_lightning_send(&conn)
313	}
314
315	async fn finish_lightning_send(
316		&self,
317		payment_hash: PaymentHash,
318		preimage: Option<Preimage>,
319	) -> anyhow::Result<()> {
320		let conn = self.connect()?;
321		query::finish_lightning_send(&conn, payment_hash, preimage)
322	}
323
324	async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
325		let conn = self.connect()?;
326		query::remove_lightning_send(&conn, payment_hash)?;
327		Ok(())
328	}
329
330	async fn get_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<Option<LightningSend>> {
331		let conn = self.connect()?;
332		query::get_lightning_send(&conn, payment_hash)
333	}
334
335	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
336		let conn = self.connect()?;
337		query::get_all_pending_lightning_receives(&conn)
338	}
339
340	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
341		let conn = self.connect()?;
342		query::set_preimage_revealed(&conn, payment_hash)?;
343		Ok(())
344	}
345
346	async fn update_lightning_receive(
347		&self,
348		payment_hash: PaymentHash,
349		htlc_vtxo_ids: &[VtxoId],
350		movement_id: MovementId,
351	) -> anyhow::Result<()> {
352		let conn = self.connect()?;
353		query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
354		Ok(())
355	}
356
357	/// Fetch a lightning receive by payment hash
358	async fn fetch_lightning_receive_by_payment_hash(
359		&self,
360		payment_hash: PaymentHash,
361	) -> anyhow::Result<Option<LightningReceive>> {
362		let conn = self.connect()?;
363		query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
364	}
365
366	async fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
367		let conn = self.connect()?;
368		query::finish_pending_lightning_receive(&conn, payment_hash)?;
369		Ok(())
370	}
371
372	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
373		let mut conn = self.connect()?;
374		let tx = conn.transaction()?;
375		query::store_exit_vtxo_entry(&tx, exit)?;
376		tx.commit()?;
377		Ok(())
378	}
379
380	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
381		let mut conn = self.connect()?;
382		let tx = conn.transaction()?;
383		query::remove_exit_vtxo_entry(&tx, &id)?;
384		tx.commit()?;
385		Ok(())
386	}
387
388	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
389		let conn = self.connect()?;
390		query::get_exit_vtxo_entries(&conn)
391	}
392
393	async fn store_exit_child_tx(
394		&self,
395		exit_txid: Txid,
396		child_tx: &bitcoin::Transaction,
397		origin: ExitTxOrigin,
398	) -> anyhow::Result<()> {
399		let mut conn = self.connect()?;
400		let tx = conn.transaction()?;
401		query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
402		tx.commit()?;
403		Ok(())
404	}
405
406	async fn get_exit_child_tx(
407		&self,
408		exit_txid: Txid,
409	) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
410		let conn = self.connect()?;
411		query::get_exit_child_tx(&conn, exit_txid)
412	}
413
414	async fn update_vtxo_state_checked(
415		&self,
416		vtxo_id: VtxoId,
417		new_state: VtxoState,
418		allowed_old_states: &[VtxoStateKind]
419	) -> anyhow::Result<WalletVtxo> {
420		let conn = self.connect()?;
421		query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
422	}
423
424	async fn store_pending_offboard(
425		&self,
426		pending: &PendingOffboard,
427	) -> anyhow::Result<()> {
428		let mut conn = self.connect()?;
429		let tx = conn.transaction()?;
430		query::store_pending_offboard(&tx, pending)?;
431		tx.commit()?;
432		Ok(())
433	}
434
435	async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
436		let conn = self.connect()?;
437		query::get_all_pending_offboards(&conn)
438	}
439
440	async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
441		let mut conn = self.connect()?;
442		let tx = conn.transaction()?;
443		query::remove_pending_offboard(&tx, movement_id)?;
444		tx.commit()?;
445		Ok(())
446	}
447}
448
#[cfg(any(test, doc))]
pub mod helpers {
	use std::path::PathBuf;

	use rusqlite::Connection;

	/// Creates an in-memory sqlite connection.
	///
	/// It returns a [PathBuf] and a [Connection].
	/// The user should ensure the [Connection] isn't dropped
	/// until the test completes. If all connections are dropped during
	/// the test the entire database might be cleared.
	///
	/// # Panics
	///
	/// Panics if the connection cannot be opened.
	#[cfg(any(test, feature = "rand"))]
	pub fn in_memory_db() -> (PathBuf, Connection) {
		use rand::{distr, Rng};

		// All tests run in the same process and share the same
		// cache. To ensure that each call to `in_memory_db` results
		// in a new database a random file-name is generated.
		//
		// This database is deleted once all connections are dropped
		let filename: String = rand::rng().sample_iter(distr::Alphanumeric)
			.take(16).map(char::from).collect();

		// Building a `PathBuf` from a `String` cannot fail, so there is nothing to unwrap.
		let pathbuf = PathBuf::from(format!("file:{}?mode=memory&cache=shared", filename));

		// Borrow the path for opening so it can be handed back without cloning.
		let conn = Connection::open(&pathbuf).unwrap();
		(pathbuf, conn)
	}
}
482
#[cfg(test)]
mod test {
	use ark::test_util::VTXO_VECTORS;

	use crate::{persist::sqlite::helpers::in_memory_db, vtxo::VtxoState};

	use super::*;

	/// Stores vtxos, reads them back, and checks that marking one as spent
	/// removes it from the spendable set.
	#[tokio::test]
	async fn test_add_and_retrieve_vtxos() {
		let first = &VTXO_VECTORS.board_vtxo;
		let second = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
		let third = &VTXO_VECTORS.round2_vtxo;

		// `keep_alive` must outlive the test so the in-memory database is not cleared.
		let (location, keep_alive) = in_memory_db();
		let db = SqliteClient::open(location).unwrap();

		let spendable = VtxoState::Spendable;
		let spendable_kinds = [VtxoStateKind::Spendable];

		db.store_vtxos(&[(first, &spendable), (second, &spendable)]).await.unwrap();

		// The first vtxo can be read back from the database
		let stored_first = db.get_wallet_vtxo(first.id()).await
			.expect("No error")
			.expect("A vtxo was found");
		assert_eq!(stored_first.vtxo, *first);

		// The third vtxo was never stored
		let missing = db.get_wallet_vtxo(third.id()).await.expect("No error");
		assert!(missing.is_none());

		// Exactly the two stored vtxos are spendable
		let found = db.get_vtxos_by_state(&spendable_kinds).await.unwrap();
		assert_eq!(found.len(), 2);
		assert!(found.iter().any(|v| v.vtxo == *first));
		assert!(found.iter().any(|v| v.vtxo == *second));
		assert!(!found.iter().any(|v| v.vtxo == *third));

		// Marking a vtxo as spent removes it from the spendable set
		db.update_vtxo_state_checked(
			first.id(), VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
		).await.unwrap();

		let found = db.get_vtxos_by_state(&spendable_kinds).await.unwrap();
		assert_eq!(found.len(), 1);

		// Storing the third vtxo makes it spendable as well
		db.store_vtxos(&[(third, &spendable)]).await.unwrap();

		let found = db.get_vtxos_by_state(&spendable_kinds).await.unwrap();
		assert_eq!(found.len(), 2);
		assert!(found.iter().any(|v| v.vtxo == *second));
		assert!(found.iter().any(|v| v.vtxo == *third));

		keep_alive.close().unwrap();
	}

	/// Creates a BDK wallet, persists its change set and loads it again.
	#[tokio::test]
	#[cfg(feature = "onchain-bdk")]
	async fn test_create_wallet_then_load() {
		use bdk_wallet::chain::DescriptorExt;
		use bdk_wallet::KeychainKind;

		let (location, keep_alive) = in_memory_db();

		let db = SqliteClient::open(location).unwrap();
		let network = bitcoin::Network::Testnet;

		let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
		let xpriv = bitcoin::bip32::Xpriv::new_master(network, &seed).unwrap();

		let desc = format!("tr({}/84'/0'/0'/*)", xpriv);

		// need to call init before we call store
		let _ = db.initialize_bdk_wallet().await.unwrap();
		let mut created = bdk_wallet::Wallet::create_single(desc.clone())
			.network(network)
			.create_wallet_no_persist()
			.unwrap();
		let staged = created.take_staged().unwrap();
		db.store_bdk_wallet_changeset(&staged).await.unwrap();

		let changeset = db.initialize_bdk_wallet().await.unwrap();
		let loaded = bdk_wallet::Wallet::load()
			.descriptor(KeychainKind::External, Some(desc.clone()))
			.extract_keys()
			.check_network(network)
			.load_wallet_no_persist(changeset)
			.unwrap();

		assert!(loaded.is_some());
		let loaded_wallet = loaded.unwrap();
		assert_eq!(
			created.public_descriptor(KeychainKind::External).descriptor_id(),
			loaded_wallet.public_descriptor(KeychainKind::External).descriptor_id()
		);

		// Explicitly close the connection here
		// This ensures the database isn't dropped during the test
		keep_alive.close().unwrap();
	}

	/// Runs the shared persister test suite against the SQLite client and the
	/// in-memory storage adaptor.
	#[tokio::test]
	async fn differential_bark_persister_suite() {
		use crate::persist::adaptor::{memory::MemoryStorageAdaptor, StorageAdaptorWrapper};

		// The connection is bound to a name so it stays open for the whole test.
		let (location, _keep_alive) = in_memory_db();
		let sqlite = SqliteClient::open(location).unwrap();
		let memory = StorageAdaptorWrapper::new(MemoryStorageAdaptor::new());
		crate::persist::test_suite::run_all(&sqlite, &memory).await;
	}
}