use std::collections::HashMap;
use std::sync::{Arc, Weak};
use bitcoin::{Network, Transaction, Txid};
use log::{debug, error, info, trace};
use tokio::sync::RwLock;
use ark::vtxo::Full;
use ark::Vtxo;
use bitcoin_ext::{BlockHeight, TransactionExt, TxStatus, DEEPLY_CONFIRMED};
use crate::chain::ChainSource;
use crate::exit::models::{ChildTransactionInfo, ExitChildStatus, ExitError, ExitTransactionPackage, ExitTxOrigin, TransactionInfo};
use crate::onchain::ExitUnilaterally;
use crate::persist::BarkPersister;
pub struct ExitTransactionManager {
persister: Arc<dyn BarkPersister>,
chain_source: Arc<ChainSource>,
packages: Vec<Arc<RwLock<ExitTransactionPackage>>>,
index: HashMap<Txid, Weak<RwLock<ExitTransactionPackage>>>,
status: HashMap<Txid, TxStatus>,
}
impl ExitTransactionManager {
pub fn new(
persister: Arc<dyn BarkPersister>,
chain_source: Arc<ChainSource>,
) -> anyhow::Result<Self> {
Ok(ExitTransactionManager {
persister,
chain_source,
packages: Vec::new(),
index: HashMap::new(),
status: HashMap::new(),
})
}
pub fn network(&self) -> Network {
self.chain_source.network()
}
pub async fn track_vtxo_exits(
&mut self,
vtxo: &Vtxo<Full>,
onchain: &dyn ExitUnilaterally,
) -> anyhow::Result<Vec<Txid>, ExitError> {
let exit_txs = vtxo.transactions();
let mut txids = Vec::with_capacity(exit_txs.len());
for tx in exit_txs {
txids.push(self.track_exit_tx(tx.tx, onchain).await?);
}
Ok(txids)
}
pub async fn track_exit_tx(
&mut self,
tx: Transaction,
onchain: &dyn ExitUnilaterally,
) -> anyhow::Result<Txid, ExitError> {
let txid = tx.compute_txid();
if self.index.contains_key(&txid) {
return Ok(txid);
}
trace!("Tracking exit tx {}", txid);
let package = {
let info = TransactionInfo { txid, tx };
let child = self.find_child_locally(&info, onchain).await?;
trace!("Found local child for exit tx {}: {}", txid, child.is_some());
ExitTransactionPackage {
child,
exit: info,
}
};
let (status, child_txid) = match package.child.as_ref() {
None => (TxStatus::NotFound, None),
Some(child) => {
if let Some(block) = child.origin.confirmed_in() {
(TxStatus::Confirmed(block), Some(child.info.txid))
}
else {
(TxStatus::Mempool, Some(child.info.txid))
}
}
};
let package = Arc::new(RwLock::new(package));
self.index.insert(txid, Arc::downgrade(&package));
if let Some(child_txid) = child_txid {
self.index.insert(child_txid, Arc::downgrade(&package));
}
self.status.insert(txid, status);
self.packages.push(package);
Ok(txid)
}
pub async fn sync(&mut self) -> anyhow::Result<(), ExitError> {
trace!("Syncing exit transaction manager");
let tip = self.tip().await?;
let keys = self.status.keys().cloned().collect::<Vec<_>>();
for txid in keys {
let status = self.status.get(&txid).unwrap();
if let TxStatus::Confirmed(block) = status {
trace!("Skipping deeply confirmed exit tx {}", txid);
if block.height <= (tip - DEEPLY_CONFIRMED) {
continue;
}
}
match self.index.get(&txid) {
None => {
trace!("Updating status for non-exit tx {}", txid);
self.status.insert(txid, self.get_tx_status(txid).await?);
},
Some(weak_ptr) => {
trace!("Update status for exit tx {}", txid);
let package = weak_ptr.upgrade().expect("index contains a stale package");
let status = self.get_tx_status(txid).await?;
trace!("Exit tx {} old status {:?}, new status {:?}", txid, self.status.get(&txid), Some(status));
match status {
TxStatus::NotFound => {
match self.broadcast_package(&*package.read().await).await {
Ok(_) => continue,
Err(ExitError::ExitPackageBroadcastFailure { error, .. }) => {
error!("{}", error);
},
Err(e) => {
return Err(e);
},
}
},
_ => {
trace!("Attempting to update child status from network for exit tx {}", txid);
let status = self.update_child_from_network(
&package,
status.confirmed_height().unwrap_or(tip),
).await?;
self.status.insert(txid, status);
},
}
}
}
}
Ok(())
}
pub async fn get_child_status(
&self,
exit_txid: Txid,
) -> anyhow::Result<Option<ExitChildStatus>, ExitError> {
let package = self.get_package(exit_txid)?;
let guard = package.read().await;
if let Some(child) = &guard.child {
Ok(Some(ExitChildStatus {
txid: child.info.txid,
status: self.status.get(&exit_txid).cloned().expect("status should be set"),
origin: child.origin,
}))
} else {
Ok(None)
}
}
pub async fn get_child_txid(
&self,
exit_txid: Txid,
) -> anyhow::Result<Option<Txid>, ExitError> {
let package = self.get_package(exit_txid)?;
let guard = package.read().await;
if let Some(child) = &guard.child {
Ok(Some(child.info.txid))
} else {
Ok(None)
}
}
pub fn get_package(
&self,
exit_txid: Txid,
) -> anyhow::Result<Arc<RwLock<ExitTransactionPackage>>, ExitError> {
self.index.get(&exit_txid)
.ok_or(ExitError::InternalError {
error: format!("Attempt to get package for untracked exit tx: {}", exit_txid),
})?.upgrade()
.ok_or(ExitError::InternalError {
error: format!("Attempt to get package for stale exit tx: {}", exit_txid),
})
}
pub async fn tx_status(&mut self, txid: Txid) -> anyhow::Result<TxStatus, ExitError> {
if let Some(status) = self.status.get(&txid) {
Ok(status.clone())
} else {
let status = self.get_tx_status(txid).await?;
self.status.insert(txid, status.clone());
Ok(status)
}
}
pub async fn set_wallet_child_tx(
&mut self,
exit_txid: Txid,
child_tx: Transaction,
origin: ExitTxOrigin,
) -> anyhow::Result<Txid, ExitError> {
let package = self.get_package(exit_txid)?;
let child_txid = child_tx.compute_txid();
package.write().await.child = Some(ChildTransactionInfo {
info: TransactionInfo {
txid: child_txid,
tx: child_tx,
},
origin,
});
self.index.insert(child_txid, Arc::downgrade(&package));
self.status.insert(exit_txid, TxStatus::NotFound);
Ok(child_txid)
}
pub async fn broadcast_package(
&mut self,
package: &ExitTransactionPackage,
) -> Result<TxStatus, ExitError> {
if !self.status.contains_key(&package.exit.txid) {
self.status.insert(package.exit.txid, TxStatus::NotFound);
}
let status = match &package.child {
None => {
trace!("Skipping broadcast of exit package with no CPFP: {}", package.exit.txid);
TxStatus::NotFound
},
Some(child) => {
self.chain_source.broadcast_package(&[
&package.exit.tx, &child.info.tx
]).await
.map_err(|e| ExitError::ExitPackageBroadcastFailure {
txid: package.exit.txid,
error: e.to_string(),
})?;
info!("Successfully broadcast exit package: {}", package.exit.txid);
TxStatus::Mempool
}
};
self.status.insert(package.exit.txid, status);
Ok(status)
}
async fn tip(&self) -> anyhow::Result<BlockHeight, ExitError> {
self.chain_source.tip().await
.map_err(|e| ExitError::TipRetrievalFailure { error: e.to_string() })
}
async fn get_tx_status(&self, txid: Txid) -> anyhow::Result<TxStatus, ExitError> {
self.chain_source.tx_status(txid).await
.map_err(|e| ExitError::TransactionRetrievalFailure { txid, error: e.to_string() })
}
async fn find_child_locally(
&self,
exit_info: &TransactionInfo,
onchain: &dyn ExitUnilaterally,
) -> anyhow::Result<Option<ChildTransactionInfo>, ExitError> {
let wallet = self.find_child_in_wallet(exit_info, onchain).await?;
if wallet.is_some() {
Ok(wallet)
} else {
self.find_child_in_database(exit_info).await
}
}
async fn find_child_in_wallet(
&self,
exit_info: &TransactionInfo,
onchain: &dyn ExitUnilaterally,
) -> anyhow::Result<Option<ChildTransactionInfo>, ExitError> {
trace!("Looking for child in wallet for exit tx {}", exit_info.txid);
let (outpoint, _) = exit_info.tx.fee_anchor()
.ok_or_else(|| ExitError::InternalError { error: format!("Exit tx {} has no P2A output", exit_info.txid) })?;
trace!("Checking wallet for spending tx of {}:{}", outpoint.txid, outpoint.vout);
if let Some(child_tx) = onchain.get_spending_tx(outpoint) {
let child_txid = child_tx.compute_txid();
let block = onchain.get_wallet_tx_confirmed_block(child_txid)
.map_err(|e| ExitError::InvalidWalletState { error: e.to_string() })?;
Ok(Some(ChildTransactionInfo {
info: TransactionInfo {
txid: child_txid,
tx: (*child_tx).clone(),
},
origin: ExitTxOrigin::Wallet { confirmed_in: block},
}))
} else {
Ok(None)
}
}
async fn find_child_in_database(
&self,
exit_info: &TransactionInfo,
) -> Result<Option<ChildTransactionInfo>, ExitError> {
trace!("Looking for child in database for exit tx {}", exit_info.txid);
let result = self.persister.get_exit_child_tx(exit_info.txid).await
.map_err(|e| ExitError::DatabaseChildRetrievalFailure { error: e.to_string() })?;
trace!("Database lookup complete for exit tx {}", exit_info.txid);
if let Some((tx, origin)) = result {
Ok(Some(ChildTransactionInfo {
info: TransactionInfo {
txid: tx.compute_txid(),
tx,
},
origin,
}))
} else {
Ok(None)
}
}
async fn update_child_from_network(
&self,
package: &RwLock<ExitTransactionPackage>,
block_scan_start: BlockHeight,
) -> anyhow::Result<TxStatus, ExitError> {
let outpoint = {
let guard = package.read().await;
let (outpoint, _) = guard.exit.tx.fee_anchor()
.ok_or_else(|| ExitError::MissingAnchorOutput { txid: guard.exit.txid })?;
outpoint
};
let spend_results = self.chain_source
.txs_spending_inputs([outpoint.clone()], block_scan_start)
.await
.map_err(|e| ExitError::TransactionRetrievalFailure {
txid: outpoint.txid, error: e.to_string(),
})?;
debug!("txs_spending_inputs for {}: {:?}", outpoint, spend_results);
if let Some((txid, status)) = spend_results.get(&outpoint) {
let mut guard = package.write().await;
let current_txid = if let Some(child) = guard.child.as_mut() {
if matches!(child.origin, ExitTxOrigin::Wallet { .. }) && child.info.txid == *txid {
trace!("Updating block confirmation for wallet child tx {}: {:?}",
child.info.txid, status.confirmed_in(),
);
child.origin = ExitTxOrigin::Wallet { confirmed_in: status.confirmed_in() };
return Ok(status.clone());
}
Some(child.info.txid)
} else {
None
};
let tx = if current_txid.is_none() || current_txid.is_some_and(|t| t != *txid) {
info!("Downloading child tx {} for exit {}", txid, outpoint.txid);
let tx = self.chain_source.get_tx(txid)
.await
.map_err(|e| ExitError::TransactionRetrievalFailure {
txid: *txid, error: e.to_string(),
})?.expect("Spending transaction should exist");
info!("Successfully downloaded child tx {} for exit {}", txid, outpoint.txid);
tx
} else {
debug!("Skipping download of child txid {} for exit {}", txid, outpoint.txid);
guard.child.as_ref().unwrap().info.tx.clone()
};
let origin = if status.confirmed_in().is_some() {
ExitTxOrigin::Block { confirmed_in: status.confirmed_in().unwrap() }
} else {
debug!("Getting mempool ancestor information for exit {}", txid);
match self.chain_source.mempool_ancestor_info(*txid).await {
Ok(info) => {
let fee_rate = info.effective_fee_rate()
.ok_or_else(|| ExitError::AncestorRetrievalFailure {
txid: *txid,
error: format!("unable to calculate fee rate for {}", txid),
})?;
ExitTxOrigin::Mempool {
fee_rate, total_fee: info.total_fee,
}
},
Err(e) => {
let new_status = self.get_tx_status(*txid).await?;
if let Some(block) = new_status.confirmed_in() {
debug!("Child tx {} was confirmed while querying mempool info", txid);
ExitTxOrigin::Block { confirmed_in: block }
} else {
return Err(ExitError::AncestorRetrievalFailure {
txid: *txid, error: e.to_string(),
});
}
},
}
};
debug!("Storing child tx {} with origin {} in database", txid, origin);
let r = self.persister.store_exit_child_tx(outpoint.txid, &tx, origin).await;
if let Err(e) = r {
error!("Failed to store confirmed exit child transaction: {}", e);
}
guard.child = Some(ChildTransactionInfo {
info: TransactionInfo { txid: *txid, tx },
origin,
});
Ok(status.clone())
} else {
Ok(TxStatus::NotFound)
}
}
}