use crate::error::ErrorContext;
use crate::key_provider::KeypairIndex;
use crate::utils::sleep;
use crate::utils::timeout_op;
use crate::utils::unix_now;
use crate::wallet::BoardingWallet;
use crate::wallet::OnchainWallet;
use ark_core::asset::AssetId;
use ark_core::build_anchor_tx;
use ark_core::history;
use ark_core::history::generate_incoming_vtxo_transaction_history;
use ark_core::history::generate_outgoing_vtxo_transaction_history;
use ark_core::history::sort_transactions_by_created_at;
use ark_core::history::OutgoingTransaction;
use ark_core::server;
use ark_core::server::GetVtxosRequest;
use ark_core::server::SubscriptionResponse;
use ark_core::server::VirtualTxOutPoint;
use ark_core::ArkAddress;
use ark_core::BoardingOutput;
use ark_core::ExplorerUtxo;
use ark_core::UtxoCoinSelection;
use ark_core::Vtxo;
use ark_core::VtxoList;
use ark_core::DEFAULT_DERIVATION_PATH;
use ark_grpc::VtxoChainResponse;
use bitcoin::bip32::DerivationPath;
use bitcoin::bip32::Xpriv;
use bitcoin::key::Keypair;
use bitcoin::key::Secp256k1;
use bitcoin::secp256k1::All;
use bitcoin::Address;
use bitcoin::Amount;
use bitcoin::OutPoint;
use bitcoin::ScriptBuf;
use bitcoin::Transaction;
use bitcoin::Txid;
use bitcoin::XOnlyPublicKey;
use futures::Future;
use futures::Stream;
use std::collections::HashMap;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
pub mod error;
pub mod key_provider;
pub mod swap_storage;
pub mod vtxo_watcher;
pub mod wallet;
mod asset;
mod batch;
mod boltz;
mod coin_select;
mod fee_estimation;
mod migration;
mod send_vtxo;
mod unilateral_exit;
mod utils;
pub use ark_core::server::DeprecatedSignerStatus;
pub use asset::IssueAssetResult;
pub use boltz::ChainSwapAmount;
pub use boltz::ChainSwapData;
pub use boltz::ChainSwapDirection;
pub use boltz::ChainSwapResult;
pub use boltz::PendingVhtlcSpendTx;
pub use boltz::PendingVhtlcSpendType;
pub use boltz::ReverseSwapData;
pub use boltz::SubmarineSwapData;
pub use boltz::SwapAmount;
pub use boltz::SwapStatus;
pub use boltz::SwapStatusInfo;
pub use boltz::SwapType;
pub use boltz::TimeoutBlockHeights;
pub use error::Error;
pub use key_provider::Bip32KeyProvider;
pub use key_provider::KeyProvider;
pub use key_provider::StaticKeyProvider;
pub use lightning_invoice;
pub use migration::DeprecatedSignerMigrationReport;
pub use migration::DeprecatedSignerReport;
pub use migration::MigrationLegReport;
pub use migration::MigrationSkipReason;
pub use migration::MigrationVtxoRef;
pub use migration::MAX_VTXOS_PER_SETTLEMENT;
pub use swap_storage::InMemorySwapStorage;
#[cfg(feature = "sqlite")]
pub use swap_storage::SqliteSwapStorage;
pub use swap_storage::SwapStorage;
pub const DEFAULT_GAP_LIMIT: u32 = 20;
pub const DEFAULT_BOLTZ_REFERRAL_ID: &str = "arkade-rs-SDK";
#[derive(Clone)]
pub struct OfflineClient<B, W, S, K> {
network_client: ark_grpc::Client,
pub name: String,
key_provider: Arc<K>,
blockchain: Arc<B>,
secp: Secp256k1<All>,
wallet: Arc<W>,
swap_storage: Arc<S>,
boltz_url: String,
boltz_referral_id: Option<String>,
timeout: Duration,
delegator_pk: Option<XOnlyPublicKey>,
historical_delegator_pks: Vec<XOnlyPublicKey>,
}
pub struct Client<B, W, S, K> {
inner: OfflineClient<B, W, S, K>,
state: Arc<RwLock<ServerState>>,
}
struct ServerState {
server_info: server::Info,
fee_estimator: ark_fees::Estimator,
}
#[derive(Clone, Copy, Debug)]
pub struct TxStatus {
pub confirmed_at: Option<i64>,
}
#[derive(Clone, Copy, Debug)]
pub struct SpendStatus {
pub spend_txid: Option<Txid>,
}
pub struct AddressVtxos {
pub unspent: Vec<VirtualTxOutPoint>,
pub spent: Vec<VirtualTxOutPoint>,
}
#[derive(Clone, Debug, Default)]
pub struct OffChainBalance {
pre_confirmed: Amount,
confirmed: Amount,
recoverable: Amount,
pending_recovery: Amount,
asset_balances: HashMap<AssetId, u64>,
}
impl OffChainBalance {
pub fn pre_confirmed(&self) -> Amount {
self.pre_confirmed
}
pub fn confirmed(&self) -> Amount {
self.confirmed
}
pub fn recoverable(&self) -> Amount {
self.recoverable
}
pub fn pending_recovery(&self) -> Amount {
self.pending_recovery
}
pub fn total(&self) -> Amount {
self.pre_confirmed + self.confirmed + self.recoverable + self.pending_recovery
}
pub fn asset_balances(&self) -> &HashMap<AssetId, u64> {
&self.asset_balances
}
}
pub trait Blockchain {
fn find_outpoints(
&self,
address: &Address,
) -> impl Future<Output = Result<Vec<ExplorerUtxo>, Error>> + Send;
fn find_tx(
&self,
txid: &Txid,
) -> impl Future<Output = Result<Option<Transaction>, Error>> + Send;
fn get_tx_status(&self, txid: &Txid) -> impl Future<Output = Result<TxStatus, Error>> + Send;
fn get_output_status(
&self,
txid: &Txid,
vout: u32,
) -> impl Future<Output = Result<SpendStatus, Error>> + Send;
fn broadcast(&self, tx: &Transaction) -> impl Future<Output = Result<(), Error>> + Send;
fn get_fee_rate(&self) -> impl Future<Output = Result<f64, Error>> + Send;
fn broadcast_package(
&self,
txs: &[&Transaction],
) -> impl Future<Output = Result<(), Error>> + Send;
}
impl<B, W, S, K> OfflineClient<B, W, S, K>
where
B: Blockchain,
W: BoardingWallet + OnchainWallet,
S: SwapStorage + 'static,
K: KeyProvider,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
name: String,
key_provider: Arc<K>,
blockchain: Arc<B>,
wallet: Arc<W>,
ark_server_url: String,
swap_storage: Arc<S>,
boltz_url: String,
boltz_referral_id: Option<String>,
timeout: Duration,
delegator_pk: Option<XOnlyPublicKey>,
historical_delegator_pks: Vec<XOnlyPublicKey>,
) -> Self {
let secp = Secp256k1::new();
let network_client = ark_grpc::Client::new(ark_server_url);
let mut seen = HashSet::new();
let mut historical_delegator_pks: Vec<_> = historical_delegator_pks
.into_iter()
.filter(|pk| seen.insert(*pk))
.collect();
if let Some(pk) = delegator_pk {
historical_delegator_pks.retain(|k| *k != pk);
historical_delegator_pks.insert(0, pk);
}
let boltz_referral_id =
boltz_referral_id.or_else(|| Some(DEFAULT_BOLTZ_REFERRAL_ID.to_string()));
Self {
network_client,
name,
key_provider,
blockchain,
secp,
wallet,
swap_storage,
boltz_url,
boltz_referral_id,
timeout,
delegator_pk,
historical_delegator_pks,
}
}
pub fn with_boltz_referral_id(mut self, boltz_referral_id: Option<String>) -> Self {
self.boltz_referral_id = boltz_referral_id;
self
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_keypair(
name: String,
kp: Keypair,
blockchain: Arc<B>,
wallet: Arc<W>,
ark_server_url: String,
swap_storage: Arc<S>,
boltz_url: String,
boltz_referral_id: Option<String>,
timeout: Duration,
delegator_pk: Option<XOnlyPublicKey>,
historical_delegator_pks: Vec<XOnlyPublicKey>,
) -> OfflineClient<B, W, S, StaticKeyProvider> {
let key_provider = Arc::new(StaticKeyProvider::new(kp));
OfflineClient::new(
name,
key_provider,
blockchain,
wallet,
ark_server_url,
swap_storage,
boltz_url,
boltz_referral_id,
timeout,
delegator_pk,
historical_delegator_pks,
)
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_bip32(
name: String,
xpriv: Xpriv,
path: Option<DerivationPath>,
blockchain: Arc<B>,
wallet: Arc<W>,
ark_server_url: String,
swap_storage: Arc<S>,
boltz_url: String,
boltz_referral_id: Option<String>,
timeout: Duration,
delegator_pk: Option<XOnlyPublicKey>,
historical_delegator_pks: Vec<XOnlyPublicKey>,
) -> OfflineClient<B, W, S, Bip32KeyProvider> {
let path = path.unwrap_or(
DerivationPath::from_str(DEFAULT_DERIVATION_PATH).expect("valid derivation path"),
);
let key_provider = Arc::new(Bip32KeyProvider::new(xpriv, path));
OfflineClient::new(
name,
key_provider,
blockchain,
wallet,
ark_server_url,
swap_storage,
boltz_url,
boltz_referral_id,
timeout,
delegator_pk,
historical_delegator_pks,
)
}
pub fn delegator_pk(&self) -> Option<XOnlyPublicKey> {
self.delegator_pk
}
pub fn boltz_referral_id(&self) -> Option<&str> {
self.boltz_referral_id.as_deref()
}
pub async fn connect(mut self) -> Result<Client<B, W, S, K>, Error> {
timeout_op(self.timeout, self.network_client.connect())
.await
.context("Failed to connect to Ark server")??;
self.finish_connect().await
}
pub async fn connect_with_retries(
mut self,
max_retries: usize,
) -> Result<Client<B, W, S, K>, Error> {
let mut n_retries = 0;
while n_retries < max_retries {
let res = timeout_op(self.timeout, self.network_client.connect())
.await
.context("Failed to connect to Ark server")?;
match res {
Ok(()) => break,
Err(error) => {
tracing::warn!(?error, "Failed to connect to Ark server, retrying");
sleep(Duration::from_secs(2)).await;
n_retries += 1;
continue;
}
};
}
self.finish_connect().await
}
async fn finish_connect(mut self) -> Result<Client<B, W, S, K>, Error> {
let server_info = timeout_op(self.timeout, self.network_client.get_info())
.await
.context("Failed to get Ark server info")??;
tracing::debug!(
name = self.name,
ark_server_url = ?self.network_client,
"Connected to Ark server"
);
let fee_estimator = build_fee_estimator(&server_info)?;
let state = Arc::new(RwLock::new(ServerState {
server_info,
fee_estimator,
}));
let hook_state = state.clone();
self.network_client
.set_info_refresh_hook(move |server_info| {
update_server_state(&hook_state, server_info)
.map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
});
let client = Client { inner: self, state };
if let Err(error) = client.discover_keys(DEFAULT_GAP_LIMIT).await {
tracing::warn!(?error, "Failed during key discovery");
};
match client.server_info() {
Ok(server_info) => {
if let Err(error) = client.persist_watch_boarding_outputs(&server_info) {
tracing::warn!(?error, "Failed to persist boarding outputs at connect");
}
}
Err(error) => {
tracing::warn!(
?error,
"Failed to read server info for boarding persistence"
);
}
}
Ok(client)
}
}
fn build_fee_estimator(server_info: &server::Info) -> Result<ark_fees::Estimator, Error> {
let fee_estimator_config = server_info
.fees
.clone()
.map(|fees| ark_fees::Config {
intent_offchain_input_program: fees.intent_fee.offchain_input.unwrap_or_default(),
intent_onchain_input_program: fees.intent_fee.onchain_input.unwrap_or_default(),
intent_offchain_output_program: fees.intent_fee.offchain_output.unwrap_or_default(),
intent_onchain_output_program: fees.intent_fee.onchain_output.unwrap_or_default(),
})
.unwrap_or_default();
ark_fees::Estimator::new(fee_estimator_config).map_err(Error::ark_server)
}
fn update_server_state(
state: &Arc<RwLock<ServerState>>,
server_info: server::Info,
) -> Result<(), Error> {
let fee_estimator = build_fee_estimator(&server_info)?;
let mut state = state
.write()
.map_err(|_| Error::ad_hoc("client server state lock poisoned"))?;
state.server_info = server_info;
state.fee_estimator = fee_estimator;
Ok(())
}
impl<B, W, S, K> Client<B, W, S, K>
where
B: Blockchain,
W: BoardingWallet + OnchainWallet,
S: SwapStorage + 'static,
K: KeyProvider,
{
pub fn server_info(&self) -> Result<server::Info, Error> {
self.state
.read()
.map(|state| state.server_info.clone())
.map_err(|_| Error::ad_hoc("client server state lock poisoned"))
}
fn with_server_state<T>(&self, f: impl FnOnce(&ServerState) -> T) -> Result<T, Error> {
self.state
.read()
.map(|state| f(&state))
.map_err(|_| Error::ad_hoc("client server state lock poisoned"))
}
fn eval_onchain_output_fee(&self, output: ark_fees::Output) -> Result<Amount, Error> {
self.with_server_state(|state| state.fee_estimator.eval_onchain_output(output))?
.map(|fee| Amount::from_sat(fee.to_satoshis()))
.map_err(Error::ad_hoc)
}
pub async fn refresh_server_info(&self) -> Result<(), Error> {
let server_info = timeout_op(self.inner.timeout, self.network_client().get_info())
.await
.context("Failed to refresh Ark server info")??;
update_server_state(&self.state, server_info)?;
Ok(())
}
pub fn delegator_pk(&self) -> Option<XOnlyPublicKey> {
self.inner.delegator_pk()
}
pub fn boltz_referral_id(&self) -> Option<&str> {
self.inner.boltz_referral_id()
}
pub fn get_offchain_address(&self) -> Result<(ArkAddress, Vtxo), Error> {
let server_info = &self.server_info()?;
let server_signer = server_info.signer_pk.into();
let owner = self
.next_keypair(KeypairIndex::LastUnused)?
.public_key()
.into();
let vtxo = self.make_vtxo(server_signer, owner)?;
let ark_address = vtxo.to_ark_address();
Ok((ark_address, vtxo))
}
pub fn get_offchain_addresses(&self) -> Result<Vec<(ArkAddress, Vtxo)>, Error> {
let server_info = &self.server_info()?;
let pks = self.inner.key_provider.get_cached_pks()?;
let all_server_keys: Vec<XOnlyPublicKey> = server_info.all_server_keys().collect();
let candidate_delays = ark_core::candidate_exit_delays(
server_info.unilateral_exit_delay,
server_info.network,
)?;
let mut results = Vec::new();
for owner_pk in &pks {
for server_signer in &all_server_keys {
for exit_delay in &candidate_delays {
let default_vtxo = Vtxo::new_default(
self.secp(),
*server_signer,
*owner_pk,
*exit_delay,
server_info.network,
)?;
results.push((default_vtxo.to_ark_address(), default_vtxo));
let mut seen = HashSet::new();
for dpk in &self.inner.historical_delegator_pks {
if !seen.insert(dpk) {
continue;
}
let delegate_vtxo = Vtxo::new_with_delegator(
self.secp(),
*server_signer,
*owner_pk,
*dpk,
*exit_delay,
server_info.network,
)?;
results.push((delegate_vtxo.to_ark_address(), delegate_vtxo));
}
}
}
}
Ok(results)
}
fn make_vtxo(
&self,
server_signer: XOnlyPublicKey,
owner: XOnlyPublicKey,
) -> Result<Vtxo, Error> {
let server_info = &self.server_info()?;
match self.inner.delegator_pk {
Some(delegator) => Vtxo::new_with_delegator(
self.secp(),
server_signer,
owner,
delegator,
server_info.unilateral_exit_delay,
server_info.network,
)
.map_err(Into::into),
None => Vtxo::new_default(
self.secp(),
server_signer,
owner,
server_info.unilateral_exit_delay,
server_info.network,
)
.map_err(Into::into),
}
}
pub async fn discover_keys(&self, gap_limit: u32) -> Result<u32, Error> {
if !self.inner.key_provider.supports_discovery() {
tracing::debug!("Key provider does not support discovery, skipping");
return Ok(0);
}
let server_info = &self.server_info()?;
let all_server_keys: Vec<XOnlyPublicKey> = server_info.all_server_keys().collect();
let candidate_delays = ark_core::candidate_exit_delays(
server_info.unilateral_exit_delay,
server_info.network,
)?;
let mut start_index = 0u32;
let mut discovered_count = 0u32;
tracing::info!(gap_limit, "Starting key discovery");
loop {
let mut batch: Vec<(u32, Keypair, Vec<ArkAddress>)> =
Vec::with_capacity(gap_limit as usize);
for i in 0..gap_limit {
let index = start_index
.checked_add(i)
.ok_or_else(|| Error::ad_hoc("Key discovery index overflow"))?;
let kp = match self.inner.key_provider.derive_at_discovery_index(index)? {
Some(kp) => kp,
None => break,
};
let owner_pk = kp.x_only_public_key().0;
let mut addresses = Vec::new();
for server_signer in &all_server_keys {
for exit_delay in &candidate_delays {
let default_vtxo = Vtxo::new_default(
self.secp(),
*server_signer,
owner_pk,
*exit_delay,
server_info.network,
)?;
addresses.push(default_vtxo.to_ark_address());
for dpk in &self.inner.historical_delegator_pks {
let delegate_vtxo = Vtxo::new_with_delegator(
self.secp(),
*server_signer,
owner_pk,
*dpk,
*exit_delay,
server_info.network,
)?;
addresses.push(delegate_vtxo.to_ark_address());
}
}
}
batch.push((index, kp, addresses));
}
if batch.is_empty() {
break;
}
let addresses = batch.iter().flat_map(|(_, _, addrs)| addrs.iter().copied());
let vtxo_list = self.list_vtxos_for_addresses(addresses).await?;
let used_scripts: HashSet<&ScriptBuf> = vtxo_list.all().map(|v| &v.script).collect();
let mut found_any = false;
for (index, kp, addrs) in batch {
let used_addr = addrs.iter().find(|addr| {
let script = addr.to_p2tr_script_pubkey();
used_scripts.contains(&script)
});
if let Some(addr) = used_addr {
tracing::debug!(index, addr = %addr, "Found used address");
self.inner
.key_provider
.cache_discovered_keypair(index, kp)?;
discovered_count += 1;
found_any = true;
}
}
if !found_any {
break;
}
start_index = start_index
.checked_add(gap_limit)
.ok_or_else(|| Error::ad_hoc("Key discovery index overflow"))?;
}
tracing::info!(discovered_count, "Key discovery completed");
Ok(discovered_count)
}
pub fn get_boarding_address(&self) -> Result<Address, Error> {
let server_info = &self.server_info()?;
let boarding_output = self.inner.wallet.new_boarding_output(
server_info.signer_pk.into(),
server_info.boarding_exit_delay,
server_info.network,
)?;
Ok(boarding_output.address().clone())
}
pub fn get_onchain_address(&self) -> Result<Address, Error> {
self.inner.wallet.get_onchain_address()
}
pub fn get_boarding_addresses(&self) -> Result<Vec<Address>, Error> {
let server_info = &self.server_info()?;
let outputs = self.persist_watch_boarding_outputs(server_info)?;
let mut seen = HashSet::new();
let mut addresses = Vec::with_capacity(outputs.len());
for output in &outputs {
let address = output.address().clone();
if seen.insert(address.clone()) {
addresses.push(address);
}
}
Ok(addresses)
}
fn persist_watch_boarding_outputs(
&self,
server_info: &server::Info,
) -> Result<Vec<BoardingOutput>, Error> {
let candidate_delays =
ark_core::candidate_exit_delays(server_info.boarding_exit_delay, server_info.network)?;
let mut outputs = Vec::new();
for server_pk in server_info.all_server_keys() {
for exit_delay in &candidate_delays {
let boarding_output = self.inner.wallet.new_boarding_output(
server_pk,
*exit_delay,
server_info.network,
)?;
outputs.push(boarding_output);
}
}
Ok(outputs)
}
pub async fn get_virtual_tx_outpoints(
&self,
addresses: impl Iterator<Item = ArkAddress>,
) -> Result<Vec<VirtualTxOutPoint>, Error> {
let request = GetVtxosRequest::new_for_addresses(addresses);
self.fetch_all_vtxos(request).await
}
pub async fn list_vtxos(&self) -> Result<(VtxoList, HashMap<ScriptBuf, Vtxo>), Error> {
let ark_addresses = self.get_offchain_addresses()?;
let script_pubkey_to_vtxo_map = ark_addresses
.iter()
.map(|(a, v)| (a.to_p2tr_script_pubkey(), v.clone()))
.collect();
let addresses = ark_addresses.iter().map(|(a, _)| a).copied();
let vtxo_list = self.list_vtxos_for_addresses(addresses).await?;
Ok((vtxo_list, script_pubkey_to_vtxo_map))
}
pub async fn list_vtxos_for_addresses(
&self,
addresses: impl Iterator<Item = ArkAddress>,
) -> Result<VtxoList, Error> {
let virtual_tx_outpoints = self
.get_virtual_tx_outpoints(addresses)
.await
.context("failed to get VTXOs for addresses")?;
let vtxo_list = VtxoList::new(self.server_info()?.dust, virtual_tx_outpoints);
Ok(vtxo_list)
}
pub async fn list_vtxos_for_outpoints(
&self,
outpoints: Vec<OutPoint>,
) -> Result<(VtxoList, HashMap<ScriptBuf, Vtxo>), Error> {
let ark_addresses = self.get_offchain_addresses()?;
let script_pubkey_to_vtxo_map = ark_addresses
.iter()
.map(|(a, v)| (a.to_p2tr_script_pubkey(), v.clone()))
.collect::<HashMap<_, _>>();
let request = GetVtxosRequest::new_for_outpoints(&outpoints);
let virtual_tx_outpoints = self.fetch_all_vtxos(request).await?;
let virtual_tx_outpoints = virtual_tx_outpoints
.into_iter()
.filter(|v| match script_pubkey_to_vtxo_map.get(&v.script) {
Some(_) => true,
None => {
tracing::debug!(outpoint = %v.outpoint, "Missing spend info for VTXO");
false
}
})
.collect();
let vtxo_list = VtxoList::new(self.server_info()?.dust, virtual_tx_outpoints);
Ok((vtxo_list, script_pubkey_to_vtxo_map))
}
pub async fn get_vtxo_chain(
&self,
out_point: OutPoint,
size: i32,
index: i32,
) -> Result<Option<VtxoChainResponse>, Error> {
let vtxo_chain = timeout_op(
self.inner.timeout,
self.network_client()
.get_vtxo_chain(Some(out_point), Some((size, index))),
)
.await
.context("Failed to fetch VTXO chain")??;
Ok(Some(vtxo_chain))
}
pub async fn offchain_balance(&self) -> Result<OffChainBalance, Error> {
let (vtxo_list, script_map) = self.list_vtxos().await.context("failed to list VTXOs")?;
let now = unix_now()?;
let server_info = self.server_info()?;
let spendable_outpoints: HashSet<OutPoint> = vtxo_list
.spendable_offchain_at(&server_info, now, |script| {
script_map.get(script).map(|vtxo| vtxo.server_pk())
})
.map(|vtxo| vtxo.outpoint)
.collect();
let pre_confirmed = vtxo_list
.pre_confirmed()
.filter(|v| spendable_outpoints.contains(&v.outpoint))
.fold(Amount::ZERO, |acc, x| acc + x.amount);
let confirmed = vtxo_list
.confirmed()
.filter(|v| spendable_outpoints.contains(&v.outpoint))
.fold(Amount::ZERO, |acc, x| acc + x.amount);
let recoverable = vtxo_list
.recoverable()
.fold(Amount::ZERO, |acc, x| acc + x.amount);
let pending_recovery = vtxo_list
.pending_recovery_due_to_signer_at(&server_info, now, |script| {
script_map.get(script).map(|vtxo| vtxo.server_pk())
})
.fold(Amount::ZERO, |acc, x| acc + x.amount);
let mut asset_balances: HashMap<AssetId, u64> = HashMap::new();
for vtxo in vtxo_list.spendable_offchain_at(&server_info, now, |script| {
script_map.get(script).map(|vtxo| vtxo.server_pk())
}) {
for asset in &vtxo.assets {
let total = asset_balances
.get(&asset.asset_id)
.copied()
.unwrap_or(0)
.checked_add(asset.amount)
.ok_or_else(|| Error::ad_hoc("asset balance overflow"))?;
asset_balances.insert(asset.asset_id, total);
}
}
Ok(OffChainBalance {
pre_confirmed,
confirmed,
recoverable,
pending_recovery,
asset_balances,
})
}
pub async fn get_asset(&self, asset_id: AssetId) -> Result<server::AssetInfo, Error> {
timeout_op(
self.inner.timeout,
self.network_client().get_asset(asset_id),
)
.await
.context("Failed to get asset info")?
.map_err(Error::ark_server)
}
pub async fn transaction_history(&self) -> Result<Vec<history::Transaction>, Error> {
let mut boarding_transactions = Vec::new();
let mut boarding_commitment_transactions = Vec::new();
let boarding_addresses = self.get_boarding_addresses()?;
for boarding_address in boarding_addresses.iter() {
let outpoints = timeout_op(
self.inner.timeout,
self.blockchain().find_outpoints(boarding_address),
)
.await
.context("Failed to find outpoints")??;
for ExplorerUtxo {
outpoint,
amount,
confirmation_blocktime,
..
} in outpoints.iter()
{
let confirmed_at = confirmation_blocktime.map(|t| t as i64);
boarding_transactions.push(history::Transaction::Boarding {
txid: outpoint.txid,
amount: *amount,
confirmed_at,
});
let status = timeout_op(
self.inner.timeout,
self.blockchain()
.get_output_status(&outpoint.txid, outpoint.vout),
)
.await
.context("Failed to get Tx output status")??;
if let Some(spend_txid) = status.spend_txid {
boarding_commitment_transactions.push(spend_txid);
}
}
}
let (vtxo_list, _) = self.list_vtxos().await?;
let spent_outpoints = vtxo_list.spent().cloned().collect::<Vec<_>>();
let unspent_outpoints = vtxo_list.all_unspent().cloned().collect::<Vec<_>>();
let incoming_transactions = generate_incoming_vtxo_transaction_history(
&spent_outpoints,
&unspent_outpoints,
&boarding_commitment_transactions,
)?;
let outgoing_txs =
generate_outgoing_vtxo_transaction_history(&spent_outpoints, &unspent_outpoints)?;
let mut outgoing_transactions = vec![];
for tx in outgoing_txs {
let tx = match tx {
OutgoingTransaction::Complete(tx) => tx,
OutgoingTransaction::Incomplete(incomplete_tx) => {
let first_outpoint = incomplete_tx.first_outpoint();
let request = GetVtxosRequest::new_for_outpoints(&[first_outpoint]);
let vtxos = self.fetch_all_vtxos(request).await?;
match vtxos.first() {
Some(virtual_tx_outpoint) => {
match incomplete_tx.finish(virtual_tx_outpoint) {
Ok(tx) => tx,
Err(e) => {
tracing::warn!(
%first_outpoint,
"Could not finish outgoing TX, skipping: {e}"
);
continue;
}
}
}
None => {
tracing::warn!(
%first_outpoint,
"Could not find virtual TX outpoint for outgoing TX, skipping"
);
continue;
}
}
}
OutgoingTransaction::IncompleteOffboard(incomplete_offboard) => {
let status = timeout_op(
self.inner.timeout,
self.blockchain()
.get_tx_status(&incomplete_offboard.commitment_txid()),
)
.await
.context("failed to get commitment TX status")??;
incomplete_offboard.finish(status.confirmed_at)
}
};
outgoing_transactions.push(tx);
}
let mut txs = [
boarding_transactions,
incoming_transactions,
outgoing_transactions,
]
.concat();
sort_transactions_by_created_at(&mut txs);
Ok(txs)
}
pub fn dust(&self) -> Result<Amount, Error> {
Ok(self.server_info()?.dust)
}
pub fn network_client(&self) -> ark_grpc::Client {
self.inner.network_client.clone()
}
async fn fetch_all_vtxos(
&self,
request: GetVtxosRequest,
) -> Result<Vec<VirtualTxOutPoint>, Error> {
if request.reference().is_empty() {
return Ok(Vec::new());
}
let mut all_vtxos = Vec::new();
let mut cursor = 0;
const PAGE_SIZE: i32 = 100;
loop {
let paged_request = request.clone().with_page(PAGE_SIZE, cursor);
let response = timeout_op(
self.inner.timeout,
self.network_client().list_vtxos(paged_request),
)
.await
.context("failed to fetch list of VTXOs")??;
all_vtxos.extend(response.vtxos);
match response.page {
Some(page) if page.next < page.total => {
cursor = page.next;
}
_ => break,
}
}
Ok(all_vtxos)
}
fn next_keypair(&self, keypair_index: KeypairIndex) -> Result<Keypair, Error> {
self.inner.key_provider.get_next_keypair(keypair_index)
}
fn keypair_by_pk(&self, pk: &XOnlyPublicKey) -> Result<Keypair, Error> {
self.inner.key_provider.get_keypair_for_pk(pk)
}
fn derivation_index_for_pk(&self, pk: &XOnlyPublicKey) -> Option<u32> {
self.inner.key_provider.get_derivation_index_for_pk(pk)
}
fn secp(&self) -> &Secp256k1<All> {
&self.inner.secp
}
fn blockchain(&self) -> &B {
&self.inner.blockchain
}
fn swap_storage(&self) -> &S {
&self.inner.swap_storage
}
pub async fn bump_tx(&self, parent: &Transaction) -> Result<Transaction, Error> {
let fee_rate = timeout_op(self.inner.timeout, self.blockchain().get_fee_rate())
.await
.context("Failed to retrieve fee rate")??;
let change_address = self.inner.wallet.get_onchain_address()?;
let select_coins_fn =
|target_amount: Amount| -> Result<UtxoCoinSelection, ark_core::Error> {
self.inner.wallet.select_coins(target_amount).map_err(|e| {
ark_core::Error::ad_hoc(format!("failed to select coins for anchor TX: {e}"))
})
};
let mut psbt = build_anchor_tx(parent, change_address, fee_rate, select_coins_fn)
.map_err(|e| Error::ad_hoc(e.to_string()))?;
self.inner
.wallet
.sign(&mut psbt)
.context("failed to sign bump TX")?;
let tx = psbt.extract_tx().map_err(Error::ad_hoc)?;
Ok(tx)
}
pub async fn subscribe_to_scripts(
&self,
scripts: Vec<ArkAddress>,
subscription_id: Option<String>,
) -> Result<String, Error> {
self.network_client()
.subscribe_to_scripts(scripts, subscription_id)
.await
.map_err(Into::into)
}
pub async fn unsubscribe_from_scripts(
&self,
scripts: Vec<ArkAddress>,
subscription_id: String,
) -> Result<(), Error> {
self.network_client()
.unsubscribe_from_scripts(scripts, subscription_id)
.await
.map_err(Into::into)
}
pub async fn get_subscription(
&self,
subscription_id: String,
) -> Result<impl Stream<Item = Result<SubscriptionResponse, ark_grpc::Error>> + Unpin, Error>
{
self.network_client()
.get_subscription(subscription_id)
.await
.map_err(Into::into)
}
}
#[cfg(test)]
mod digest_guard_tests {
use super::*;
use ark_grpc::test_utils;
use bitcoin::key::Secp256k1;
use bitcoin::secp256k1::SecretKey;
use bitcoin::Address;
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::task::Context;
use std::task::Poll;
use tokio::net::TcpListener;
use tonic::body::Body;
use tonic::codegen::http;
use tonic::codegen::Service;
use tonic::server::NamedService;
use tonic::server::UnaryService;
#[derive(Clone, Default)]
struct MockArkServer {
state: Arc<MockState>,
}
#[derive(Default)]
struct MockState {
get_info_calls: AtomicUsize,
list_vtxos_calls: AtomicUsize,
}
impl Service<http::Request<Body>> for MockArkServer {
type Response = http::Response<Body>;
type Error = Infallible;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<Body>) -> Self::Future {
match req.uri().path() {
"/ark.v1.ArkService/GetInfo" => {
let method = GetInfoSvc {
state: self.state.clone(),
};
Box::pin(async move {
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec);
Ok(grpc.unary(method, req).await)
})
}
"/ark.v1.IndexerService/GetVtxos" => {
let method = ListVtxosSvc {
state: self.state.clone(),
};
Box::pin(async move {
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec);
Ok(grpc.unary(method, req).await)
})
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(Body::empty())
.unwrap())
}),
}
}
}
impl NamedService for MockArkServer {
const NAME: &'static str = "ark.v1.ArkService";
}
#[derive(Clone)]
struct MockIndexerServer(MockArkServer);
impl Service<http::Request<Body>> for MockIndexerServer {
type Response = http::Response<Body>;
type Error = Infallible;
type Future = <MockArkServer as Service<http::Request<Body>>>::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, req: http::Request<Body>) -> Self::Future {
self.0.call(req)
}
}
impl NamedService for MockIndexerServer {
const NAME: &'static str = "ark.v1.IndexerService";
}
#[derive(Clone)]
struct GetInfoSvc {
state: Arc<MockState>,
}
impl UnaryService<test_utils::GetInfoRequest> for GetInfoSvc {
type Response = test_utils::GetInfoResponse;
type Future = Pin<
Box<dyn Future<Output = Result<tonic::Response<Self::Response>, tonic::Status>> + Send>,
>;
fn call(&mut self, _request: tonic::Request<test_utils::GetInfoRequest>) -> Self::Future {
self.state.get_info_calls.fetch_add(1, Ordering::SeqCst);
Box::pin(async { Ok(tonic::Response::new(info_response("fresh-digest"))) })
}
}
#[derive(Clone)]
struct ListVtxosSvc {
state: Arc<MockState>,
}
impl UnaryService<test_utils::GetVtxosRequest> for ListVtxosSvc {
type Response = test_utils::GetVtxosResponse;
type Future = Pin<
Box<dyn Future<Output = Result<tonic::Response<Self::Response>, tonic::Status>> + Send>,
>;
fn call(&mut self, _request: tonic::Request<test_utils::GetVtxosRequest>) -> Self::Future {
self.state.list_vtxos_calls.fetch_add(1, Ordering::SeqCst);
Box::pin(async {
Err(tonic::Status::failed_precondition(
"DIGEST_MISMATCH: invalid digest header",
))
})
}
}
fn info_response(digest: &str) -> test_utils::GetInfoResponse {
let secp = Secp256k1::new();
let secret_key = SecretKey::from_slice(&[1; 32]).unwrap();
let keypair = Keypair::from_secret_key(&secp, &secret_key);
let public_key = bitcoin::secp256k1::PublicKey::from_secret_key(&secp, &secret_key);
let (xonly, _) = keypair.x_only_public_key();
let address = Address::p2tr(&secp, xonly, None, bitcoin::Network::Regtest);
test_utils::GetInfoResponse {
version: "0.9.9".to_string(),
signer_pubkey: public_key.to_string(),
forfeit_pubkey: public_key.to_string(),
forfeit_address: address.to_string(),
checkpoint_tapscript: String::new(),
network: "regtest".to_string(),
session_duration: 60,
unilateral_exit_delay: 144,
boarding_exit_delay: 144,
utxo_min_amount: 0,
utxo_max_amount: 0,
vtxo_min_amount: 0,
vtxo_max_amount: 0,
dust: 1000,
fees: None,
scheduled_session: None,
deprecated_signers: Vec::new(),
service_status: Default::default(),
digest: digest.to_string(),
max_tx_weight: 0,
max_op_return_outputs: 0,
}
}
#[tokio::test]
async fn guarded_client_refreshes_info_and_does_not_retry_on_digest_mismatch() {
let _ = rustls::crypto::ring::default_provider().install_default();
let mock = MockArkServer::default();
let state = mock.state.clone();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
let indexer_mock = MockIndexerServer(mock.clone());
tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(mock)
.add_service(indexer_mock)
.serve_with_incoming(incoming)
.await
.unwrap();
});
let mut inner = ark_grpc::Client::new(format!("http://{addr}"));
inner.connect().await.unwrap();
let initial_info: server::Info = info_response("stale-digest").try_into().unwrap();
let cached_state = Arc::new(RwLock::new(ServerState {
server_info: initial_info,
fee_estimator: build_fee_estimator(&info_response("stale-digest").try_into().unwrap())
.unwrap(),
}));
let hook_state = cached_state.clone();
inner.set_info_refresh_hook(move |server_info| {
update_server_state(&hook_state, server_info)
.map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
});
let err = match inner
.list_vtxos(GetVtxosRequest::new_for_outpoints(&[OutPoint::null()]))
.await
{
Ok(_) => panic!("list_vtxos unexpectedly succeeded"),
Err(err) => err,
};
assert!(err.is_server_info_changed());
assert!(Error::from(err).is_server_info_changed());
assert_eq!(state.list_vtxos_calls.load(Ordering::SeqCst), 1);
assert_eq!(state.get_info_calls.load(Ordering::SeqCst), 1);
assert_eq!(
cached_state.read().unwrap().server_info.digest,
"fresh-digest"
);
}
}