mod models;
mod vtxo;
pub mod bdk;
pub(crate) mod progress;
pub(crate) mod transaction_manager;
pub use self::models::{
ExitCpfpRequest, ExitTransactionPackage, FeeInfo, RbfRequirement, TransactionInfo,
ChildTransactionInfo, ExitError, ExitState, ExitTx, ExitTxStatus, ExitTxOrigin, ExitStartState,
ExitProcessingState, ExitAwaitingDeltaState, ExitClaimableState, ExitClaimInProgressState,
ExitClaimedState, ExitProgressStatus, ExitTransactionStatus,
};
pub use self::vtxo::ExitVtxo;
use std::borrow::Borrow;
use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Context;
use bitcoin::{
Address, Amount, FeeRate, Psbt, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid, Witness, sighash
};
use bitcoin::consensus::Params;
use log::{error, info, trace, warn};
use ark::{Vtxo, VtxoId};
use ark::vtxo::Bare;
use ark::vtxo::policy::signing::VtxoSigner;
use bitcoin_ext::{BlockHeight, P2TR_DUST};
use crate::Wallet;
use crate::chain::ChainSource;
use crate::exit::transaction_manager::ExitTransactionManager;
use crate::movement::{MovementDestination, MovementStatus, PaymentMethod};
use crate::movement::manager::MovementManager;
use crate::movement::update::MovementUpdate;
use crate::persist::BarkPersister;
use crate::persist::models::StoredExit;
use crate::psbtext::PsbtInputExt;
use crate::subsystem::{ExitMovement, Subsystem};
use crate::vtxo::{VtxoState, VtxoStateKind};
/// Mutable state of the exit subsystem. [`Exit`] stores one instance of this
/// behind a shared async read/write lock.
pub(crate) struct ExitInner {
/// Transaction manager passed to exit initialization/progress and queried
/// for exit transaction packages.
tx_manager: ExitTransactionManager,
/// Storage backend used to store and remove exit entries and to update
/// VTXO state.
persister: Arc<dyn BarkPersister>,
/// Provides the chain tip, the network and fee rates.
chain_source: Arc<ChainSource>,
/// Used to register a finished movement for each VTXO that enters the
/// exit process.
movement_manager: Arc<MovementManager>,
/// In-memory list of the VTXOs currently tracked by the exit process.
exit_vtxos: Vec<ExitVtxo>,
}
impl ExitInner {
/// Marks each of the given VTXOs for unilateral exit.
///
/// For every VTXO that is not already tracked, this stores an exit entry,
/// moves the VTXO to [`VtxoState::Spent`] (only if it is currently in one of
/// the unspent states), adds it to the in-memory exit list and registers a
/// finished, successful exit movement for it.
///
/// VTXOs that are already in the exit process are skipped with a warning.
///
/// # Errors
///
/// Returns [`ExitError::DustLimit`] as soon as a VTXO below [`P2TR_DUST`] is
/// encountered; VTXOs processed before it remain marked for exit. Errors from
/// the chain source, the persister and the movement manager are propagated.
async fn start_exit_for_vtxos(
    &mut self,
    vtxos: &[impl Borrow<Vtxo<Bare>>],
) -> anyhow::Result<()> {
    if vtxos.is_empty() {
        return Ok(());
    }

    let tip = self.chain_source.tip().await?;
    let params = Params::new(self.chain_source.network());

    for vtxo in vtxos {
        let vtxo = vtxo.borrow();
        let vtxo_id = vtxo.id();

        if self.exit_vtxos.iter().any(|ev| ev.id() == vtxo_id) {
            warn!("VTXO {} is already in the exit process", vtxo_id);
            continue;
        }

        if vtxo.amount() < P2TR_DUST {
            return Err(ExitError::DustLimit {
                vtxo: vtxo.amount(),
                dust: P2TR_DUST,
            }.into());
        }

        trace!("Starting exit for VTXO: {}", vtxo_id);
        let exit = ExitVtxo::new(vtxo, tip);
        self.persister.store_exit_vtxo_entry(&StoredExit::new(&exit)).await?;
        self.persister.update_vtxo_state_checked(
            vtxo_id, VtxoState::Spent, &VtxoStateKind::UNSPENT_STATES,
        ).await?;
        self.exit_vtxos.push(exit);
        trace!("Exit for VTXO started successfully: {}", vtxo_id);

        // The movement records the VTXO amount leaving the wallet, hence the
        // negated balance.
        let balance = -vtxo.amount().to_signed()?;
        let script_pubkey = vtxo.output_script_pubkey();
        // Prefer a proper address as destination; fall back to the raw output
        // script when it cannot be converted to an address.
        let payment_method = match Address::from_script(&script_pubkey, &params) {
            Ok(addr) => PaymentMethod::Bitcoin(addr.into_unchecked()),
            Err(e) => {
                warn!("Unable to convert script pubkey to address: {:#}", e);
                PaymentMethod::OutputScript(script_pubkey)
            }
        };

        self.movement_manager.new_finished_movement(
            Subsystem::EXIT,
            ExitMovement::Exit.to_string(),
            MovementStatus::Successful,
            MovementUpdate::new()
                .intended_and_effective_balance(balance)
                .consumed_vtxo(vtxo_id)
                .sent_to([MovementDestination::new(payment_method, vtxo.amount())]),
        ).await.context("Failed to register exit movement")?;
    }

    Ok(())
}
/// Initializes every tracked exit that is not initialized yet, then syncs
/// the transaction manager.
///
/// A failure to initialize an individual exit is logged and does not abort
/// the refresh; a failure to sync the transaction manager is returned.
async fn refresh_tx_state(&mut self) -> anyhow::Result<()> {
    // Iterate the list in place: the fields borrowed below are disjoint, so
    // there is no need to move the list out of `self`. Moving it out would
    // leave `self.exit_vtxos` empty if this future were dropped while
    // awaiting `initialize`.
    for exit in self.exit_vtxos.iter_mut().filter(|ev| !ev.is_initialized()) {
        if let Err(e) = exit.initialize(&mut self.tx_manager, &*self.persister).await {
            error!("Error initializing exit for VTXO {}: {:#}", exit.id(), e);
        }
    }

    self.tx_manager.sync().await?;
    Ok(())
}
/// Signs every exit-claim input of `psbt` and stores the resulting witness
/// in the input's `final_script_witness`.
///
/// Inputs that are not exit-claim inputs are left untouched.
///
/// # Errors
///
/// Fails if any PSBT input lacks its `witness_utxo` (all previous outputs
/// are needed to compute the signature hash), if an exit-claim input refers
/// to a VTXO that is not currently claimable, or if the wallet fails to sign
/// (reported as [`ExitError::ClaimSigningError`]).
async fn sign_exit_claim_inputs(
    &self,
    psbt: &mut Psbt,
    wallet: &Wallet,
) -> anyhow::Result<()> {
    // The PSBT is supplied by the caller, so a missing witness UTXO must be
    // reported as an error rather than a panic.
    let prevouts = psbt.inputs.iter()
        .enumerate()
        .map(|(idx, input)| {
            input.witness_utxo.clone()
                .with_context(|| format!("PSBT input {} is missing its witness UTXO", idx))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;
    let prevouts = sighash::Prevouts::All(&prevouts);
    let mut shc = sighash::SighashCache::new(&psbt.unsigned_tx);

    let claimable = self.exit_vtxos.iter()
        .filter(|ev| ev.is_claimable())
        .map(|ev| (ev.id(), ev))
        .collect::<HashMap<_, _>>();

    for (i, input) in psbt.inputs.iter_mut().enumerate() {
        let Some(vtxo) = input.get_exit_claim_input() else { continue };

        if !claimable.contains_key(&vtxo.id()) {
            anyhow::bail!("vtxo is not claimable yet");
        }

        let witness = wallet.sign_input(&vtxo, i, &mut shc, &prevouts).await
            .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
        input.final_script_witness = Some(witness);
    }

    Ok(())
}
}
/// Handle to the exit subsystem. Every method acquires the shared lock
/// around [`ExitInner`] before reading or modifying the exit state.
pub struct Exit {
/// Shared exit state, guarded by an async read/write lock.
inner: Arc<tokio::sync::RwLock<ExitInner>>,
}
impl Exit {
/// Creates an exit handle with an empty list of tracked exits.
///
/// # Errors
///
/// Fails if the exit transaction manager cannot be created.
pub(crate) async fn new(
    persister: Arc<dyn BarkPersister>,
    chain_source: Arc<ChainSource>,
    movement_manager: Arc<MovementManager>,
) -> anyhow::Result<Exit> {
    let tx_manager = ExitTransactionManager::new(
        Arc::clone(&persister),
        Arc::clone(&chain_source),
    )?;

    let state = ExitInner {
        tx_manager,
        persister,
        chain_source,
        movement_manager,
        exit_vtxos: Vec::new(),
    };

    let inner = Arc::new(tokio::sync::RwLock::new(state));
    Ok(Exit { inner })
}
pub(crate) async fn load(&self) -> anyhow::Result<()> {
let mut guard = self.inner.write().await;
let inner = &mut *guard;
let exit_vtxo_entries = inner.persister.get_exit_vtxo_entries().await?;
inner.exit_vtxos.reserve(exit_vtxo_entries.len());
for entry in exit_vtxo_entries {
if let Some(vtxo) = inner.persister.get_wallet_vtxo(entry.vtxo_id).await? {
let mut exit = ExitVtxo::from_entry(entry, &vtxo);
exit.initialize(&mut inner.tx_manager, &*inner.persister).await?;
inner.exit_vtxos.push(exit);
} else {
error!("VTXO {} is marked for exit but it's missing from the database", entry.vtxo_id);
}
}
Ok(())
}
/// Returns the exit status of the given VTXO, or `Ok(None)` if the VTXO is
/// not tracked by the exit process.
///
/// * `include_history` - when set, the exit's state history is included.
/// * `include_transactions` - when set, the exit transaction packages are
///   included. If the exit has no known txids, the packages are built from
///   the transactions of the full VTXO, without child transactions.
pub async fn get_exit_status(
    &self,
    vtxo_id: VtxoId,
    include_history: bool,
    include_transactions: bool,
) -> Result<Option<ExitTransactionStatus>, ExitError> {
    let guard = self.inner.read().await;

    let Some(exit) = guard.exit_vtxos.iter().find(|ev| ev.id() == vtxo_id) else {
        return Ok(None);
    };

    let mut transactions = Vec::new();
    if include_transactions {
        match exit.txids() {
            Some(txids) => {
                transactions.reserve(txids.len());
                for txid in txids {
                    let package = guard.tx_manager.get_package(*txid)?;
                    transactions.push(package.read().await.clone());
                }
            }
            None => {
                let full_vtxo = exit.get_full_vtxo(&*guard.persister).await?;
                for item in full_vtxo.transactions() {
                    transactions.push(ExitTransactionPackage {
                        exit: TransactionInfo {
                            txid: item.tx.compute_txid(),
                            tx: item.tx,
                        },
                        child: None,
                    });
                }
            }
        }
    }

    let history = if include_history {
        Some(exit.history().clone())
    } else {
        None
    };

    Ok(Some(ExitTransactionStatus {
        vtxo_id: exit.id(),
        state: exit.state().clone(),
        history,
        transactions,
    }))
}
/// Returns a copy of the tracked exit for the given VTXO, if there is one.
pub async fn get_exit_vtxo(&self, vtxo_id: VtxoId) -> Option<ExitVtxo> {
    let inner = self.inner.read().await;
    let found = inner.exit_vtxos.iter().find(|candidate| candidate.id() == vtxo_id);
    found.cloned()
}
/// Returns a copy of all tracked exits.
pub async fn get_exit_vtxos(&self) -> Vec<ExitVtxo> {
    let inner = self.inner.read().await;
    let snapshot = inner.exit_vtxos.clone();
    snapshot
}
/// Returns `true` if at least one tracked exit is in a pending state.
pub async fn has_pending_exits(&self) -> bool {
    let inner = self.inner.read().await;
    let any_pending = inner.exit_vtxos.iter().any(|ev| ev.state().is_pending());
    any_pending
}
/// Returns the summed amount of all pending exits without waiting for the
/// lock; `None` is returned when the lock cannot be acquired immediately.
pub fn try_pending_total(&self) -> Option<Amount> {
    let inner = self.inner.try_read().ok()?;
    let total: Amount = inner.exit_vtxos.iter()
        .filter(|ev| ev.state().is_pending())
        .map(|ev| ev.amount())
        .sum();
    Some(total)
}
/// Returns the highest claimable height among all exits that are not yet
/// claimed.
///
/// Returns `None` if any unclaimed exit has no claimable height, or if there
/// are no unclaimed exits.
pub async fn all_claimable_at_height(&self) -> Option<BlockHeight> {
    let inner = self.inner.read().await;

    let unclaimed = inner.exit_vtxos.iter()
        .filter(|ev| !matches!(ev.state(), ExitState::Claimed(..)));

    let mut highest = None;
    for exit in unclaimed {
        // A single exit without a claimable height makes the answer unknown.
        let height = exit.state().claimable_height()?;
        highest = cmp::max(highest, Some(height));
    }
    highest
}
pub async fn start_exit_for_entire_wallet(&self) -> anyhow::Result<()> {
let mut guard = self.inner.write().await;
let all_vtxos = guard.persister.get_vtxos_by_state(&VtxoStateKind::UNSPENT_STATES).await?
.into_iter().map(|v| v.vtxo);
let (eligible, dust) = all_vtxos.partition::<Vec<_>, _>(|v| v.amount() >= P2TR_DUST);
for vtxo in &dust {
warn!(
"Skipping dust VTXO {}: {} sats is below the dust limit ({} sats).",
vtxo.id(), vtxo.amount().to_sat(), P2TR_DUST.to_sat()
);
}
if eligible.is_empty() && !dust.is_empty() {
warn!(
"Exit not started: all {} VTXOs (total {}) are below the dust limit. \
To exit and consolidate dust, you need to refresh your VTXOs first \
(requires total balance >= {})",
dust.len(),
dust.iter().map(|v| v.amount()).sum::<Amount>(),
P2TR_DUST,
);
return Ok(());
}
guard.start_exit_for_vtxos(&eligible).await
}
/// Starts the exit process for the given VTXOs while holding the write lock
/// on the exit state.
pub async fn start_exit_for_vtxos(
    &self,
    vtxos: &[impl Borrow<Vtxo<Bare>>],
) -> anyhow::Result<()> {
    let mut inner = self.inner.write().await;
    let outcome = inner.start_exit_for_vtxos(vtxos).await;
    outcome
}
/// Removes the persisted entry of every tracked exit and then empties the
/// in-memory exit list.
///
/// If removing an entry fails, the error is returned and the in-memory list
/// is left as it was.
pub(crate) async fn dangerous_clear_exit(&self) -> anyhow::Result<()> {
    let mut guard = self.inner.write().await;
    let inner = &mut *guard;

    for tracked in inner.exit_vtxos.iter() {
        let id = tracked.id();
        inner.persister.remove_exit_vtxo_entry(&id).await?;
    }

    inner.exit_vtxos.clear();
    Ok(())
}
/// Refreshes the transaction state and then tries to progress every
/// initialized exit.
///
/// Returns one [`ExitProgressStatus`] for each progressed exit that is not in
/// the claimed state afterwards; the status carries the error, if any, that
/// occurred while progressing that exit. Uninitialized exits are skipped with
/// a warning. The returned option is always `Some`.
///
/// # Errors
///
/// Only a failure to refresh the transaction state is returned as an error;
/// per-exit failures are logged and reported in the returned statuses.
pub async fn progress_exits(
    &self,
    wallet: &Wallet,
) -> anyhow::Result<Option<Vec<ExitProgressStatus>>> {
    let mut guard = self.inner.write().await;
    guard.refresh_tx_state().await?;

    // Reborrow through the guard once so the exit list and the transaction
    // manager can be borrowed independently. The list stays in place, so it
    // is not lost if this future is dropped while awaiting `progress`.
    let inner = &mut *guard;
    let mut exit_statuses = Vec::with_capacity(inner.exit_vtxos.len());

    for ev in inner.exit_vtxos.iter_mut() {
        if !ev.is_initialized() {
            warn!("Skipping progress of uninitialized unilateral exit {}", ev.id());
            continue;
        }

        info!("Progressing exit for VTXO {}", ev.id());
        let error = match ev.progress(wallet, &mut inner.tx_manager, true).await {
            Ok(_) => None,
            Err(e) => {
                // Lacking confirmed funds is an expected, temporary condition,
                // so it is logged as a warning rather than as an error.
                match &e {
                    ExitError::InsufficientConfirmedFunds { .. } => {
                        warn!("Can't progress exit for VTXO {} at this time: {}", ev.id(), e);
                    },
                    _ => {
                        error!("Error progressing exit for VTXO {}: {}", ev.id(), e);
                    }
                }
                Some(e)
            }
        };

        if !matches!(ev.state(), ExitState::Claimed(..)) {
            exit_statuses.push(ExitProgressStatus {
                vtxo_id: ev.id(),
                state: ev.state().clone(),
                error,
            });
        }
    }

    Ok(Some(exit_statuses))
}
/// Refreshes the transaction state and then calls `progress` on every
/// tracked exit with the final flag set to `false` (unlike
/// [`Exit::progress_exits`], which passes `true`).
///
/// Per-exit failures are logged and do not abort the sync.
///
/// # Errors
///
/// Only a failure to refresh the transaction state is returned.
pub async fn sync(
    &self,
    wallet: &Wallet,
) -> anyhow::Result<()> {
    let mut guard = self.inner.write().await;
    guard.refresh_tx_state().await?;

    // Reborrow through the guard once so the exit list and the transaction
    // manager can be borrowed independently. The list stays in place, so it
    // is not lost if this future is dropped while awaiting `progress`.
    let inner = &mut *guard;
    for exit in inner.exit_vtxos.iter_mut() {
        if let Err(e) = exit.progress(wallet, &mut inner.tx_manager, false).await {
            error!("Error syncing exit for VTXO {}: {}", exit.id(), e);
        }
    }

    Ok(())
}
/// Lists the exit transactions for which a CPFP child is requested.
///
/// Only exits in the processing state are considered. A transaction awaiting
/// its CPFP broadcast yields a request without an RBF requirement. A
/// transaction awaiting confirmation with a mempool origin yields a request
/// whose RBF requirement is taken from the fee info of its current child; it
/// is skipped if no child status or fee info is available. Transactions in
/// any other status, or without a known package, are skipped.
pub async fn exits_needing_cpfp(&self) -> Vec<ExitCpfpRequest> {
    let inner = self.inner.read().await;
    let mut requests = Vec::new();

    for ev in inner.exit_vtxos.iter() {
        let ExitState::Processing(processing) = ev.state() else { continue };

        for tx in processing.transactions.iter() {
            let rbf_requirement = match &tx.status {
                ExitTxStatus::AwaitingCpfpBroadcast => None,
                ExitTxStatus::AwaitingConfirmation { origin: ExitTxOrigin::Mempool, .. } => {
                    let child_status = inner.tx_manager.get_child_status(tx.txid).await;
                    let Ok(Some(child)) = child_status else { continue };
                    let Some(fee_info) = child.fee_info else { continue };
                    Some(RbfRequirement {
                        min_fee_rate: fee_info.fee_rate,
                        current_package_fee: fee_info.total_fee,
                    })
                },
                _ => continue,
            };

            let Ok(package) = inner.tx_manager.get_package(tx.txid) else { continue };
            let exit_tx = package.read().await.exit.tx.clone();

            requests.push(ExitCpfpRequest {
                vtxo_id: ev.id(),
                exit_tx,
                rbf_requirement,
            });
        }
    }

    requests
}
/// Registers `child_tx` as the wallet-provided child of the exit transaction
/// `exit_txid`, broadcasts the resulting package and then progresses the
/// first processing exit that contains that transaction.
///
/// A broadcast failure caused by a mempool conflict is only logged; any
/// other broadcast failure is returned. A failure to progress the exit
/// afterwards is logged and not returned.
pub async fn provide_cpfp_tx(
    &self,
    wallet: &Wallet,
    exit_txid: Txid,
    child_tx: Transaction,
) -> anyhow::Result<(), ExitError> {
    let mut guard = self.inner.write().await;
    let inner = &mut *guard;

    let origin = ExitTxOrigin::Wallet { confirmed_in: None };
    inner.tx_manager.set_wallet_child_tx(exit_txid, child_tx, origin).await?;

    {
        // The package read guard is released at the end of this scope,
        // before any exit is progressed.
        let package = inner.tx_manager.get_package(exit_txid)?;
        let pkg_guard = package.read().await;
        match inner.tx_manager.broadcast_package(&*pkg_guard).await {
            Ok(_) => {},
            Err(ExitError::ExitPackageBroadcastFailure { ref error, .. })
                if error.is_mempool_conflict() =>
            {
                warn!("CPFP broadcast conflict for {}: {} — another CPFP may already be in mempool", exit_txid, error);
            },
            Err(e) => return Err(e),
        }
    }

    let owner = inner.exit_vtxos.iter_mut().find(|ev| match ev.state() {
        ExitState::Processing(s) => s.transactions.iter().any(|tx| tx.txid == exit_txid),
        _ => false,
    });

    if let Some(ev) = owner {
        if let Err(e) = ev.progress(wallet, &mut inner.tx_manager, false).await {
            warn!("Failed to progress exit for {} after CPFP: {}", exit_txid, e);
        }
    }

    Ok(())
}
/// Returns copies of all tracked exits that are currently claimable.
pub async fn list_claimable(&self) -> Vec<ExitVtxo> {
    let inner = self.inner.read().await;

    let mut claimable = Vec::new();
    for ev in inner.exit_vtxos.iter() {
        if ev.is_claimable() {
            claimable.push(ev.clone());
        }
    }
    claimable
}
/// Signs the exit-claim inputs of `psbt` while holding a read lock on the
/// exit state.
pub async fn sign_exit_claim_inputs(&self, psbt: &mut Psbt, wallet: &Wallet) -> anyhow::Result<()> {
    let inner = self.inner.read().await;
    let outcome = inner.sign_exit_claim_inputs(psbt, wallet).await;
    outcome
}
/// Builds a signed PSBT that spends the given claimable exits to `address`
/// in a single output, with the fee deducted from that output.
///
/// * `inputs` - the exits to claim; each must be in the claimable state.
/// * `wallet` - used to find the signable clause of each VTXO and to sign.
/// * `address` - destination of the single output.
/// * `fee_rate_override` - fee rate to use; when `None`, the regular fee
///   rate reported by the chain source is used.
///
/// The fee is computed from the weight of a fully signed draft of the
/// transaction, after which the final transaction is signed again with the
/// reduced output value.
///
/// # Errors
///
/// Returns [`ExitError::ClaimMissingInputs`] for an empty `inputs`,
/// [`ExitError::VtxoNotClaimable`] for an input that is not claimable,
/// [`ExitError::ClaimMissingSignableClause`] when the wallet has no signable
/// clause for a VTXO, and [`ExitError::ClaimFeeExceedsOutput`] when the fee
/// plus [`P2TR_DUST`] exceeds the total input amount. Tip retrieval,
/// locktime, PSBT construction and signing failures are reported through
/// their respective [`ExitError`] variants.
pub async fn drain_exits(
    &self,
    inputs: &[impl Borrow<ExitVtxo>],
    wallet: &Wallet,
    address: Address,
    fee_rate_override: Option<FeeRate>,
) -> anyhow::Result<Psbt, ExitError> {
    // Reject an empty claim before taking the lock or querying the chain.
    if inputs.is_empty() {
        return Err(ExitError::ClaimMissingInputs);
    }

    let guard = self.inner.read().await;
    let tip = guard.chain_source.tip().await
        .map_err(|e| ExitError::TipRetrievalFailure { error: e.to_string() })?;

    // Resolve the full VTXO of every input once; both the transaction
    // inputs and the PSBT metadata below are derived from it.
    let mut vtxos = HashMap::with_capacity(inputs.len());
    for input in inputs {
        let i = input.borrow();
        let vtxo = i.get_full_vtxo(&*guard.persister).await?;
        vtxos.insert(i.id(), vtxo);
    }

    let mut tx = {
        let mut output_amount = Amount::ZERO;
        let mut tx_ins = Vec::with_capacity(inputs.len());
        for input in inputs {
            let input = input.borrow();
            let vtxo = &vtxos[&input.id()];
            if !matches!(input.state(), ExitState::Claimable(..)) {
                return Err(ExitError::VtxoNotClaimable { vtxo: input.id() });
            }
            output_amount += vtxo.amount();

            let clause = wallet.find_signable_clause(vtxo).await
                .ok_or(ExitError::ClaimMissingSignableClause { vtxo: vtxo.id() })?;
            tx_ins.push(TxIn {
                previous_output: vtxo.point(),
                script_sig: ScriptBuf::default(),
                sequence: clause.sequence().unwrap_or(Sequence::ZERO),
                witness: Witness::new(),
            });
        }

        let locktime = bitcoin::absolute::LockTime::from_height(tip)
            .map_err(|e| ExitError::InvalidLocktime { tip, error: e.to_string() })?;

        Transaction {
            version: bitcoin::transaction::Version(3),
            lock_time: locktime,
            input: tx_ins,
            output: vec![
                TxOut {
                    script_pubkey: address.script_pubkey(),
                    value: output_amount,
                },
            ],
        }
    };

    // Turns an unsigned transaction into a PSBT with all exit-claim inputs
    // annotated and signed.
    let create_psbt = |tx: Transaction| async {
        let mut psbt = Psbt::from_unsigned_tx(tx)
            .map_err(|e| ExitError::InternalError {
                error: format!("Failed to create exit claim PSBT: {}", e),
            })?;
        psbt.inputs.iter_mut().zip(inputs).for_each(|(i, e)| {
            let v = &vtxos[&e.borrow().id()];
            i.set_exit_claim_input(v);
            i.witness_utxo = Some(v.txout())
        });
        guard.sign_exit_claim_inputs(&mut psbt, wallet).await
            .map_err(|e| ExitError::ClaimSigningError { error: e.to_string() })?;
        Ok(psbt)
    };

    let fee_amount = {
        // Only query the chain source when no override was supplied.
        let fee_rate = match fee_rate_override {
            Some(rate) => rate,
            None => guard.chain_source.fee_rates().await.regular,
        };
        // Sign a draft first: the fee depends on the weight of the
        // transaction including its witnesses.
        let signed_draft = create_psbt(tx.clone()).await?
            .extract_tx()
            .map_err(|e| ExitError::InternalError {
                error: format!("Failed to get tx from signed exit claim PSBT: {}", e),
            })?;
        fee_rate * signed_draft.weight()
    };

    // The output must still be at least the dust limit after paying the fee.
    let needed = fee_amount + P2TR_DUST;
    if needed > tx.output[0].value {
        return Err(ExitError::ClaimFeeExceedsOutput {
            needed, output: tx.output[0].value,
        });
    }
    tx.output[0].value -= fee_amount;

    create_psbt(tx).await
}
}