use crate::error::ErrorContext;
use crate::swap_storage::SwapStorage;
use crate::utils::timeout_op;
use crate::wallet::BoardingWallet;
use crate::wallet::OnchainWallet;
use crate::Blockchain;
use crate::Client;
use crate::Error;
use ark_core::asset::AssetId;
use ark_core::coin_select::select_vtxos;
use ark_core::coin_select::select_vtxos_for_asset;
use ark_core::coin_select::VirtualTxOutPoint;
use ark_core::intent;
use ark_core::script::extract_checksig_pubkeys;
use ark_core::send::build_asset_send_transactions;
use ark_core::send::sign_ark_transaction;
use ark_core::send::sign_checkpoint_transaction;
use ark_core::send::OffchainTransactions;
use ark_core::send::SendReceiver;
use ark_core::send::VtxoInput;
use ark_core::server;
use ark_core::server::PendingTx;
use bitcoin::key::Secp256k1;
use bitcoin::psbt;
use bitcoin::secp256k1;
use bitcoin::secp256k1::schnorr;
use bitcoin::Amount;
use bitcoin::OutPoint;
use bitcoin::TxOut;
use bitcoin::Txid;
use bitcoin::XOnlyPublicKey;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
impl<B, W, S, K> Client<B, W, S, K>
where
B: Blockchain,
W: BoardingWallet + OnchainWallet,
S: SwapStorage + 'static,
K: crate::KeyProvider,
{
/// Sends to `receivers`, letting the client pick which VTXOs to spend.
///
/// Inputs are chosen by `auto_select_send_inputs` and then passed, together with the
/// receivers, to `send_with_selected_inputs`.
///
/// # Errors
///
/// Returns an error if input selection fails or if sending with the selected inputs fails.
pub async fn send(&self, receivers: Vec<SendReceiver>) -> Result<Txid, Error> {
    let vtxo_inputs = self
        .auto_select_send_inputs(&receivers)
        .await
        .context("failed to auto-select send inputs")?;

    Ok(self
        .send_with_selected_inputs(vtxo_inputs, receivers)
        .await
        .context("failed to send with selected inputs")?)
}
/// Sends to `receivers`, spending exactly the VTXOs named by `vtxo_outpoints`.
///
/// The outpoints are resolved into inputs by `resolve_selected_send_inputs`; the
/// result is then passed to `send_with_selected_inputs`.
///
/// # Errors
///
/// Returns an error if the outpoints cannot be resolved or if sending with the
/// resolved inputs fails.
pub async fn send_selection(
    &self,
    vtxo_outpoints: &[OutPoint],
    receivers: Vec<SendReceiver>,
) -> Result<Txid, Error> {
    let vtxo_inputs = self
        .resolve_selected_send_inputs(vtxo_outpoints)
        .await
        .context("failed to resolve selected send inputs")?;

    Ok(self
        .send_with_selected_inputs(vtxo_inputs, receivers)
        .await
        .context("failed to send with selected inputs")?)
}
pub async fn finalize_pending_offchain_tx(&self, ark_txid: Txid) -> Result<(), Error> {
let pending_txs = self.fetch_pending_offchain_txs().await?;
let pending_tx = pending_txs
.into_iter()
.find(|tx| tx.ark_txid == ark_txid)
.ok_or_else(|| {
Error::ad_hoc(format!(
"no pending transaction found for ark txid {ark_txid}"
))
})?;
self.sign_and_finalize_pending_tx(pending_tx).await
}
/// Signs and finalizes every pending offchain transaction, in the order fetched.
///
/// Returns the ark txids that were finalized; the list is empty when nothing is pending.
///
/// # Errors
///
/// Returns an error if fetching fails, or as soon as one transaction fails to finalize.
/// In the latter case the txids finalized before the failure are not reported.
pub async fn continue_pending_offchain_txs(&self) -> Result<Vec<Txid>, Error> {
    let pending_txs = self.fetch_pending_offchain_txs().await?;

    let mut finalized_txids = Vec::with_capacity(pending_txs.len());
    for pending_tx in pending_txs {
        // Copy the id out before `pending_tx` is moved into the finalize call.
        let ark_txid = pending_tx.ark_txid;
        self.sign_and_finalize_pending_tx(pending_tx).await?;
        finalized_txids.push(ark_txid);
    }

    Ok(finalized_txids)
}
/// Returns the pending offchain transactions without signing or finalizing them.
///
/// # Errors
///
/// Returns whatever error `fetch_pending_offchain_txs` produces.
pub async fn list_pending_offchain_txs(&self) -> Result<Vec<PendingTx>, Error> {
    let pending_txs = self.fetch_pending_offchain_txs().await?;
    Ok(pending_txs)
}
/// Builds and submits an offchain transaction paying `amount` to `address` from
/// `vtxo_inputs`, and returns its ark txid.
///
/// The single receiver carries no assets. Unlike `send_with_selected_inputs`, this
/// method does not call `sign_and_finalize_pending_tx` after submitting.
///
/// # Errors
///
/// Returns an error if the server info is unavailable or if building/submitting fails.
pub async fn submit_offchain_tx(
    &self,
    vtxo_inputs: Vec<VtxoInput>,
    address: ark_core::ArkAddress,
    amount: Amount,
) -> Result<Txid, Error> {
    let server_info = self.server_info()?;

    let receiver = SendReceiver {
        address,
        amount,
        assets: Vec::new(),
    };

    let submitted = self
        .build_and_submit(vtxo_inputs, vec![receiver], &server_info)
        .await?;

    Ok(submitted.ark_txid)
}
/// Builds the signing callback passed to the ark/checkpoint transaction signers.
///
/// For a given PSBT input and sighash message, the callback extracts the checksig
/// public keys from the input's witness script and produces one Schnorr signature
/// for every key that `keypair_by_pk` can resolve. Keys that cannot be resolved are
/// skipped, so the returned list may be empty.
///
/// The secp256k1 context is created once here and moved into the closure, rather
/// than being rebuilt on every invocation of the callback.
///
/// # Errors
///
/// The callback fails if the PSBT input has no witness script.
fn make_sign_fn(
    &self,
) -> impl FnMut(
    &mut psbt::Input,
    secp256k1::Message,
) -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error>
       + '_ {
    let secp = Secp256k1::new();
    move |input, msg| {
        let script = input
            .witness_script
            .as_ref()
            .ok_or_else(|| ark_core::Error::ad_hoc("Missing witness script for psbt::Input"))?;
        let pks = extract_checksig_pubkeys(script);
        let mut sigs = vec![];
        for pk in pks {
            // Only sign for keys this client can resolve; others are skipped.
            if let Ok(keypair) = self.keypair_by_pk(&pk) {
                let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
                sigs.push((sig, keypair.x_only_public_key().0));
            }
        }
        Ok(sigs)
    }
}
/// Chooses spendable VTXOs covering the BTC and asset amounts requested by `receivers`.
///
/// Selection runs in two passes over the currently spendable VTXOs:
///
/// 1. For every requested asset amount, leftover "change" of that asset from coins
///    already selected is consumed first; any remainder is covered with
///    `select_vtxos_for_asset`. The BTC value of those coins is counted as provided.
/// 2. If asset change is outstanding after pass 1, `server_info.dust` is added to the
///    BTC requirement. Any remaining BTC shortfall is covered with `select_vtxos`
///    from coins not selected yet.
///
/// # Errors
///
/// Returns an error if the VTXO list, current time or server info cannot be obtained,
/// if either coin-selection step fails, or if building the inputs fails.
async fn auto_select_send_inputs(
    &self,
    receivers: &[SendReceiver],
) -> Result<Vec<VtxoInput>, Error> {
    let (vtxo_list, script_pubkey_to_vtxo_map) = self
        .list_vtxos()
        .await
        .context("failed to get spendable VTXOs")?;
    let now = crate::utils::unix_now()?;
    let server_info = self.server_info()?;
    let spendable = vtxo_list
        .spendable_offchain_at(&server_info, now, |script| {
            script_pubkey_to_vtxo_map
                .get(script)
                .map(|vtxo| vtxo.server_pk())
        })
        .map(|vtxo| VirtualTxOutPoint {
            outpoint: vtxo.outpoint,
            script_pubkey: vtxo.script.clone(),
            expire_at: vtxo.expires_at,
            amount: vtxo.amount,
            assets: vtxo.assets.clone(),
        })
        .collect::<Vec<_>>();

    // Outpoints chosen so far; used to make sure no coin is selected twice.
    let mut selected_outpoints = HashSet::new();
    let mut selected = Vec::new();
    // Asset amounts carried by selected coins that are not yet allocated to a receiver.
    let mut asset_changes: HashMap<AssetId, u64> = HashMap::new();
    let mut btc_needed = Amount::ZERO;
    let mut btc_provided = Amount::ZERO;

    for receiver in receivers {
        btc_needed += receiver.amount;
        for asset in &receiver.assets {
            let mut amount_to_select = asset.amount;

            // Use up change of this asset left over from earlier selections first.
            if let Some(existing_change) = asset_changes.get_mut(&asset.asset_id) {
                if amount_to_select <= *existing_change {
                    *existing_change -= amount_to_select;
                    if *existing_change == 0 {
                        asset_changes.remove(&asset.asset_id);
                    }
                    continue;
                }
                amount_to_select -= *existing_change;
                asset_changes.remove(&asset.asset_id);
            }

            let available: Vec<_> = spendable
                .iter()
                .filter(|v| !selected_outpoints.contains(&v.outpoint))
                .cloned()
                .collect();
            let (asset_coins, asset_change) =
                select_vtxos_for_asset(&available, amount_to_select, asset.asset_id)
                    .map_err(Error::from)
                    .context("failed to select coins for asset transfer")?;

            for coin in &asset_coins {
                if selected_outpoints.insert(coin.outpoint) {
                    btc_provided += coin.amount;
                    // Other assets riding on this coin become change. The requested
                    // asset's own change is added once below from `asset_change`.
                    for carried_asset in &coin.assets {
                        if carried_asset.asset_id != asset.asset_id {
                            *asset_changes.entry(carried_asset.asset_id).or_insert(0) +=
                                carried_asset.amount;
                        }
                    }
                    selected.push(coin.clone());
                }
            }
            if asset_change > 0 {
                *asset_changes.entry(asset.asset_id).or_insert(0) += asset_change;
            }
        }
    }

    // Outstanding asset change adds `dust` to the BTC requirement.
    if !asset_changes.is_empty() {
        btc_needed += server_info.dust;
    }

    let btc_shortfall = btc_needed.checked_sub(btc_provided).unwrap_or(Amount::ZERO);
    if btc_shortfall > Amount::ZERO {
        let available: Vec<_> = spendable
            .iter()
            .filter(|v| !selected_outpoints.contains(&v.outpoint))
            .cloned()
            .collect();
        let btc_coins = select_vtxos(available, btc_shortfall, server_info.dust, true)
            .map_err(Error::from)
            .context("failed to select BTC coins for asset transfer")?;
        for coin in &btc_coins {
            if selected_outpoints.insert(coin.outpoint) {
                // NOTE(review): assets carried by these coins are recorded as change only
                // after the dust decision above, and `asset_changes` is not read again in
                // this function. Confirm whether a dust re-check is intended here.
                for carried_asset in &coin.assets {
                    *asset_changes.entry(carried_asset.asset_id).or_insert(0) +=
                        carried_asset.amount;
                }
                selected.push(coin.clone());
            }
        }
    }

    // `selected` is not used afterwards, so it is moved rather than cloned.
    self.build_vtxo_inputs(selected, &script_pubkey_to_vtxo_map)
}
/// Resolves caller-chosen `vtxo_outpoints` into inputs ready for spending.
///
/// The outpoints are de-duplicated, and each distinct one must show up among the
/// VTXOs that are currently spendable offchain.
///
/// # Errors
///
/// Returns an error if the VTXO list, current time or server info cannot be obtained,
/// if none of the requested outpoints is spendable, if only some of them are (the
/// missing ones are listed in the message), or if building the inputs fails.
async fn resolve_selected_send_inputs(
    &self,
    vtxo_outpoints: &[OutPoint],
) -> Result<Vec<VtxoInput>, Error> {
    let wanted: HashSet<OutPoint> = vtxo_outpoints.iter().copied().collect();

    let (vtxo_list, script_pubkey_to_vtxo_map) = self
        .list_vtxos_for_outpoints(vtxo_outpoints.to_vec())
        .await
        .context("failed to get VTXO list")?;
    let now = crate::utils::unix_now()?;
    let server_info = self.server_info()?;

    // Keep only the spendable VTXOs the caller asked for.
    let mut selected = Vec::new();
    for vtxo in vtxo_list.spendable_offchain_at(&server_info, now, |script| {
        script_pubkey_to_vtxo_map
            .get(script)
            .map(|vtxo| vtxo.server_pk())
    }) {
        if !wanted.contains(&vtxo.outpoint) {
            continue;
        }
        selected.push(VirtualTxOutPoint {
            outpoint: vtxo.outpoint,
            script_pubkey: vtxo.script.clone(),
            expire_at: vtxo.expires_at,
            amount: vtxo.amount,
            assets: vtxo.assets.clone(),
        });
    }

    if selected.is_empty() {
        return Err(Error::ad_hoc("no matching VTXO outpoints found"));
    }

    if selected.len() != wanted.len() {
        let found: HashSet<OutPoint> = selected.iter().map(|v| v.outpoint).collect();
        let missing: Vec<String> = wanted
            .difference(&found)
            .map(ToString::to_string)
            .collect();
        return Err(Error::ad_hoc(format!(
            "some selected VTXO outpoints were not found or not spendable: {}",
            missing.join(", ")
        )));
    }

    self.build_vtxo_inputs(selected, &script_pubkey_to_vtxo_map)
}
/// Converts selected virtual outpoints into [`VtxoInput`]s.
///
/// Each outpoint's script pubkey is looked up in `script_pubkey_to_vtxo_map`. The
/// matching VTXO supplies the forfeit spend info, tapscripts and script pubkey, while
/// amount, outpoint and assets come from the selected entry.
///
/// # Errors
///
/// Returns an error on the first entry whose script pubkey is not in the map or whose
/// forfeit spend info cannot be obtained.
pub(crate) fn build_vtxo_inputs(
    &self,
    selected: Vec<VirtualTxOutPoint>,
    script_pubkey_to_vtxo_map: &HashMap<bitcoin::ScriptBuf, ark_core::Vtxo>,
) -> Result<Vec<VtxoInput>, Error> {
    let mut inputs = Vec::with_capacity(selected.len());

    for candidate in selected {
        let vtxo = script_pubkey_to_vtxo_map
            .get(&candidate.script_pubkey)
            .ok_or_else(|| {
                ark_core::Error::ad_hoc(format!(
                    "missing VTXO for script pubkey: {}",
                    candidate.script_pubkey
                ))
            })?;

        let (forfeit_script, control_block) = vtxo
            .forfeit_spend_info()
            .context("failed to get forfeit spend info")?;

        inputs.push(VtxoInput::new(
            forfeit_script,
            None,
            control_block,
            vtxo.tapscripts(),
            vtxo.script_pubkey(),
            candidate.amount,
            candidate.outpoint,
            candidate.assets,
        ));
    }

    Ok(inputs)
}
fn validate_selected_inputs_cover_receivers(
vtxo_inputs: &[VtxoInput],
receivers: &[SendReceiver],
dust: Amount,
) -> Result<(), Error> {
let selected_amount = vtxo_inputs
.iter()
.fold(Amount::ZERO, |acc, v| acc + v.amount());
let requested_amount = receivers.iter().fold(Amount::ZERO, |acc, r| acc + r.amount);
let mut selected_assets = HashMap::<AssetId, u64>::new();
for vtxo_input in vtxo_inputs {
for asset in vtxo_input.assets() {
*selected_assets.entry(asset.asset_id).or_insert(0) = selected_assets
.get(&asset.asset_id)
.copied()
.unwrap_or(0)
.checked_add(asset.amount)
.ok_or_else(|| Error::ad_hoc("selected asset amount overflow"))?;
}
}
let mut requested_assets = HashMap::<AssetId, u64>::new();
for receiver in receivers {
for asset in &receiver.assets {
*requested_assets.entry(asset.asset_id).or_insert(0) = requested_assets
.get(&asset.asset_id)
.copied()
.unwrap_or(0)
.checked_add(asset.amount)
.ok_or_else(|| Error::ad_hoc("requested asset amount overflow"))?;
}
}
for (asset_id, requested_asset_amount) in &requested_assets {
let selected_asset_amount = selected_assets.get(asset_id).copied().unwrap_or(0);
if selected_asset_amount < *requested_asset_amount {
return Err(Error::coin_select(format!(
"insufficient asset amount for {}: {} < {}",
asset_id, selected_asset_amount, requested_asset_amount
)));
}
}
let mut has_asset_change = false;
for (asset_id, selected_asset_amount) in &selected_assets {
let requested_asset_amount = requested_assets.get(asset_id).copied().unwrap_or(0);
if *selected_asset_amount < requested_asset_amount {
return Err(Error::coin_select(format!(
"insufficient asset amount for {}: {} < {}",
asset_id, selected_asset_amount, requested_asset_amount
)));
}
if *selected_asset_amount > requested_asset_amount {
has_asset_change = true;
}
}
let required_amount = match has_asset_change {
true => requested_amount
.checked_add(dust)
.ok_or_else(|| Error::ad_hoc("required BTC amount overflow"))?,
false => requested_amount,
};
if selected_amount < required_amount {
return Err(Error::coin_select(format!(
"insufficient VTXO amount: {} < {}",
selected_amount, required_amount
)));
}
Ok(())
}
/// Validates, submits, signs and finalizes a send using the given inputs.
///
/// The inputs are first checked against the receivers with
/// `validate_selected_inputs_cover_receivers` (using the server's dust amount). The
/// transaction is then built and submitted, and finally signed and finalized.
///
/// # Errors
///
/// Returns an error if the server info is unavailable or if any of the steps fails.
async fn send_with_selected_inputs(
    &self,
    vtxo_inputs: Vec<VtxoInput>,
    receivers: Vec<SendReceiver>,
) -> Result<Txid, Error> {
    let server_info = self.server_info()?;

    Self::validate_selected_inputs_cover_receivers(&vtxo_inputs, &receivers, server_info.dust)?;

    let submitted = self
        .build_and_submit(vtxo_inputs, receivers, &server_info)
        .await?;

    // Copy the id out before `submitted` is moved into the finalize call.
    let txid = submitted.ark_txid;
    self.sign_and_finalize_pending_tx(submitted)
        .await
        .map(|()| txid)
}
/// Signs an already-built ark transaction and submits it with its checkpoint
/// transactions to the server.
///
/// `sign_ark_transaction` is called once per checkpoint transaction, with the
/// checkpoint's position as the ark input index. After a successful submission
/// `used_pk` is marked as used with the key provider; a failure to do so is only
/// logged as a warning.
///
/// # Errors
///
/// Returns an error if signing fails or if the server rejects the submission.
pub(crate) async fn submit_built_offchain_send(
    &self,
    mut ark_tx: bitcoin::Psbt,
    checkpoint_txs: Vec<bitcoin::Psbt>,
    used_pk: XOnlyPublicKey,
) -> Result<PendingTx, Error> {
    let num_checkpoints = checkpoint_txs.len();
    for input_index in 0..num_checkpoints {
        sign_ark_transaction(self.make_sign_fn(), &mut ark_tx, input_index)?;
    }

    let response = self
        .network_client()
        .submit_offchain_transaction_request(ark_tx, checkpoint_txs)
        .await
        .map_err(Error::ark_server)
        .context("failed to submit offchain transaction request")?;

    // The txid is derived from the server-returned ark transaction.
    let ark_txid = response.signed_ark_tx.unsigned_tx.compute_txid();
    let pending_tx = PendingTx {
        ark_txid,
        signed_ark_tx: response.signed_ark_tx,
        signed_checkpoint_txs: response.signed_checkpoint_txs,
    };

    // Best effort: the submission already succeeded, so a cache failure is not fatal.
    if let Err(cache_err) = self.inner.key_provider.mark_as_used(&used_pk) {
        tracing::warn!(
            "Failed updating keypair cache for used change address: {:?}",
            cache_err
        );
    }

    Ok(pending_tx)
}
async fn build_and_submit(
&self,
inputs: Vec<VtxoInput>,
receivers: Vec<SendReceiver>,
server_info: &server::Info,
) -> Result<PendingTx, Error> {
let (change_address, change_address_vtxo) = self.get_offchain_address()?;
let OffchainTransactions {
ark_tx,
checkpoint_txs,
} = build_asset_send_transactions(&receivers, &change_address, &inputs, server_info)
.map_err(Error::from)
.context("failed to build offchain asset-send transactions")?;
self.submit_built_offchain_send(ark_tx, checkpoint_txs, change_address_vtxo.owner_pk())
.await
}
pub(crate) async fn sign_and_finalize_pending_tx(
&self,
pending_tx: PendingTx,
) -> Result<(), Error> {
let ark_txid = pending_tx.ark_txid;
let mut signed_checkpoint_txs = pending_tx.signed_checkpoint_txs;
let ark_input_idx_by_cp_txid: HashMap<_, _> = pending_tx
.signed_ark_tx
.unsigned_tx
.input
.iter()
.enumerate()
.map(|(i, inp)| (inp.previous_output.txid, i))
.collect();
for checkpoint_psbt in signed_checkpoint_txs.iter_mut() {
if checkpoint_psbt.inputs[0].witness_script.is_none() {
let checkpoint_txid = checkpoint_psbt.unsigned_tx.compute_txid();
let idx = ark_input_idx_by_cp_txid
.get(&checkpoint_txid)
.ok_or_else(|| {
Error::ad_hoc(format!(
"checkpoint txid {checkpoint_txid} not found in ark tx inputs \
for pending tx {ark_txid}"
))
})?;
let ws = pending_tx
.signed_ark_tx
.inputs
.get(*idx)
.and_then(|input| input.witness_script.clone())
.ok_or_else(|| {
Error::ad_hoc(format!(
"missing witness script on ark tx input {idx} \
for pending tx {ark_txid}"
))
})?;
checkpoint_psbt.inputs[0].witness_script = Some(ws);
}
sign_checkpoint_transaction(self.make_sign_fn(), checkpoint_psbt)?;
}
self.finalize_offchain_tx(ark_txid, signed_checkpoint_txs)
.await
}
pub async fn finalize_offchain_tx(
&self,
ark_txid: Txid,
signed_checkpoint_txs: Vec<bitcoin::Psbt>,
) -> Result<(), Error> {
const MAX_RETRIES: usize = 3;
let mut last_err = None;
for attempt in 0..=MAX_RETRIES {
if attempt > 0 {
let delay = Duration::from_millis(500 * (1 << (attempt - 1)));
tracing::warn!(
%ark_txid,
attempt,
?delay,
"Retrying finalize after transient failure"
);
crate::utils::sleep(delay).await;
}
match timeout_op(
self.inner.timeout,
self.network_client()
.finalize_offchain_transaction(ark_txid, signed_checkpoint_txs.clone()),
)
.await
.context("finalize offchain transaction timed out")?
{
Ok(_) => return Ok(()),
Err(e) => {
last_err = Some(Error::ark_server(e));
}
}
}
Err(last_err
.expect("at least one attempt was made")
.with_context(|| {
format!("failed to finalize offchain transaction after {MAX_RETRIES} retries")
}))
}
/// Fetches the pending offchain transactions the server reports for this client's
/// offchain addresses.
///
/// Pending VTXOs are fetched for all offchain addresses and split into batches of at
/// most `MAX_INPUTS_PER_INTENT`. For each batch a signed get-pending-tx intent is
/// built and sent to the server. The returned transactions are de-duplicated by ark
/// txid across batches.
///
/// # Errors
///
/// Returns an error if the addresses or VTXOs cannot be fetched, if forfeit spend
/// info is unavailable for a VTXO, if building the intent fails, or if the server
/// request fails.
async fn fetch_pending_offchain_txs(&self) -> Result<Vec<PendingTx>, Error> {
// Upper bound on the number of VTXO inputs put into a single intent.
const MAX_INPUTS_PER_INTENT: usize = 20;
let ark_addresses = self.get_offchain_addresses()?;
// Lets each fetched VTXO be matched back to the local VTXO it belongs to.
let script_pubkey_to_vtxo_map: HashMap<_, _> = ark_addresses
.iter()
.map(|(a, v)| (a.to_p2tr_script_pubkey(), v.clone()))
.collect();
let addresses = ark_addresses.iter().map(|(a, _)| *a);
let request = server::GetVtxosRequest::new_for_addresses(addresses)
.pending_only()
.map_err(Error::from)?;
let vtxos = self
.fetch_all_vtxos(request)
.await
.context("failed to fetch pending VTXOs")?;
tracing::debug!(num_pending_vtxos = vtxos.len(), "Fetched pending VTXOs");
if vtxos.is_empty() {
return Ok(vec![]);
}
// Created once and shared by the signing closure of every batch.
let secp = Secp256k1::new();
let mut all_pending_txs = Vec::new();
// Ark txids already collected, used to drop duplicates across batches.
let mut seen_ark_txids = HashSet::new();
for (batch_idx, batch) in vtxos.chunks(MAX_INPUTS_PER_INTENT).enumerate() {
let mut vtxo_inputs = Vec::new();
for virtual_tx_outpoint in batch {
// VTXOs whose script matches none of our addresses are logged and skipped.
let vtxo = match script_pubkey_to_vtxo_map.get(&virtual_tx_outpoint.script) {
Some(v) => v,
None => {
tracing::warn!(
outpoint = %virtual_tx_outpoint.outpoint,
script = %virtual_tx_outpoint.script,
"Skipping VTXO with unknown script"
);
continue;
}
};
let spend_info = vtxo
.forfeit_spend_info()
.context("failed to get forfeit spend info")?;
vtxo_inputs.push(intent::Input::new(
virtual_tx_outpoint.outpoint,
vtxo.exit_delay(),
None,
TxOut {
value: virtual_tx_outpoint.amount,
script_pubkey: vtxo.script_pubkey(),
},
vtxo.tapscripts(),
spend_info,
false,
virtual_tx_outpoint.is_swept,
virtual_tx_outpoint.assets.clone(),
));
}
// Every VTXO of this batch was skipped, so there is nothing to ask the server.
if vtxo_inputs.is_empty() {
continue;
}
tracing::debug!(
batch = batch_idx,
num_inputs = vtxo_inputs.len(),
"Querying server for pending txs"
);
// NOTE(review): `expire_at` is hard-coded to 0; its meaning to the server is not
// visible from this file — confirm.
let message = intent::IntentMessage::GetPendingTx { expire_at: 0 };
// Signs with every key from the input's witness script that `keypair_by_pk` can
// resolve; keys that cannot be resolved are skipped.
let sign_for_vtxo_fn = |input: &mut psbt::Input,
msg: secp256k1::Message|
-> Result<
Vec<(schnorr::Signature, XOnlyPublicKey)>,
ark_core::Error,
> {
match &input.witness_script {
None => Err(ark_core::Error::ad_hoc(
"Missing witness script in psbt::Input when signing get-pending-tx intent",
)),
Some(script) => {
let pks = extract_checksig_pubkeys(script);
let mut res = vec![];
for pk in &pks {
if let Ok(keypair) = self.keypair_by_pk(pk) {
let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
res.push((sig, keypair.x_only_public_key().0));
}
}
Ok(res)
}
}
};
// Always fails. The empty `vec![]` passed to `make_intent` below is presumably
// the list of onchain inputs, so this should never be invoked — confirm.
let sign_for_onchain_fn =
|_: &mut psbt::Input,
_: secp256k1::Message|
-> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
Err(ark_core::Error::ad_hoc(
"unexpected onchain input in get-pending-tx intent",
))
};
let get_pending_intent = intent::make_intent(
sign_for_vtxo_fn,
sign_for_onchain_fn,
vtxo_inputs,
vec![],
message,
)?;
let pending_txs = self
.network_client()
.get_pending_tx(get_pending_intent)
.await
.map_err(Error::ark_server)
.context("failed to get pending transactions")?;
tracing::debug!(
batch = batch_idx,
num_pending_txs = pending_txs.len(),
"Server response for batch"
);
for tx in pending_txs {
// `insert` returns false for an ark txid already seen in an earlier batch.
if seen_ark_txids.insert(tx.ark_txid) {
tracing::info!(
ark_txid = %tx.ark_txid,
"Found pending transaction"
);
all_pending_txs.push(tx);
}
}
}
tracing::info!(
num_pending_txs = all_pending_txs.len(),
"Total pending transactions found"
);
Ok(all_pending_txs)
}
}