1mod convert;
12mod migrations;
13mod query;
14
15
16use std::path::{Path, PathBuf};
17
18use anyhow::Context;
19use bitcoin::{Amount, Txid};
20use bitcoin::secp256k1::PublicKey;
21use chrono::DateTime;
22use lightning_invoice::Bolt11Invoice;
23use log::debug;
24use rusqlite::Connection;
25
26use ark::{Vtxo, VtxoId};
27use ark::lightning::{Invoice, PaymentHash, Preimage};
28use ark::vtxo::Full;
29use bitcoin_ext::BlockDelta;
30
31use crate::WalletProperties;
32use crate::exit::ExitTxOrigin;
33use crate::movement::{Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod};
34use crate::persist::{BarkPersister, RoundStateId, StoredRoundState, Unlocked};
35use crate::persist::models::{
36 LightningReceive, LightningSend, PendingBoard, PendingOffboard, StoredExit,
37};
38use crate::round::RoundState;
39use crate::vtxo::{VtxoState, VtxoStateKind, WalletVtxo};
40
41#[derive(Clone)]
44pub struct SqliteClient {
45 connection_string: PathBuf,
46}
47
48impl SqliteClient {
49 pub fn open(db_file: impl AsRef<Path>) -> anyhow::Result<SqliteClient> {
51 let path = db_file.as_ref().to_path_buf();
52
53 debug!("Opening database at {}", path.display());
54 let mut conn = rusqlite::Connection::open(&path)
55 .with_context(|| format!("Error connecting to database {}", path.display()))?;
56
57 let migrations = migrations::MigrationContext::new();
58 migrations.do_all_migrations(&mut conn)?;
59
60 Ok( Self { connection_string: path })
61 }
62
63 fn connect(&self) -> anyhow::Result<Connection> {
64 rusqlite::Connection::open(&self.connection_string)
65 .with_context(|| format!("Error connecting to database {}", self.connection_string.display()))
66 }
67}
68
69#[async_trait]
70impl BarkPersister for SqliteClient {
71 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
72 let mut conn = self.connect()?;
73 let tx = conn.transaction()?;
74
75 query::set_properties(&tx, properties)?;
76
77 tx.commit()?;
78 Ok(())
79 }
80
81 #[cfg(feature = "onchain-bdk")]
82 async fn initialize_bdk_wallet(&self) -> anyhow::Result<bdk_wallet::ChangeSet> {
83 let mut conn = self.connect()?;
84 Ok(bdk_wallet::WalletPersister::initialize(&mut conn)?)
85 }
86
87 #[cfg(feature = "onchain-bdk")]
88 async fn store_bdk_wallet_changeset(&self, changeset: &bdk_wallet::ChangeSet) -> anyhow::Result<()> {
89 let mut conn = self.connect()?;
90 bdk_wallet::WalletPersister::persist(&mut conn, changeset)?;
91 Ok(())
92 }
93
94 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
95 let conn = self.connect()?;
96 Ok(query::fetch_properties(&conn)?)
97 }
98
99 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
100 let conn = self.connect()?;
101 query::set_server_pubkey(&conn, &server_pubkey)?;
102 Ok(())
103 }
104
105 async fn create_new_movement(&self,
106 status: MovementStatus,
107 subsystem: &MovementSubsystem,
108 time: DateTime<chrono::Local>,
109 ) -> anyhow::Result<MovementId> {
110 let mut conn = self.connect()?;
111 let tx = conn.transaction()?;
112 let movement_id = query::create_new_movement(&tx, status, subsystem, time)?;
113 tx.commit()?;
114 Ok(movement_id)
115 }
116
117 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
118 let mut conn = self.connect()?;
119 let tx = conn.transaction()?;
120 query::update_movement(&tx, movement)?;
121 tx.commit()?;
122 Ok(())
123 }
124
125 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
126 let conn = self.connect()?;
127 query::get_movement_by_id(&conn, movement_id)
128 }
129
130 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
131 let conn = self.connect()?;
132 query::get_all_movements(&conn)
133 }
134
135 async fn get_movements_by_payment_method(
136 &self,
137 payment_method: &PaymentMethod,
138 ) -> anyhow::Result<Vec<Movement>> {
139 let conn = self.connect()?;
140 query::get_movements_by_payment_method(&conn, payment_method)
141 }
142
143 async fn store_pending_board(
144 &self,
145 vtxo: &Vtxo<Full>,
146 funding_tx: &bitcoin::Transaction,
147 movement_id: MovementId,
148 ) -> anyhow::Result<()> {
149 let mut conn = self.connect()?;
150 let tx = conn.transaction()?;
151 query::store_new_pending_board(&tx, vtxo, funding_tx, movement_id)?;
152 tx.commit()?;
153 Ok(())
154 }
155
156 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
157 let mut conn = self.connect()?;
158 let tx = conn.transaction()?;
159 query::remove_pending_board(&tx, vtxo_id)?;
160 tx.commit()?;
161 Ok(())
162 }
163
164 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
165 let conn = self.connect()?;
166 query::get_all_pending_boards_ids(&conn)
167 }
168
169 async fn get_pending_board_by_vtxo_id(&self, vtxo_id: VtxoId) -> anyhow::Result<Option<PendingBoard>> {
170 let conn = self.connect()?;
171 query::get_pending_board_by_vtxo_id(&conn, vtxo_id)
172 }
173
174 async fn store_round_state_lock_vtxos(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
175 let mut conn = self.connect()?;
176 let tx = conn.transaction()?;
177 for vtxo in round_state.participation().inputs.iter() {
178 query::update_vtxo_state_checked(
179 &*tx,
180 vtxo.id(),
181 VtxoState::Locked { movement_id: round_state.movement_id },
182 &[VtxoStateKind::Spendable],
183 )?;
184 }
185 let id = query::store_round_state(&tx, round_state)?;
186 tx.commit()?;
187 Ok(id)
188 }
189
190 async fn update_round_state(&self, state: &StoredRoundState) -> anyhow::Result<()> {
191 let conn = self.connect()?;
192 query::update_round_state(&conn, state)?;
193 Ok(())
194 }
195
196 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
197 let conn = self.connect()?;
198 query::remove_round_state(&conn, round_state.id())?;
199 Ok(())
200 }
201
202 async fn get_round_state_by_id(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
203 let conn = self.connect()?;
204 query::get_round_state_by_id(&conn, id)
205 }
206
207 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
208 let conn = self.connect()?;
209 query::get_pending_round_state_ids(&conn)
210 }
211
212 async fn store_vtxos(
213 &self,
214 vtxos: &[(&Vtxo<Full>, &VtxoState)],
215 ) -> anyhow::Result<()> {
216 let mut conn = self.connect()?;
217 let tx = conn.transaction()?;
218
219 for (vtxo, state) in vtxos {
220 query::store_vtxo_with_initial_state(&tx, vtxo, state)?;
221 }
222 tx.commit()?;
223 Ok(())
224 }
225
226 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
227 let conn = self.connect()?;
228 query::get_wallet_vtxo_by_id(&conn, id)
229 }
230
231 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
232 let conn = self.connect()?;
233 query::get_all_vtxos(&conn)
234 }
235
236 async fn get_vtxos_by_state(&self, state: &[VtxoStateKind]) -> anyhow::Result<Vec<WalletVtxo>> {
238 let conn = self.connect()?;
239 query::get_vtxos_by_state(&conn, state)
240 }
241
242 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
243 let conn = self.connect()?;
244 let state : Option<VtxoState> = query::get_vtxo_state(&conn, id)?;
245 let result = state.map(|s| s == VtxoState::Spent).unwrap_or(false);
246 Ok(result)
247 }
248
249 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
250 let mut conn = self.connect()?;
251 let tx = conn.transaction().context("Failed to start transaction")?;
252 let result = query::delete_vtxo(&tx, id);
253 tx.commit().context("Failed to commit transaction")?;
254 result
255 }
256
257 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
258 let conn = self.connect()?;
259 query::store_vtxo_key(&conn, index, public_key)
260 }
261
262 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
263 let conn = self.connect()?;
264 query::get_last_vtxo_key_index(&conn)
265 }
266
267 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
268 let conn = self.connect()?;
269 query::get_public_key_idx(&conn, public_key)
270 }
271
272 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
273 let conn = self.connect()?;
274 query::get_mailbox_checkpoint(&conn)
275 }
276
277 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
278 let conn = self.connect()?;
279 query::store_mailbox_checkpoint(&conn, checkpoint)?;
280 Ok(())
281 }
282
283 async fn store_lightning_receive(
285 &self,
286 payment_hash: PaymentHash,
287 preimage: Preimage,
288 invoice: &Bolt11Invoice,
289 htlc_recv_cltv_delta: BlockDelta,
290 ) -> anyhow::Result<()> {
291 let conn = self.connect()?;
292 query::store_lightning_receive(
293 &conn, payment_hash, preimage, invoice, htlc_recv_cltv_delta,
294 )?;
295 Ok(())
296 }
297
298 async fn store_new_pending_lightning_send(
299 &self,
300 invoice: &Invoice,
301 amount: Amount,
302 fee: Amount,
303 vtxos: &[VtxoId],
304 movement_id: MovementId,
305 ) -> anyhow::Result<LightningSend> {
306 let conn = self.connect()?;
307 query::store_new_pending_lightning_send(&conn, invoice, amount, fee, vtxos, movement_id)
308 }
309
310 async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
311 let conn = self.connect()?;
312 query::get_all_pending_lightning_send(&conn)
313 }
314
315 async fn finish_lightning_send(
316 &self,
317 payment_hash: PaymentHash,
318 preimage: Option<Preimage>,
319 ) -> anyhow::Result<()> {
320 let conn = self.connect()?;
321 query::finish_lightning_send(&conn, payment_hash, preimage)
322 }
323
324 async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
325 let conn = self.connect()?;
326 query::remove_lightning_send(&conn, payment_hash)?;
327 Ok(())
328 }
329
330 async fn get_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<Option<LightningSend>> {
331 let conn = self.connect()?;
332 query::get_lightning_send(&conn, payment_hash)
333 }
334
335 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
336 let conn = self.connect()?;
337 query::get_all_pending_lightning_receives(&conn)
338 }
339
340 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
341 let conn = self.connect()?;
342 query::set_preimage_revealed(&conn, payment_hash)?;
343 Ok(())
344 }
345
346 async fn update_lightning_receive(
347 &self,
348 payment_hash: PaymentHash,
349 htlc_vtxo_ids: &[VtxoId],
350 movement_id: MovementId,
351 ) -> anyhow::Result<()> {
352 let conn = self.connect()?;
353 query::update_lightning_receive(&conn, payment_hash, htlc_vtxo_ids, movement_id)?;
354 Ok(())
355 }
356
357 async fn fetch_lightning_receive_by_payment_hash(
359 &self,
360 payment_hash: PaymentHash,
361 ) -> anyhow::Result<Option<LightningReceive>> {
362 let conn = self.connect()?;
363 query::fetch_lightning_receive_by_payment_hash(&conn, payment_hash)
364 }
365
366 async fn finish_pending_lightning_receive(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
367 let conn = self.connect()?;
368 query::finish_pending_lightning_receive(&conn, payment_hash)?;
369 Ok(())
370 }
371
372 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
373 let mut conn = self.connect()?;
374 let tx = conn.transaction()?;
375 query::store_exit_vtxo_entry(&tx, exit)?;
376 tx.commit()?;
377 Ok(())
378 }
379
380 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
381 let mut conn = self.connect()?;
382 let tx = conn.transaction()?;
383 query::remove_exit_vtxo_entry(&tx, &id)?;
384 tx.commit()?;
385 Ok(())
386 }
387
388 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
389 let conn = self.connect()?;
390 query::get_exit_vtxo_entries(&conn)
391 }
392
393 async fn store_exit_child_tx(
394 &self,
395 exit_txid: Txid,
396 child_tx: &bitcoin::Transaction,
397 origin: ExitTxOrigin,
398 ) -> anyhow::Result<()> {
399 let mut conn = self.connect()?;
400 let tx = conn.transaction()?;
401 query::store_exit_child_tx(&tx, exit_txid, child_tx, origin)?;
402 tx.commit()?;
403 Ok(())
404 }
405
406 async fn get_exit_child_tx(
407 &self,
408 exit_txid: Txid,
409 ) -> anyhow::Result<Option<(bitcoin::Transaction, ExitTxOrigin)>> {
410 let conn = self.connect()?;
411 query::get_exit_child_tx(&conn, exit_txid)
412 }
413
414 async fn update_vtxo_state_checked(
415 &self,
416 vtxo_id: VtxoId,
417 new_state: VtxoState,
418 allowed_old_states: &[VtxoStateKind]
419 ) -> anyhow::Result<WalletVtxo> {
420 let conn = self.connect()?;
421 query::update_vtxo_state_checked(&conn, vtxo_id, new_state, allowed_old_states)
422 }
423
424 async fn store_pending_offboard(
425 &self,
426 pending: &PendingOffboard,
427 ) -> anyhow::Result<()> {
428 let mut conn = self.connect()?;
429 let tx = conn.transaction()?;
430 query::store_pending_offboard(&tx, pending)?;
431 tx.commit()?;
432 Ok(())
433 }
434
435 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
436 let conn = self.connect()?;
437 query::get_all_pending_offboards(&conn)
438 }
439
440 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
441 let mut conn = self.connect()?;
442 let tx = conn.transaction()?;
443 query::remove_pending_offboard(&tx, movement_id)?;
444 tx.commit()?;
445 Ok(())
446 }
447}
448
449#[cfg(any(test, doc))]
450pub mod helpers {
451 use std::path::PathBuf;
452 use std::str::FromStr;
453
454 use rusqlite::Connection;
455
456 #[cfg(any(test, feature = "rand"))]
463 pub fn in_memory_db() -> (PathBuf, Connection) {
464 use rand::{distr, Rng};
465
466 let mut rng = rand::rng();
472 let filename: String = (&mut rng).sample_iter(distr::Alphanumeric)
473 .take(16).map(char::from).collect();
474
475 let connection_string = format!("file:{}?mode=memory&cache=shared", filename);
476 let pathbuf = PathBuf::from_str(&connection_string).unwrap();
477
478 let conn = Connection::open(pathbuf.clone()).unwrap();
479 (pathbuf.clone(), conn)
480 }
481}
482
483#[cfg(test)]
484mod test {
485 use ark::test_util::VTXO_VECTORS;
486
487 use crate::{persist::sqlite::helpers::in_memory_db, vtxo::VtxoState};
488
489 use super::*;
490
491 #[tokio::test]
492 async fn test_add_and_retrieve_vtxos() {
493 let vtxo_1 = &VTXO_VECTORS.board_vtxo;
494 let vtxo_2 = &VTXO_VECTORS.arkoor_htlc_out_vtxo;
495 let vtxo_3 = &VTXO_VECTORS.round2_vtxo;
496
497 let (cs, conn) = in_memory_db();
498 let db = SqliteClient::open(cs).unwrap();
499
500 db.store_vtxos(&[
501 (vtxo_1, &VtxoState::Spendable), (vtxo_2, &VtxoState::Spendable)
502 ]).await.unwrap();
503
504 let vtxo_1_db = db.get_wallet_vtxo(vtxo_1.id()).await.expect("No error").expect("A vtxo was found");
506 assert_eq!(vtxo_1_db.vtxo, *vtxo_1);
507
508 assert!(db.get_wallet_vtxo(vtxo_3.id()).await.expect("No error").is_none());
510
511 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
513 assert_eq!(vtxos.len(), 2);
514 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_1));
515 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
516 assert!(!vtxos.iter().any(|v| v.vtxo == *vtxo_3));
517
518 db.update_vtxo_state_checked(
520 vtxo_1.id(), VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
521 ).await.unwrap();
522
523 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
524 assert_eq!(vtxos.len(), 1);
525
526 db.store_vtxos(&[(vtxo_3, &VtxoState::Spendable)]).await.unwrap();
528
529 let vtxos = db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await.unwrap();
530 assert_eq!(vtxos.len(), 2);
531 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_2));
532 assert!(vtxos.iter().any(|v| v.vtxo == *vtxo_3));
533
534 conn.close().unwrap();
535 }
536
537 #[tokio::test]
538 #[cfg(feature = "onchain-bdk")]
539 async fn test_create_wallet_then_load() {
540 use bdk_wallet::chain::DescriptorExt;
541
542 let (connection_string, conn) = in_memory_db();
543
544 let db = SqliteClient::open(connection_string).unwrap();
545 let network = bitcoin::Network::Testnet;
546
547 let seed = bip39::Mnemonic::generate(12).unwrap().to_seed("");
548 let xpriv = bitcoin::bip32::Xpriv::new_master(network, &seed).unwrap();
549
550 let desc = format!("tr({}/84'/0'/0'/*)", xpriv);
551
552 let _ = db.initialize_bdk_wallet().await.unwrap();
554 let mut created = bdk_wallet::Wallet::create_single(desc.clone())
555 .network(network)
556 .create_wallet_no_persist()
557 .unwrap();
558 db.store_bdk_wallet_changeset(&created.take_staged().unwrap()).await.unwrap();
559
560 let loaded = {
561 let changeset = db.initialize_bdk_wallet().await.unwrap();
562 bdk_wallet::Wallet::load()
563 .descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
564 .extract_keys()
565 .check_network(network)
566 .load_wallet_no_persist(changeset)
567 .unwrap()
568 };
569
570 assert!(loaded.is_some());
571 assert_eq!(
572 created.public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id(),
573 loaded.unwrap().public_descriptor(bdk_wallet::KeychainKind::External).descriptor_id()
574 );
575
576 conn.close().unwrap();
579 }
580
581 #[tokio::test]
582 async fn differential_bark_persister_suite() {
583 let (cs, _conn) = helpers::in_memory_db();
584 let sqlite = SqliteClient::open(cs).unwrap();
585 let memory = crate::persist::adaptor::StorageAdaptorWrapper::new(
586 crate::persist::adaptor::memory::MemoryStorageAdaptor::new(),
587 );
588 crate::persist::test_suite::run_all(&sqlite, &memory).await;
589 }
590}