use crate::error::ErrorContext;
use crate::key_provider::KeyProvider;
use crate::swap_storage::SwapStorage;
use crate::wallet::BoardingWallet;
use crate::wallet::OnchainWallet;
use crate::Blockchain;
use crate::Client;
use crate::Error;
use ark_core::intent;
use ark_core::server::SubscriptionResponse;
use ark_core::server::VirtualTxOutPoint;
use ark_core::ArkAddress;
use ark_core::Vtxo;
use ark_delegator::DelegatorClient;
use bitcoin::secp256k1::PublicKey;
use bitcoin::Amount;
use bitcoin::OutPoint;
use bitcoin::ScriptBuf;
use bitcoin::TxOut;
use futures::StreamExt;
use rand::rngs::OsRng;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::watch;
pub struct VtxoWatcherHandle {
stop_tx: watch::Sender<bool>,
}
impl VtxoWatcherHandle {
pub fn stop(self) {
let _ = self.stop_tx.send(true);
}
}
impl Drop for VtxoWatcherHandle {
fn drop(&mut self) {
let _ = self.stop_tx.send(true);
}
}
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(30);
const KEY_DISCOVERY_INTERVAL: Duration = Duration::from_secs(10);
const KEY_DISCOVERY_GAP_LIMIT: u32 = 20;
const MIGRATION_INTERVAL: Duration = Duration::from_secs(60);
const MIGRATION_BASE_COOLDOWN: Duration = Duration::from_secs(30);
const MIGRATION_MAX_COOLDOWN: Duration = Duration::from_secs(300);
#[derive(Debug, Clone, Copy)]
pub struct VtxoWatcherConfig {
pub migrate_deprecated_signers: bool,
}
impl Default for VtxoWatcherConfig {
fn default() -> Self {
Self {
migrate_deprecated_signers: true,
}
}
}
struct ScriptMap {
vtxo_by_script: HashMap<ScriptBuf, Vtxo>,
addr_by_script: HashMap<ScriptBuf, ArkAddress>,
}
impl ScriptMap {
fn from_addresses(addresses: &[(ArkAddress, Vtxo)]) -> Self {
let mut vtxo_by_script = HashMap::with_capacity(addresses.len());
let mut addr_by_script = HashMap::with_capacity(addresses.len());
for (addr, vtxo) in addresses {
let script = addr.to_p2tr_script_pubkey();
vtxo_by_script.insert(script.clone(), vtxo.clone());
addr_by_script.insert(script, *addr);
}
Self {
vtxo_by_script,
addr_by_script,
}
}
fn addresses_for(&self, vtxos: &[VirtualTxOutPoint]) -> Vec<ArkAddress> {
let mut seen = HashSet::new();
let mut result = Vec::new();
for vtp in vtxos {
if let Some(addr) = self.addr_by_script.get(&vtp.script) {
if seen.insert(&vtp.script) {
result.push(*addr);
}
}
}
result
}
}
enum WatcherWork {
NewVtxos {
vtxos: Vec<VirtualTxOutPoint>,
script_map: Arc<ScriptMap>,
},
RenewTick {
script_map: Arc<ScriptMap>,
},
}
impl<B, W, S, K> Client<B, W, S, K>
where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
pub fn start_vtxo_watcher(
self: &Arc<Self>,
delegator: Arc<DelegatorClient>,
config: VtxoWatcherConfig,
) -> VtxoWatcherHandle {
let (stop_tx, stop_rx) = watch::channel(false);
let client = Arc::clone(self);
tokio::spawn(async move {
run_watcher_loop(client, delegator, config, stop_rx).await;
tracing::debug!("VTXO watcher stopped");
});
VtxoWatcherHandle { stop_tx }
}
}
async fn run_watcher_loop<B, W, S, K>(
client: Arc<Client<B, W, S, K>>,
delegator: Arc<DelegatorClient>,
config: VtxoWatcherConfig,
mut stop_rx: watch::Receiver<bool>,
) where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
let mut backoff = INITIAL_BACKOFF;
loop {
if *stop_rx.borrow() {
return;
}
let addresses = match client.get_offchain_addresses() {
Ok(a) => a,
Err(e) => {
tracing::error!("Failed to get offchain addresses: {e}");
return;
}
};
let script_map = Arc::new(ScriptMap::from_addresses(&addresses));
let ark_addresses: Vec<_> = addresses.iter().map(|(addr, _)| *addr).collect();
let subscription_id = match client.subscribe_to_scripts(ark_addresses, None).await {
Ok(id) => id,
Err(e) => {
tracing::warn!("Failed to subscribe: {e}, retrying in {backoff:?}");
if wait_or_stop(&mut stop_rx, backoff).await {
return;
}
backoff = (backoff * 2).min(MAX_BACKOFF);
continue;
}
};
let mut stream = match client.get_subscription(subscription_id.clone()).await {
Ok(s) => s,
Err(e) => {
tracing::warn!("Failed to get subscription stream: {e}, retrying in {backoff:?}");
if wait_or_stop(&mut stop_rx, backoff).await {
return;
}
backoff = (backoff * 2).min(MAX_BACKOFF);
continue;
}
};
tracing::info!("VTXO watcher connected");
backoff = INITIAL_BACKOFF;
let mut subscribed_addrs: HashSet<ArkAddress> =
addresses.iter().map(|(addr, _)| *addr).collect();
let mut script_map = script_map;
let mut renew_interval = tokio::time::interval(Duration::from_secs(60));
let mut discovery_interval = tokio::time::interval(KEY_DISCOVERY_INTERVAL);
let (work_tx, mut work_rx) = mpsc::channel::<WatcherWork>(128);
let worker_handle = tokio::spawn({
let client = client.clone();
let delegator = delegator.clone();
async move {
let mut seen_unspent_outpoints = HashSet::<OutPoint>::new();
while let Some(first) = work_rx.recv().await {
let (
mut pending_vtxos,
mut latest_script_map,
mut should_renew,
mut should_sync,
) = match first {
WatcherWork::NewVtxos { vtxos, script_map } => {
(vtxos, Some(script_map), true, false)
}
WatcherWork::RenewTick { script_map } => {
(Vec::new(), Some(script_map), true, true)
}
};
while let Ok(work) = work_rx.try_recv() {
match work {
WatcherWork::NewVtxos { vtxos, script_map } => {
pending_vtxos.extend(vtxos);
latest_script_map = Some(script_map);
should_renew = true;
}
WatcherWork::RenewTick { script_map } => {
latest_script_map = Some(script_map);
should_renew = true;
should_sync = true;
}
}
}
if let (true, Some(script_map)) = (should_sync, latest_script_map.as_deref()) {
match collect_new_delegation_candidates(
&client,
script_map,
&mut seen_unspent_outpoints,
)
.await
{
Ok(new_candidates) => {
if !new_candidates.is_empty() {
tracing::debug!(
count = new_candidates.len(),
"Found new delegatable VTXOs from failsafe polling"
);
pending_vtxos.extend(new_candidates);
}
}
Err(e) => {
tracing::warn!("Failsafe delegation poll failed: {e}");
}
}
}
if !pending_vtxos.is_empty() {
let mut deduped = Vec::new();
let mut seen = HashSet::new();
for vtxo in pending_vtxos {
if seen.insert(vtxo.outpoint) {
deduped.push(vtxo);
}
}
tracing::debug!(count = deduped.len(), "Processing VTXOs for delegation");
if let Some(script_map) = latest_script_map {
delegate_vtxos(&client, &delegator, &deduped, &script_map).await;
}
}
if should_renew {
renew_expiring_vtxos(&client).await;
}
}
}
});
let migration_handle = config.migrate_deprecated_signers.then(|| {
let client = client.clone();
let mut stop_rx = stop_rx.clone();
tokio::spawn(async move {
run_migration_arm(&client, &mut stop_rx).await;
})
});
loop {
tokio::select! {
_ = stop_rx.changed() => {
drop(work_tx);
let _ = worker_handle.await;
if let Some(handle) = migration_handle {
handle.abort();
}
return;
}
_ = renew_interval.tick() => {
if work_tx.send(WatcherWork::RenewTick {
script_map: Arc::clone(&script_map),
}).await.is_err() {
tracing::warn!("VTXO worker channel closed, reconnecting in {backoff:?}");
break;
}
}
_ = discovery_interval.tick() => {
match refresh_subscription_scripts(
client.as_ref(),
&subscription_id,
&mut subscribed_addrs,
)
.await
{
Ok(Some(new_script_map)) => {
script_map = new_script_map;
}
Ok(None) => {}
Err(e) => {
tracing::warn!("Failed to refresh script subscription: {e}");
}
}
}
event = stream.next() => {
match event {
Some(Ok(SubscriptionResponse::Heartbeat)) => {}
Some(Ok(SubscriptionResponse::Event(event))) => {
if !event.new_vtxos.is_empty() {
tracing::debug!(
txid = %event.txid,
new_vtxos = event.new_vtxos.len(),
"Received subscription event with new VTXOs"
);
if work_tx.send(WatcherWork::NewVtxos {
vtxos: event.new_vtxos,
script_map: Arc::clone(&script_map),
})
.await.is_err()
{
tracing::warn!("VTXO worker channel closed. Reconnecting in {backoff:?}");
break;
}
}
}
Some(Err(e)) => {
tracing::warn!("VTXO subscription error: {e}, reconnecting in {backoff:?}");
break;
}
None => {
tracing::debug!("VTXO subscription stream ended, reconnecting in {backoff:?}");
break;
}
}
}
}
}
drop(work_tx);
let _ = worker_handle.await;
if let Some(handle) = migration_handle {
handle.abort();
}
if wait_or_stop(&mut stop_rx, backoff).await {
return;
}
backoff = (backoff * 2).min(MAX_BACKOFF);
}
}
async fn run_migration_arm<B, W, S, K>(
client: &Client<B, W, S, K>,
stop_rx: &mut watch::Receiver<bool>,
) where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
let mut consecutive_failures: u32 = 0;
loop {
let delay = migration_delay(consecutive_failures);
if wait_or_stop(stop_rx, delay).await {
return;
}
let mut rng = OsRng;
match client.migrate_deprecated_signer_vtxos(&mut rng).await {
Ok(report) => {
if report.failed() {
consecutive_failures = consecutive_failures.saturating_add(1);
let next = migration_delay(consecutive_failures);
tracing::warn!(
txids = ?report.settle_txids(),
vtxo_error = ?report.vtxo.error.as_deref(),
boarding_error = ?report.boarding.error.as_deref(),
"Background migration pass had leg failure; backing off {next:?}"
);
} else {
if report.rotated() {
tracing::info!(
txids = ?report.settle_txids(),
"Background migration rotated funds off deprecated signer(s)"
);
} else {
tracing::debug!("Background migration pass: nothing to migrate");
}
consecutive_failures = 0;
}
}
Err(e) => {
consecutive_failures = consecutive_failures.saturating_add(1);
let next = migration_delay(consecutive_failures);
tracing::warn!("Background migration pass failed: {e}; backing off {next:?}");
}
}
}
}
fn migration_delay(consecutive_failures: u32) -> Duration {
if consecutive_failures == 0 {
return MIGRATION_INTERVAL;
}
let shift = consecutive_failures - 1;
let scaled = MIGRATION_BASE_COOLDOWN
.checked_mul(1u32.checked_shl(shift).unwrap_or(u32::MAX))
.unwrap_or(MIGRATION_MAX_COOLDOWN);
scaled.min(MIGRATION_MAX_COOLDOWN)
}
async fn wait_or_stop(stop_rx: &mut watch::Receiver<bool>, duration: Duration) -> bool {
tokio::select! {
_ = stop_rx.changed() => true,
_ = tokio::time::sleep(duration) => false,
}
}
async fn refresh_subscription_scripts<B, W, S, K>(
client: &Client<B, W, S, K>,
subscription_id: &str,
subscribed_addrs: &mut HashSet<ArkAddress>,
) -> Result<Option<Arc<ScriptMap>>, Error>
where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
let _discovered = client.discover_keys(KEY_DISCOVERY_GAP_LIMIT).await?;
let addrs = client.get_offchain_addresses()?;
let new_addrs: Vec<_> = addrs
.iter()
.map(|(addr, _)| *addr)
.filter(|addr| !subscribed_addrs.contains(addr))
.collect();
if new_addrs.is_empty() {
return Ok(None);
}
client
.subscribe_to_scripts(new_addrs.clone(), Some(subscription_id.to_string()))
.await?;
let added = new_addrs.len();
subscribed_addrs.extend(new_addrs);
tracing::info!(
added,
"Updated watcher subscription with newly derived addresses"
);
Ok(Some(Arc::new(ScriptMap::from_addresses(&addrs))))
}
async fn collect_new_delegation_candidates<B, W, S, K>(
client: &Client<B, W, S, K>,
script_map: &ScriptMap,
seen_unspent_outpoints: &mut HashSet<OutPoint>,
) -> Result<Vec<VirtualTxOutPoint>, Error>
where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
let (vtxo_list, _) = client.list_vtxos().await?;
let mut current_outpoints = HashSet::new();
let mut newly_seen = Vec::new();
for vtp in vtxo_list.all_unspent() {
let Some(vtxo) = script_map.vtxo_by_script.get(&vtp.script) else {
continue;
};
if vtxo.delegator_pk().is_none() {
continue;
}
current_outpoints.insert(vtp.outpoint);
if !seen_unspent_outpoints.contains(&vtp.outpoint) {
newly_seen.push(vtp.clone());
}
}
*seen_unspent_outpoints = current_outpoints;
Ok(newly_seen)
}
struct DelegatorState {
cosigner_pk: PublicKey,
fee: Amount,
fee_address_script: ScriptBuf,
}
async fn fetch_delegator_state(delegator: &DelegatorClient) -> Result<DelegatorState, Error> {
let info = delegator
.info()
.await
.context(Error::ad_hoc("failed to get delegator info"))?;
let cosigner_pk: PublicKey = info
.pubkey
.parse::<PublicKey>()
.context("failed to parse delegator PK")?;
let fee = info
.fee
.parse::<u64>()
.map(Amount::from_sat)
.context("failed to parse delegator fee")?;
let fee_address_script = info
.delegator_address
.parse::<ArkAddress>()
.context("failed to parse delegator fee address")?
.to_p2tr_script_pubkey();
Ok(DelegatorState {
cosigner_pk,
fee,
fee_address_script,
})
}
const SECONDS_PER_DAY: i64 = 86_400;
fn day_timestamp(ts: i64) -> i64 {
ts - ts.rem_euclid(SECONDS_PER_DAY)
}
fn group_by_expiry_day<'a>(
vtxos: &'a [VirtualTxOutPoint],
script_map: &'a ScriptMap,
dust: Amount,
) -> Vec<(i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>)> {
let mut groups: BTreeMap<i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>> = BTreeMap::new();
let mut recoverable: Vec<(&'a VirtualTxOutPoint, &'a Vtxo)> = Vec::new();
for vtp in vtxos {
if vtp.is_spent {
continue;
}
let vtxo = match script_map.vtxo_by_script.get(&vtp.script) {
Some(v) => v,
None => continue,
};
if vtxo.delegator_pk().is_none() {
continue;
}
if vtp.is_recoverable(dust) {
recoverable.push((vtp, vtxo));
} else if vtp.expires_at > 0 {
let day = day_timestamp(vtp.expires_at);
groups.entry(day).or_default().push((vtp, vtxo));
}
}
if !recoverable.is_empty() {
if let Some((&earliest_day, _)) = groups.iter().next() {
groups.entry(earliest_day).or_default().extend(recoverable);
} else {
groups.insert(0, recoverable);
}
}
groups.into_iter().collect()
}
fn calculate_valid_at(group_vtxos: &[(&VirtualTxOutPoint, &Vtxo)], dust: Amount) -> u64 {
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let earliest_activation = group_vtxos
.iter()
.filter(|(vtp, _)| {
!vtp.is_recoverable(dust)
&& vtp.created_at > 0
&& vtp.expires_at > 0
&& vtp.expires_at > vtp.created_at
})
.map(|(vtp, _)| {
let created_at = vtp.created_at as u64;
let lifetime = (vtp.expires_at - vtp.created_at) as u64;
created_at + (lifetime * 9 / 10)
})
.min();
match earliest_activation {
Some(valid_at) if valid_at > now_secs => valid_at,
_ => now_secs + 60,
}
}
async fn delegate_vtxos<B, W, S, K>(
client: &Arc<Client<B, W, S, K>>,
delegator: &DelegatorClient,
new_vtxos: &[VirtualTxOutPoint],
script_map: &ScriptMap,
) where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
let affected_addresses = script_map.addresses_for(new_vtxos);
if affected_addresses.is_empty() {
tracing::debug!("No affected addresses resolved from new VTXOs; skipping delegation");
return;
}
let vtxo_list = match client
.list_vtxos_for_addresses(affected_addresses.into_iter())
.await
{
Ok(v) => v,
Err(e) => {
tracing::error!("Failed to list VTXOs for delegation: {e}");
return;
}
};
let new_outpoints: HashSet<_> = new_vtxos.iter().map(|v| v.outpoint).collect();
let enriched: Vec<_> = vtxo_list
.all_unspent()
.filter(|vtp| new_outpoints.contains(&vtp.outpoint))
.cloned()
.collect();
let server_info = match client.server_info() {
Ok(server_info) => server_info,
Err(e) => {
tracing::error!("Failed to read server info for delegation: {e}");
return;
}
};
let groups = group_by_expiry_day(&enriched, script_map, server_info.dust);
if groups.is_empty() {
tracing::debug!("No delegate-eligible VTXOs after enrichment/grouping; skipping");
return;
}
let delegator_state = match fetch_delegator_state(delegator).await {
Ok(s) => Arc::new(s),
Err(e) => {
tracing::error!("{e}");
return;
}
};
let (to_address, _) = match client.get_offchain_address() {
Ok(v) => v,
Err(e) => {
tracing::error!("Failed to get offchain address for delegation: {e}");
return;
}
};
let dest_script = to_address.to_p2tr_script_pubkey();
let mut handles = Vec::new();
for (_day, group_vtxos) in groups {
let valid_at = calculate_valid_at(&group_vtxos, server_info.dust);
let mut vtxo_inputs = Vec::new();
let mut total_amount = Amount::ZERO;
for (vtp, vtxo) in &group_vtxos {
let spend_info = match vtxo.delegate_spend_info() {
Ok(info) => info,
Err(e) => {
tracing::warn!(outpoint = %vtp.outpoint, "Cannot get delegate spend info: {e}");
continue;
}
};
vtxo_inputs.push(intent::Input::new(
vtp.outpoint,
vtxo.exit_delay(),
None,
TxOut {
value: vtp.amount,
script_pubkey: vtp.script.clone(),
},
vtxo.tapscripts(),
spend_info,
vtp.is_spent,
false,
vtp.assets.clone(),
));
total_amount += vtp.amount;
}
if vtxo_inputs.is_empty() {
continue;
}
let fee = delegator_state.fee;
if fee >= total_amount {
tracing::warn!(
%total_amount, %fee,
"Delegator fee exceeds VTXO group value, skipping"
);
continue;
}
let net_amount = total_amount - fee;
if net_amount < server_info.dust {
tracing::warn!(%net_amount, "Net amount after fee is below dust, skipping");
continue;
}
let mut outputs = Vec::new();
if fee > Amount::ZERO {
outputs.push(intent::Output::Offchain(TxOut {
value: fee,
script_pubkey: delegator_state.fee_address_script.clone(),
}));
}
outputs.push(intent::Output::Offchain(TxOut {
value: net_amount,
script_pubkey: dest_script.clone(),
}));
let server_info_forfeit_addr = server_info.forfeit_address.clone();
let dust = server_info.dust;
let ds = Arc::clone(&delegator_state);
let delegator = delegator.clone();
let client = Arc::clone(client);
handles.push(tokio::spawn(async move {
delegate_group(
&client,
&delegator,
vtxo_inputs,
outputs,
ds.cosigner_pk,
&server_info_forfeit_addr,
dust,
valid_at,
)
.await;
}));
}
for handle in handles {
let _ = handle.await;
}
}
async fn delegate_group<B, W, S, K>(
client: &Client<B, W, S, K>,
delegator: &DelegatorClient,
vtxo_inputs: Vec<intent::Input>,
outputs: Vec<intent::Output>,
cosigner_pk: PublicKey,
forfeit_address: &bitcoin::Address,
dust: Amount,
valid_at: u64,
) where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
let input_count = vtxo_inputs.len();
let mut delegate = match ark_core::batch::prepare_delegate_psbts_at(
vtxo_inputs,
outputs,
cosigner_pk,
forfeit_address,
dust,
Some(valid_at),
) {
Ok(d) => d,
Err(e) => {
tracing::error!("Failed to prepare delegate PSBTs: {e}");
return;
}
};
if let Err(e) =
client.sign_delegate_psbts(&mut delegate.intent.proof, &mut delegate.forfeit_psbts)
{
tracing::error!("Failed to sign delegate PSBTs: {e}");
return;
}
if let Err(e) = delegator
.delegate(&delegate.intent, &delegate.forfeit_psbts, None)
.await
{
tracing::error!("Failed to submit delegation: {e}");
return;
}
tracing::info!(
vtxo_count = input_count,
valid_at,
"Delegated VTXO group to delegator service"
);
}
const SELF_RENEW_REMAINING_FRACTION: f64 = 0.10;
async fn renew_expiring_vtxos<B, W, S, K>(client: &Client<B, W, S, K>)
where
B: Blockchain + Send + Sync + 'static,
W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
S: SwapStorage + 'static,
K: KeyProvider + Send + Sync + 'static,
{
let (vtxo_list, _) = match client.list_vtxos().await {
Ok(v) => v,
Err(e) => {
tracing::warn!("Failed to list VTXOs for renewal check: {e}");
return;
}
};
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
let expiring_outpoints: Vec<OutPoint> = vtxo_list
.all_unspent()
.filter(|vtp| {
if vtp.expires_at <= 0 || vtp.created_at <= 0 {
return false;
}
let total_lifetime = vtp.expires_at - vtp.created_at;
let remaining = vtp.expires_at - now;
remaining > 0
&& (remaining as f64) < (total_lifetime as f64 * SELF_RENEW_REMAINING_FRACTION)
})
.map(|vtp| vtp.outpoint)
.collect();
if expiring_outpoints.is_empty() {
return;
}
tracing::info!(
count = expiring_outpoints.len(),
"Self-renewing expiring VTXOs"
);
let mut rng = OsRng;
match client
.settle_vtxos(&mut rng, &expiring_outpoints, &[])
.await
{
Ok(Some(txid)) => {
tracing::info!(%txid, "Self-renewed expiring VTXOs");
}
Ok(None) => {}
Err(e) => {
tracing::warn!("Failed to self-renew VTXOs: {e}");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bitcoin::hashes::Hash;
use bitcoin::key::Secp256k1;
use bitcoin::Network;
use bitcoin::Sequence;
use bitcoin::Txid;
use bitcoin::XOnlyPublicKey;
use std::str::FromStr;
fn test_keys() -> (XOnlyPublicKey, XOnlyPublicKey, XOnlyPublicKey) {
let server = XOnlyPublicKey::from_str(
"18845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
)
.unwrap();
let owner = XOnlyPublicKey::from_str(
"28845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
)
.unwrap();
let delegator = XOnlyPublicKey::from_str(
"38845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
)
.unwrap();
(server, owner, delegator)
}
fn delegated_vtxo() -> (ArkAddress, Vtxo) {
let secp = Secp256k1::new();
let (server, owner, delegator) = test_keys();
let vtxo = Vtxo::new_with_delegator(
&secp,
server,
owner,
delegator,
Sequence::from_seconds_ceil(86400).unwrap(),
Network::Regtest,
)
.unwrap();
(vtxo.to_ark_address(), vtxo)
}
fn mk_vtp(script: ScriptBuf, amount_sat: u64, expires_at: i64, vout: u32) -> VirtualTxOutPoint {
VirtualTxOutPoint {
outpoint: OutPoint::new(Txid::all_zeros(), vout),
created_at: expires_at - 1000,
expires_at,
amount: Amount::from_sat(amount_sat),
script,
is_preconfirmed: false,
is_swept: false,
is_unrolled: false,
is_spent: false,
spent_by: None,
commitment_txids: vec![],
settled_by: None,
ark_txid: None,
assets: vec![],
}
}
#[test]
fn migration_delay_uses_base_interval_when_healthy() {
assert_eq!(migration_delay(0), MIGRATION_INTERVAL);
}
#[test]
fn migration_delay_backs_off_exponentially_and_caps() {
assert_eq!(migration_delay(1), MIGRATION_BASE_COOLDOWN);
assert_eq!(migration_delay(2), MIGRATION_BASE_COOLDOWN * 2);
assert_eq!(migration_delay(3), MIGRATION_BASE_COOLDOWN * 4);
assert_eq!(migration_delay(4), MIGRATION_BASE_COOLDOWN * 8);
assert_eq!(migration_delay(5), MIGRATION_MAX_COOLDOWN);
assert_eq!(migration_delay(100), MIGRATION_MAX_COOLDOWN);
assert_eq!(migration_delay(u32::MAX), MIGRATION_MAX_COOLDOWN);
}
#[test]
fn day_timestamp_normalizes_to_midnight() {
let ts = 1705322700; let day = day_timestamp(ts);
assert_eq!(day % SECONDS_PER_DAY, 0);
assert!(day <= ts);
assert!(ts - day < SECONDS_PER_DAY);
}
#[test]
fn day_timestamp_already_midnight() {
let ts = SECONDS_PER_DAY * 19738;
assert_eq!(day_timestamp(ts), ts);
}
#[test]
fn group_by_expiry_day_merges_recoverable_into_earliest_group() {
let (addr, vtxo) = delegated_vtxo();
let script = addr.to_p2tr_script_pubkey();
let script_map = ScriptMap::from_addresses(&[(addr, vtxo)]);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let day1_midnight = day_timestamp(now) + SECONDS_PER_DAY;
let day2_midnight = day1_midnight + SECONDS_PER_DAY;
let recoverable = mk_vtp(script.clone(), 100, day1_midnight + 500, 0); let non_recoverable_day1 = mk_vtp(script.clone(), 10_000, day1_midnight + 800, 1);
let non_recoverable_day2 = mk_vtp(script, 10_000, day2_midnight + 800, 2);
let vtxos = [non_recoverable_day2, recoverable, non_recoverable_day1];
let groups = group_by_expiry_day(&vtxos, &script_map, Amount::from_sat(500));
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].0, day_timestamp(day1_midnight + 800));
assert_eq!(groups[1].0, day_timestamp(day2_midnight + 800));
assert_eq!(groups[0].1.len(), 2);
assert_eq!(groups[1].1.len(), 1);
}
#[test]
fn calculate_valid_at_for_non_recoverable_group_is_before_expiry() {
let (_addr, vtxo) = delegated_vtxo();
let script = ScriptBuf::new();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let later = mk_vtp(script, 10_000, now + 10_000, 1);
let group = vec![(&later, &vtxo)];
let valid_at = calculate_valid_at(&group, Amount::from_sat(500));
assert!(valid_at > now as u64);
assert!(valid_at < later.expires_at as u64);
}
#[test]
fn calculate_valid_at_for_recoverable_only_group_is_soon() {
let (_addr, vtxo) = delegated_vtxo();
let script = ScriptBuf::new();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let recoverable = mk_vtp(script, 100, now + 5_000, 0); let group = vec![(&recoverable, &vtxo)];
let start = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let valid_at = calculate_valid_at(&group, Amount::from_sat(500));
let end = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
assert!(valid_at >= start + 60);
assert!(valid_at <= end + 61);
}
}