use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use anyhow::Context;
use ark::time::timestamp_secs;
use bdk_esplora::EsploraAsyncExt;
use bdk_wallet::chain::{ChainPosition, CheckPoint};
use bdk_wallet::Wallet as BdkWallet;
use bdk_wallet::coin_selection::DefaultCoinSelectionAlgorithm;
use bdk_wallet::{Balance, KeychainKind, LocalOutput, TxBuilder, TxOrdering};
use bitcoin::{
Address, Amount, FeeRate, Network, OutPoint, Psbt, Sequence, Transaction, TxOut, Txid, Weight, bip32, psbt
};
use log::{debug, error, info, trace, warn};
use ark::vtxo::policy::signing::VtxoSigner;
use bitcoin_ext::{BlockHeight, BlockRef};
use bitcoin_ext::bdk::{CpfpInternalError, WalletExt};
use bitcoin_ext::cpfp::CpfpError;
use bitcoin_ext::rpc::RpcApi;
use crate::chain::{ChainSource, ChainSourceClient};
use crate::exit::{ExitVtxo, ExitState};
use crate::onchain::{
ChainSync, GetBalance, GetSpendingTx, GetWalletTx, LocalUtxo,
MakeCpfp, MakeCpfpFees, PreparePsbt, SignPsbt, Utxo
};
use crate::persist::BarkPersister;
use crate::psbtext::PsbtInputExt;
use crate::Wallet;
const STOP_GAP: usize = 50;
const PARALLEL_REQS: usize = 4;
const GENESIS_HEIGHT: u32 = 0;
impl From<LocalOutput> for LocalUtxo {
fn from(value: LocalOutput) -> Self {
LocalUtxo {
outpoint: value.outpoint,
amount: value.txout.value,
confirmation_height: value.chain_position.confirmation_height_upper_bound(),
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait TxBuilderExt: Send + Sync {
async fn add_exit_claim_inputs(
&mut self,
wallet: &Wallet,
exit_outputs: &[&ExitVtxo],
) -> anyhow::Result<()>;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<Cs: Send + Sync> TxBuilderExt for TxBuilder<'_, Cs> {
async fn add_exit_claim_inputs(
&mut self,
wallet: &Wallet,
exit_outputs: &[&ExitVtxo],
) -> anyhow::Result<()> {
self.version(2);
for input in exit_outputs {
if !matches!(input.state(), ExitState::Claimable(..)) {
bail!("VTXO exit is not spendable");
}
let vtxo = wallet.db.get_wallet_vtxo(input.id()).await?
.context(format!("Unable to load VTXO for exit: {}", input.id()))?;
let mut psbt_in = psbt::Input::default();
psbt_in.set_exit_claim_input(&vtxo);
psbt_in.witness_utxo = Some(TxOut {
script_pubkey: vtxo.output_script_pubkey(),
value: vtxo.amount(),
});
let clause = wallet.find_signable_clause(&vtxo).await
.context("Cannot sign vtxo")?;
let witness_weight = {
let witness_size = clause.witness_size(&vtxo);
Weight::from_witness_data_size(witness_size as u64)
};
self.add_foreign_utxo_with_sequence(
vtxo.point(),
psbt_in,
witness_weight,
clause.sequence().unwrap_or(Sequence::ZERO),
).expect("error adding foreign utxo for claim input");
}
Ok(())
}
}
impl <W: Deref<Target = BdkWallet>> GetBalance for W {
fn get_balance(&self) -> Amount {
self.deref().balance().total()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SignPsbt for BdkWallet {
async fn finish_tx(&mut self, mut psbt: Psbt) -> anyhow::Result<Transaction> {
#[allow(deprecated)]
let opts = bdk_wallet::SignOptions {
trust_witness_utxo: true,
..Default::default()
};
let finalized = self.sign(&mut psbt, opts).context("signing error")?;
assert!(finalized);
let tx = psbt.extract_tx()?;
self.apply_unconfirmed_txs([(tx.clone(), timestamp_secs())]);
Ok(tx)
}
}
impl <W: Deref<Target = BdkWallet>> GetWalletTx for W {
fn get_wallet_tx(&self, txid: Txid) -> Option<Arc<Transaction>> {
self.deref().get_tx(txid).map(|tx| tx.tx_node.tx)
}
fn get_wallet_tx_confirmed_block(&self, txid: Txid) -> anyhow::Result<Option<BlockRef>> {
match self.deref().get_tx(txid) {
Some(tx) => match tx.chain_position {
ChainPosition::Confirmed { anchor, .. } => Ok(Some(anchor.block_id.into())),
ChainPosition::Unconfirmed { .. } => Ok(None),
},
None => Err(anyhow!("Tx {} does not exist in the wallet", txid)),
}
}
}
impl <W: DerefMut<Target = BdkWallet>> PreparePsbt for W {
fn prepare_tx(
&mut self,
destinations: &[(Address, Amount)],
fee_rate: FeeRate,
) -> anyhow::Result<Psbt> {
let mut b = self.deref_mut().build_tx();
b.ordering(TxOrdering::Untouched);
for (dest, amount) in destinations {
b.add_recipient(dest.script_pubkey(), *amount);
}
b.fee_rate(fee_rate);
b.finish().context("error building tx")
}
fn prepare_drain_tx(
&mut self,
destination: Address,
fee_rate: FeeRate,
) -> anyhow::Result<Psbt> {
let mut b = self.deref_mut().build_tx();
b.drain_to(destination.script_pubkey());
b.fee_rate(fee_rate);
b.drain_wallet();
b.finish().context("error building tx")
}
}
impl <W: Deref<Target = BdkWallet>> GetSpendingTx for W {
fn get_spending_tx(&self, outpoint: OutPoint) -> Option<Arc<Transaction>> {
for transaction in self.deref().transactions() {
if transaction.tx_node.tx.input.iter().any(|i| i.previous_output == outpoint) {
return Some(transaction.tx_node.tx);
}
}
None
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl MakeCpfp for BdkWallet {
fn make_signed_p2a_cpfp(
&mut self,
tx: &Transaction,
fees: MakeCpfpFees,
) -> Result<Transaction, CpfpError> {
WalletExt::make_signed_p2a_cpfp(self, tx, fees)
.inspect_err(|e| error!("Error creating signed P2A CPFP: {}", e))
.map_err(|e| match e {
CpfpInternalError::General(s) => CpfpError::InternalError(s),
CpfpInternalError::Create(e) => CpfpError::CreateError(e.to_string()),
CpfpInternalError::Extract(e) => CpfpError::FinalizeError(e.to_string()),
CpfpInternalError::Fee() => CpfpError::InternalError(e.to_string()),
CpfpInternalError::FinalizeError(s) => CpfpError::FinalizeError(s),
CpfpInternalError::InsufficientConfirmedFunds(f) => {
CpfpError::InsufficientConfirmedFunds {
needed: f.needed, available: f.available,
}
},
CpfpInternalError::NoFeeAnchor(txid) => CpfpError::NoFeeAnchor(txid),
CpfpInternalError::Signer(e) => CpfpError::SigningError(e.to_string()),
})
}
async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
self.apply_unconfirmed_txs([(tx.clone(), timestamp_secs())]);
trace!("Unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
Ok(())
}
}
pub struct OnchainWallet {
pub inner: BdkWallet,
db: Arc<dyn BarkPersister>,
}
impl Deref for OnchainWallet {
type Target = BdkWallet;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for OnchainWallet {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl OnchainWallet {
pub async fn load_or_create(network: Network, seed: [u8; 64], db: Arc<dyn BarkPersister>) -> anyhow::Result<Self> {
let xpriv = bip32::Xpriv::new_master(network, &seed).expect("valid seed");
let desc = bdk_wallet::template::Bip86(xpriv, KeychainKind::External);
let changeset = db.initialize_bdk_wallet().await.context("error reading bdk wallet state")?;
let wallet_opt = bdk_wallet::Wallet::load()
.descriptor(bdk_wallet::KeychainKind::External, Some(desc.clone()))
.extract_keys()
.check_network(network)
.load_wallet_no_persist(changeset)?;
let wallet = match wallet_opt {
Some(wallet) => wallet,
None => bdk_wallet::Wallet::create_single(desc)
.network(network)
.create_wallet_no_persist()?,
};
Ok(Self { inner: wallet, db })
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl MakeCpfp for OnchainWallet {
fn make_signed_p2a_cpfp(
&mut self,
tx: &Transaction,
fees: MakeCpfpFees,
) -> Result<Transaction, CpfpError> {
MakeCpfp::make_signed_p2a_cpfp(&mut self.inner, tx, fees)
}
async fn store_signed_p2a_cpfp(&mut self, tx: &Transaction) -> anyhow::Result<(), CpfpError> {
self.inner.store_signed_p2a_cpfp(tx).await?;
self.persist().await
.map_err(|e| CpfpError::StoreError(e.to_string()))
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SignPsbt for OnchainWallet {
async fn finish_tx(&mut self, psbt: Psbt) -> anyhow::Result<Transaction> {
let tx = self.inner.finish_tx(psbt).await?;
self.persist().await?;
Ok(tx)
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl ChainSync for OnchainWallet {
async fn sync(&mut self, chain: &ChainSource) -> anyhow::Result<()> {
debug!("Starting wallet sync...");
debug!("Starting balance: {}", self.inner.balance());
trace!("Starting unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
match chain.inner() {
ChainSourceClient::Bitcoind(bitcoind) => {
let prev_tip = self.inner.latest_checkpoint();
self.inner_sync_bitcoind(bitcoind, prev_tip).await?;
},
ChainSourceClient::Esplora(client) => {
debug!("Syncing with esplora...");
let request = self.inner.start_sync_with_revealed_spks()
.outpoints(self.list_unspent().iter().map(|o| o.outpoint).collect::<Vec<_>>())
.txids(self.inner.transactions().map(|tx| tx.tx_node.txid).collect::<Vec<_>>());
let update = client.sync(request, PARALLEL_REQS).await?;
self.inner.apply_update(update)?;
self.persist().await?;
debug!("Finished syncing with esplora");
},
}
debug!("Current balance: {}", self.inner.balance());
trace!("Current unconfirmed txs: {:?}", self.unconfirmed_txids().collect::<Vec<_>>());
self.rebroadcast_txs(chain, timestamp_secs()).await?;
Ok(())
}
}
impl OnchainWallet {
pub fn balance(&self) -> Balance {
self.inner.balance()
}
pub fn list_unspent(&self) -> Vec<LocalOutput> {
self.inner.list_unspent().collect()
}
pub fn list_transactions(&self) -> Vec<Arc<Transaction>> {
self.inner.transactions().map(|tx| tx.tx_node.tx).collect()
}
pub async fn address(&mut self) -> anyhow::Result<Address> {
let ret = self.inner.reveal_next_address(bdk_wallet::KeychainKind::External).address;
self.persist().await?;
Ok(ret)
}
pub fn utxos(&self) -> Vec<Utxo> {
self.list_unspent().into_iter().map(|o| Utxo::Local(o.into())).collect()
}
pub async fn send(&mut self, chain: &ChainSource, dest: Address, amount: Amount, fee_rate: FeeRate
) -> anyhow::Result<Txid> {
let psbt = self.prepare_tx(&[(dest, amount)], fee_rate)?;
let tx = self.finish_tx(psbt).await?;
chain.broadcast_tx(&tx).await?;
Ok(tx.compute_txid())
}
pub async fn send_many(
&mut self,
chain: &ChainSource,
destinations: &[(Address, Amount)],
fee_rate: FeeRate,
) -> anyhow::Result<Txid> {
let pbst = self.prepare_tx(destinations, fee_rate)?;
let tx = self.finish_tx(pbst).await?;
chain.broadcast_tx(&tx).await?;
Ok(tx.compute_txid())
}
pub async fn drain(
&mut self,
chain: &ChainSource,
destination: Address,
fee_rate: FeeRate,
) -> anyhow::Result<Txid> {
let psbt = self.prepare_drain_tx(destination, fee_rate)?;
let tx = self.finish_tx(psbt).await?;
chain.broadcast_tx(&tx).await?;
Ok(tx.compute_txid())
}
pub fn build_tx(&mut self) -> TxBuilder<'_, DefaultCoinSelectionAlgorithm> {
self.inner.build_tx()
}
async fn inner_sync_bitcoind(
&mut self,
bitcoind: &bitcoin_ext::rpc::Client,
prev_tip: CheckPoint,
) -> anyhow::Result<()> {
debug!("Syncing with bitcoind, starting at block height {}...", prev_tip.height());
let mut emitter = bdk_bitcoind_rpc::Emitter::new(
bitcoind, prev_tip.clone(), prev_tip.height(), self.unconfirmed_txs()
);
let mut count = 0;
while let Some(em) = emitter.next_block()? {
self.inner.apply_block_connected_to(
&em.block, em.block_height(), em.connected_to(),
)?;
count += 1;
if count % 10_000 == 0 {
self.persist().await?;
info!("Synced until block height {}", em.block_height());
}
}
let mempool = emitter.mempool()?;
self.inner.apply_evicted_txs(mempool.evicted);
self.inner.apply_unconfirmed_txs(mempool.update);
self.persist().await?;
debug!("Finished syncing with bitcoind");
Ok(())
}
async fn rebroadcast_txs(&mut self, chain: &ChainSource, sync_start: u64) -> anyhow::Result<Amount> {
let balance = self.inner.balance();
let transactions = self.inner.transactions().filter(|tx| {
if let ChainPosition::Unconfirmed { last_seen, .. } = tx.chain_position {
match last_seen {
Some(last_seen) => last_seen < sync_start,
None => true,
}
} else {
false
}
}).collect::<Vec<_>>();
for tx in transactions {
if let Err(e) = chain.broadcast_tx(&tx.tx_node.tx).await {
warn!("Error broadcasting tx {}: {}", tx.tx_node.txid, e);
}
}
Ok(balance.total())
}
pub async fn initial_wallet_scan(
&mut self,
chain: &ChainSource,
start_height: Option<BlockHeight>,
) -> anyhow::Result<Amount> {
info!("Starting initial wallet sync...");
debug!("Starting balance: {}", self.inner.balance());
match chain.inner() {
ChainSourceClient::Bitcoind(bitcoind) => {
let height = start_height.unwrap_or(GENESIS_HEIGHT).saturating_sub(1);
let block_hash = bitcoind.get_block_hash(height as u64)?;
self.inner.set_checkpoint(height, block_hash);
self.inner_sync_bitcoind(bitcoind, self.inner.latest_checkpoint()).await?;
},
ChainSourceClient::Esplora(client) => {
debug!("Starting full scan with esplora...");
let request = self.inner.start_full_scan();
let update = client.full_scan(request, STOP_GAP, PARALLEL_REQS).await?;
self.inner.apply_update(update)?;
self.persist().await?;
debug!("Finished scanning with esplora");
},
}
debug!("Current balance: {}", self.inner.balance());
self.rebroadcast_txs(chain, timestamp_secs()).await
}
async fn persist(&mut self) -> anyhow::Result<()> {
if let Some(stage) = self.inner.staged() {
self.db.store_bdk_wallet_changeset(&*stage).await?;
let _ = self.inner.take_staged();
}
Ok(())
}
}