Skip to main content

ark_client/
vtxo_watcher.rs

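//! Background VTXO watcher that auto-delegates and auto-renews VTXOs.
//!
//! Full behavior:
//! - On new VTXOs received: submit them to the delegator service for future renewal
//! - On new VTXOs received and on a periodic tick: self-renew VTXOs that are close to expiry
//!   (safety net)
//! - On a periodic tick: poll wallet state for delegatable VTXOs the subscription may have missed
//! - Periodically: discover newly derived keys and add their scripts to the subscription
//! - On stream error: reconnect with exponential backoff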
7
8use crate::error::ErrorContext;
9use crate::key_provider::KeyProvider;
10use crate::swap_storage::SwapStorage;
11use crate::wallet::BoardingWallet;
12use crate::wallet::OnchainWallet;
13use crate::Blockchain;
14use crate::Client;
15use crate::Error;
16use ark_core::intent;
17use ark_core::server::SubscriptionResponse;
18use ark_core::server::VirtualTxOutPoint;
19use ark_core::ArkAddress;
20use ark_core::Vtxo;
21use ark_delegator::DelegatorClient;
22use bitcoin::secp256k1::PublicKey;
23use bitcoin::Amount;
24use bitcoin::OutPoint;
25use bitcoin::ScriptBuf;
26use bitcoin::TxOut;
27use futures::StreamExt;
28use rand::rngs::OsRng;
29use std::collections::BTreeMap;
30use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33use std::time::Duration;
34use tokio::sync::mpsc;
35use tokio::sync::watch;
36
37/// Handle to stop the background VTXO watcher.
38///
39/// Dropping the handle will also stop the watcher.
40pub struct VtxoWatcherHandle {
41    stop_tx: watch::Sender<bool>,
42}
43
44impl VtxoWatcherHandle {
45    /// Stop the background watcher.
46    pub fn stop(self) {
47        let _ = self.stop_tx.send(true);
48    }
49}
50
51impl Drop for VtxoWatcherHandle {
52    fn drop(&mut self) {
53        let _ = self.stop_tx.send(true);
54    }
55}
56
/// Delay before the first reconnection attempt; doubled after every failed attempt.
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Upper bound for the reconnection delay.
const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// How often key discovery runs to keep the script subscription fresh.
const KEY_DISCOVERY_INTERVAL: Duration = Duration::from_secs(10);
/// Gap limit handed to key discovery.
const KEY_DISCOVERY_GAP_LIMIT: u32 = 20;
64
65/// Pre-computed mapping from script pubkeys to their Vtxo metadata and ArkAddress.
66///
67/// Built once per (re)connection from `get_offchain_addresses()`. Used both for the subscription
68/// and for resolving VTXO metadata from subscription events, so they can never diverge.
69struct ScriptMap {
70    vtxo_by_script: HashMap<ScriptBuf, Vtxo>,
71    addr_by_script: HashMap<ScriptBuf, ArkAddress>,
72}
73
74impl ScriptMap {
75    fn from_addresses(addresses: &[(ArkAddress, Vtxo)]) -> Self {
76        let mut vtxo_by_script = HashMap::with_capacity(addresses.len());
77        let mut addr_by_script = HashMap::with_capacity(addresses.len());
78        for (addr, vtxo) in addresses {
79            let script = addr.to_p2tr_script_pubkey();
80            vtxo_by_script.insert(script.clone(), vtxo.clone());
81            addr_by_script.insert(script, *addr);
82        }
83        Self {
84            vtxo_by_script,
85            addr_by_script,
86        }
87    }
88
89    /// Get the unique ArkAddresses that appear in the given VTXO outpoints.
90    fn addresses_for(&self, vtxos: &[VirtualTxOutPoint]) -> Vec<ArkAddress> {
91        let mut seen = HashSet::new();
92        let mut result = Vec::new();
93        for vtp in vtxos {
94            if let Some(addr) = self.addr_by_script.get(&vtp.script) {
95                if seen.insert(&vtp.script) {
96                    result.push(*addr);
97                }
98            }
99        }
100        result
101    }
102}
103
104enum WatcherWork {
105    NewVtxos {
106        vtxos: Vec<VirtualTxOutPoint>,
107        script_map: Arc<ScriptMap>,
108    },
109    RenewTick {
110        script_map: Arc<ScriptMap>,
111    },
112}
113
114impl<B, W, S, K> Client<B, W, S, K>
115where
116    B: Blockchain + Send + Sync + 'static,
117    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
118    S: SwapStorage + 'static,
119    K: KeyProvider + Send + Sync + 'static,
120{
121    /// Start a background task that watches for new VTXOs and:
122    ///
123    /// 1. **Delegates** them to the configured delegator service for future auto-renewal
124    /// 2. **Self-renews** VTXOs that are close to expiry (safety net)
125    ///
126    /// Reconnects automatically with exponential backoff (1s → 2s → … → 30s) on stream errors.
127    ///
128    /// Requires the client to be wrapped in an `Arc` for shared ownership with the background
129    /// task.
130    ///
131    /// Returns a [`VtxoWatcherHandle`] that stops the watcher when dropped.
132    pub fn start_vtxo_watcher(
133        self: &Arc<Self>,
134        delegator: Arc<DelegatorClient>,
135    ) -> VtxoWatcherHandle {
136        let (stop_tx, stop_rx) = watch::channel(false);
137
138        let client = Arc::clone(self);
139        tokio::spawn(async move {
140            run_watcher_loop(client, delegator, stop_rx).await;
141            tracing::debug!("VTXO watcher stopped");
142        });
143
144        VtxoWatcherHandle { stop_tx }
145    }
146}
147
148/// Outer loop that reconnects on stream errors with exponential backoff.
149async fn run_watcher_loop<B, W, S, K>(
150    client: Arc<Client<B, W, S, K>>,
151    delegator: Arc<DelegatorClient>,
152    mut stop_rx: watch::Receiver<bool>,
153) where
154    B: Blockchain + Send + Sync + 'static,
155    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
156    S: SwapStorage + 'static,
157    K: KeyProvider + Send + Sync + 'static,
158{
159    let mut backoff = INITIAL_BACKOFF;
160
161    loop {
162        if *stop_rx.borrow() {
163            return;
164        }
165
166        // Build the script map and subscription from the same address set.
167        let addresses = match client.get_offchain_addresses() {
168            Ok(a) => a,
169            Err(e) => {
170                tracing::error!("Failed to get offchain addresses: {e}");
171                return;
172            }
173        };
174        let script_map = Arc::new(ScriptMap::from_addresses(&addresses));
175        let ark_addresses: Vec<_> = addresses.iter().map(|(addr, _)| *addr).collect();
176
177        let subscription_id = match client.subscribe_to_scripts(ark_addresses, None).await {
178            Ok(id) => id,
179            Err(e) => {
180                tracing::warn!("Failed to subscribe: {e}, retrying in {backoff:?}");
181                if wait_or_stop(&mut stop_rx, backoff).await {
182                    return;
183                }
184                backoff = (backoff * 2).min(MAX_BACKOFF);
185                continue;
186            }
187        };
188
189        let mut stream = match client.get_subscription(subscription_id.clone()).await {
190            Ok(s) => s,
191            Err(e) => {
192                tracing::warn!("Failed to get subscription stream: {e}, retrying in {backoff:?}");
193                if wait_or_stop(&mut stop_rx, backoff).await {
194                    return;
195                }
196                backoff = (backoff * 2).min(MAX_BACKOFF);
197                continue;
198            }
199        };
200
201        tracing::info!("VTXO watcher connected");
202        backoff = INITIAL_BACKOFF;
203        let mut subscribed_addrs: HashSet<ArkAddress> =
204            addresses.iter().map(|(addr, _)| *addr).collect();
205        let mut script_map = script_map;
206        let mut renew_interval = tokio::time::interval(Duration::from_secs(60));
207        let mut discovery_interval = tokio::time::interval(KEY_DISCOVERY_INTERVAL);
208        let (work_tx, mut work_rx) = mpsc::channel::<WatcherWork>(128);
209
210        let worker_handle = tokio::spawn({
211            let client = client.clone();
212            let delegator = delegator.clone();
213            async move {
214                let mut seen_unspent_outpoints = HashSet::<OutPoint>::new();
215
216                while let Some(first) = work_rx.recv().await {
217                    // Handle one guaranteed message to start the batch.
218                    let (
219                        mut pending_vtxos,
220                        mut latest_script_map,
221                        mut should_renew,
222                        mut should_sync,
223                    ) = match first {
224                        WatcherWork::NewVtxos { vtxos, script_map } => {
225                            (vtxos, Some(script_map), true, false)
226                        }
227                        WatcherWork::RenewTick { script_map } => {
228                            (Vec::new(), Some(script_map), true, true)
229                        }
230                    };
231
232                    // Drain whatever else is already queued without waiting.
233                    while let Ok(work) = work_rx.try_recv() {
234                        match work {
235                            WatcherWork::NewVtxos { vtxos, script_map } => {
236                                pending_vtxos.extend(vtxos);
237                                latest_script_map = Some(script_map);
238                                should_renew = true;
239                            }
240                            WatcherWork::RenewTick { script_map } => {
241                                latest_script_map = Some(script_map);
242                                should_renew = true;
243                                should_sync = true;
244                            }
245                        }
246                    }
247
248                    if let (true, Some(script_map)) = (should_sync, latest_script_map.as_deref()) {
249                        match collect_new_delegation_candidates(
250                            &client,
251                            script_map,
252                            &mut seen_unspent_outpoints,
253                        )
254                        .await
255                        {
256                            Ok(new_candidates) => {
257                                if !new_candidates.is_empty() {
258                                    tracing::debug!(
259                                        count = new_candidates.len(),
260                                        "Found new delegatable VTXOs from failsafe polling"
261                                    );
262                                    pending_vtxos.extend(new_candidates);
263                                }
264                            }
265                            Err(e) => {
266                                tracing::warn!("Failsafe delegation poll failed: {e}");
267                            }
268                        }
269                    }
270
271                    if !pending_vtxos.is_empty() {
272                        let mut deduped = Vec::new();
273                        let mut seen = HashSet::new();
274                        for vtxo in pending_vtxos {
275                            if seen.insert(vtxo.outpoint) {
276                                deduped.push(vtxo);
277                            }
278                        }
279
280                        tracing::debug!(count = deduped.len(), "Processing VTXOs for delegation");
281                        if let Some(script_map) = latest_script_map {
282                            delegate_vtxos(&client, &delegator, &deduped, &script_map).await;
283                        }
284                    }
285
286                    if should_renew {
287                        renew_expiring_vtxos(&client).await;
288                    }
289                }
290            }
291        });
292
293        loop {
294            tokio::select! {
295                _ = stop_rx.changed() => {
296                    drop(work_tx);
297                    let _ = worker_handle.await;
298                    return;
299                }
300                _ = renew_interval.tick() => {
301                    if work_tx.send(WatcherWork::RenewTick {
302                        script_map: Arc::clone(&script_map),
303                    }).await.is_err() {
304                        tracing::warn!("VTXO worker channel closed, reconnecting in {backoff:?}");
305                        break;
306                    }
307                }
308                _ = discovery_interval.tick() => {
309                    match refresh_subscription_scripts(
310                        client.as_ref(),
311                        &subscription_id,
312                        &mut subscribed_addrs,
313                    )
314                    .await
315                    {
316                        Ok(Some(new_script_map)) => {
317                            script_map = new_script_map;
318                        }
319                        Ok(None) => {}
320                        Err(e) => {
321                            tracing::warn!("Failed to refresh script subscription: {e}");
322                        }
323                    }
324                }
325                event = stream.next() => {
326                    match event {
327                        Some(Ok(SubscriptionResponse::Heartbeat)) => {}
328                        Some(Ok(SubscriptionResponse::Event(event))) => {
329                            if !event.new_vtxos.is_empty() {
330                                tracing::debug!(
331                                    txid = %event.txid,
332                                    new_vtxos = event.new_vtxos.len(),
333                                    "Received subscription event with new VTXOs"
334                                );
335
336                                if work_tx.send(WatcherWork::NewVtxos {
337                                    vtxos: event.new_vtxos,
338                                    script_map: Arc::clone(&script_map),
339                                })
340                                .await.is_err()
341                                {
342                                    tracing::warn!("VTXO worker channel closed. Reconnecting in {backoff:?}");
343                                    break;
344                                }
345                            }
346                        }
347                        Some(Err(e)) => {
348                            tracing::warn!("VTXO subscription error: {e}, reconnecting in {backoff:?}");
349                            break;
350                        }
351                        None => {
352                            tracing::debug!("VTXO subscription stream ended, reconnecting in {backoff:?}");
353                            break;
354                        }
355                    }
356                }
357            }
358        }
359
360        drop(work_tx);
361        let _ = worker_handle.await;
362
363        if wait_or_stop(&mut stop_rx, backoff).await {
364            return;
365        }
366        backoff = (backoff * 2).min(MAX_BACKOFF);
367    }
368}
369
370/// Wait for the given duration or until stop is signalled. Returns `true` if stopped.
371async fn wait_or_stop(stop_rx: &mut watch::Receiver<bool>, duration: Duration) -> bool {
372    tokio::select! {
373        _ = stop_rx.changed() => true,
374        _ = tokio::time::sleep(duration) => false,
375    }
376}
377
378/// Discover keys and add newly derived scripts to an existing subscription.
379async fn refresh_subscription_scripts<B, W, S, K>(
380    client: &Client<B, W, S, K>,
381    subscription_id: &str,
382    subscribed_addrs: &mut HashSet<ArkAddress>,
383) -> Result<Option<Arc<ScriptMap>>, Error>
384where
385    B: Blockchain + Send + Sync + 'static,
386    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
387    S: SwapStorage + 'static,
388    K: KeyProvider + Send + Sync + 'static,
389{
390    let _discovered = client.discover_keys(KEY_DISCOVERY_GAP_LIMIT).await?;
391
392    let addrs = client.get_offchain_addresses()?;
393    let new_addrs: Vec<_> = addrs
394        .iter()
395        .map(|(addr, _)| *addr)
396        .filter(|addr| !subscribed_addrs.contains(addr))
397        .collect();
398
399    if new_addrs.is_empty() {
400        return Ok(None);
401    }
402
403    client
404        .subscribe_to_scripts(new_addrs.clone(), Some(subscription_id.to_string()))
405        .await?;
406
407    let added = new_addrs.len();
408    subscribed_addrs.extend(new_addrs);
409    tracing::info!(
410        added,
411        "Updated watcher subscription with newly derived addresses"
412    );
413
414    Ok(Some(Arc::new(ScriptMap::from_addresses(&addrs))))
415}
416
417/// Enumerate newly seen unspent delegate-eligible VTXOs from wallet state.
418///
419/// This is a failsafe path to catch outputs that may have been missed by subscription timing.
420async fn collect_new_delegation_candidates<B, W, S, K>(
421    client: &Client<B, W, S, K>,
422    script_map: &ScriptMap,
423    seen_unspent_outpoints: &mut HashSet<OutPoint>,
424) -> Result<Vec<VirtualTxOutPoint>, Error>
425where
426    B: Blockchain + Send + Sync + 'static,
427    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
428    S: SwapStorage + 'static,
429    K: KeyProvider + Send + Sync + 'static,
430{
431    let (vtxo_list, _) = client.list_vtxos().await?;
432
433    let mut current_outpoints = HashSet::new();
434    let mut newly_seen = Vec::new();
435
436    for vtp in vtxo_list.all_unspent() {
437        let Some(vtxo) = script_map.vtxo_by_script.get(&vtp.script) else {
438            continue;
439        };
440
441        if vtxo.delegator_pk().is_none() {
442            continue;
443        }
444
445        current_outpoints.insert(vtp.outpoint);
446
447        if !seen_unspent_outpoints.contains(&vtp.outpoint) {
448            newly_seen.push(vtp.clone());
449        }
450    }
451
452    *seen_unspent_outpoints = current_outpoints;
453
454    Ok(newly_seen)
455}
456
457/// Delegator info cached per delegation batch.
458struct DelegatorState {
459    cosigner_pk: PublicKey,
460    fee: Amount,
461    fee_address_script: ScriptBuf,
462}
463
464/// Fetch and parse delegator info into a usable form.
465async fn fetch_delegator_state(delegator: &DelegatorClient) -> Result<DelegatorState, Error> {
466    let info = delegator
467        .info()
468        .await
469        .context(Error::ad_hoc("failed to get delegator info"))?;
470
471    let cosigner_pk: PublicKey = info
472        .pubkey
473        .parse::<PublicKey>()
474        .context("failed to parse delegator PK")?;
475
476    let fee = info
477        .fee
478        .parse::<u64>()
479        .map(Amount::from_sat)
480        .context("failed to parse delegator fee")?;
481
482    let fee_address_script = info
483        .delegator_address
484        .parse::<ArkAddress>()
485        .context("failed to parse delegator fee address")?
486        .to_p2tr_script_pubkey();
487
488    Ok(DelegatorState {
489        cosigner_pk,
490        fee,
491        fee_address_script,
492    })
493}
494
/// Number of seconds in a UTC day.
const SECONDS_PER_DAY: i64 = 86_400;

/// Round a unix timestamp (seconds) down to UTC midnight of the day it falls in.
///
/// Euclidean division rounds towards negative infinity, so timestamps before the epoch are
/// also mapped to the midnight at or before them.
fn day_timestamp(ts: i64) -> i64 {
    let days_since_epoch = ts.div_euclid(SECONDS_PER_DAY);
    days_since_epoch * SECONDS_PER_DAY
}
502
503/// Group VTXOs by their expiry day (UTC midnight), returning groups sorted by expiry.
504///
505/// Recoverable VTXOs (expired or sub-dust) are collected separately and merged into the earliest
506/// non-recoverable group.
507fn group_by_expiry_day<'a>(
508    vtxos: &'a [VirtualTxOutPoint],
509    script_map: &'a ScriptMap,
510    dust: Amount,
511) -> Vec<(i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>)> {
512    let mut groups: BTreeMap<i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>> = BTreeMap::new();
513    let mut recoverable: Vec<(&'a VirtualTxOutPoint, &'a Vtxo)> = Vec::new();
514
515    for vtp in vtxos {
516        if vtp.is_spent {
517            continue;
518        }
519
520        let vtxo = match script_map.vtxo_by_script.get(&vtp.script) {
521            Some(v) => v,
522            None => continue,
523        };
524
525        if vtxo.delegator_pk().is_none() {
526            continue;
527        }
528
529        if vtp.is_recoverable(dust) {
530            recoverable.push((vtp, vtxo));
531        } else if vtp.expires_at > 0 {
532            let day = day_timestamp(vtp.expires_at);
533            groups.entry(day).or_default().push((vtp, vtxo));
534        }
535    }
536
537    if !recoverable.is_empty() {
538        if let Some((&earliest_day, _)) = groups.iter().next() {
539            groups.entry(earliest_day).or_default().extend(recoverable);
540        } else {
541            groups.insert(0, recoverable);
542        }
543    }
544
545    groups.into_iter().collect()
546}
547
548/// Calculate the `valid_at` timestamp for a delegation group.
549///
550/// For each non-recoverable VTXO, compute activation at 90% of its full lifetime:
551/// `created_at + (expires_at - created_at) * 0.9`. The earliest of those activations is used.
552///
553/// If the group only contains recoverable/expired VTXOs (or activation is already in the past),
554/// schedule soon (`now + 60s`).
555fn calculate_valid_at(group_vtxos: &[(&VirtualTxOutPoint, &Vtxo)], dust: Amount) -> u64 {
556    let now_secs = std::time::SystemTime::now()
557        .duration_since(std::time::UNIX_EPOCH)
558        .unwrap_or_default()
559        .as_secs();
560
561    let earliest_activation = group_vtxos
562        .iter()
563        .filter(|(vtp, _)| {
564            !vtp.is_recoverable(dust)
565                && vtp.created_at > 0
566                && vtp.expires_at > 0
567                && vtp.expires_at > vtp.created_at
568        })
569        .map(|(vtp, _)| {
570            let created_at = vtp.created_at as u64;
571            let lifetime = (vtp.expires_at - vtp.created_at) as u64;
572            created_at + (lifetime * 9 / 10)
573        })
574        .min();
575
576    match earliest_activation {
577        Some(valid_at) if valid_at > now_secs => valid_at,
578        _ => now_secs + 60,
579    }
580}
581
582/// Submit newly received VTXOs to the delegator service for future auto-renewal.
583///
584/// The `script_map` provides VTXO metadata (tapscripts, spend info) without a network call.
585/// Only the affected addresses are queried for expiry data.
586async fn delegate_vtxos<B, W, S, K>(
587    client: &Arc<Client<B, W, S, K>>,
588    delegator: &DelegatorClient,
589    new_vtxos: &[VirtualTxOutPoint],
590    script_map: &ScriptMap,
591) where
592    B: Blockchain + Send + Sync + 'static,
593    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
594    S: SwapStorage + 'static,
595    K: KeyProvider + Send + Sync + 'static,
596{
597    // Query only the addresses that appear in the event, not all wallet addresses.
598    let affected_addresses = script_map.addresses_for(new_vtxos);
599    if affected_addresses.is_empty() {
600        tracing::debug!("No affected addresses resolved from new VTXOs; skipping delegation");
601        return;
602    }
603
604    let vtxo_list = match client
605        .list_vtxos_for_addresses(affected_addresses.into_iter())
606        .await
607    {
608        Ok(v) => v,
609        Err(e) => {
610            tracing::error!("Failed to list VTXOs for delegation: {e}");
611            return;
612        }
613    };
614
615    // The subscription event tells us which outpoints are new, but we need the full
616    // VirtualTxOutPoint (with expires_at, created_at) from the server for grouping.
617    let new_outpoints: HashSet<_> = new_vtxos.iter().map(|v| v.outpoint).collect();
618    let enriched: Vec<_> = vtxo_list
619        .all_unspent()
620        .filter(|vtp| new_outpoints.contains(&vtp.outpoint))
621        .cloned()
622        .collect();
623
624    let groups = group_by_expiry_day(&enriched, script_map, client.server_info.dust);
625    if groups.is_empty() {
626        tracing::debug!("No delegate-eligible VTXOs after enrichment/grouping; skipping");
627        return;
628    }
629
630    let delegator_state = match fetch_delegator_state(delegator).await {
631        Ok(s) => Arc::new(s),
632        Err(e) => {
633            tracing::error!("{e}");
634            return;
635        }
636    };
637
638    let (to_address, _) = match client.get_offchain_address() {
639        Ok(v) => v,
640        Err(e) => {
641            tracing::error!("Failed to get offchain address for delegation: {e}");
642            return;
643        }
644    };
645    let dest_script = to_address.to_p2tr_script_pubkey();
646
647    let mut handles = Vec::new();
648
649    for (_day, group_vtxos) in groups {
650        let valid_at = calculate_valid_at(&group_vtxos, client.server_info.dust);
651
652        let mut vtxo_inputs = Vec::new();
653        let mut total_amount = Amount::ZERO;
654
655        for (vtp, vtxo) in &group_vtxos {
656            let spend_info = match vtxo.delegate_spend_info() {
657                Ok(info) => info,
658                Err(e) => {
659                    tracing::warn!(outpoint = %vtp.outpoint, "Cannot get delegate spend info: {e}");
660                    continue;
661                }
662            };
663
664            vtxo_inputs.push(intent::Input::new(
665                vtp.outpoint,
666                vtxo.exit_delay(),
667                None,
668                TxOut {
669                    value: vtp.amount,
670                    script_pubkey: vtp.script.clone(),
671                },
672                vtxo.tapscripts(),
673                spend_info,
674                vtp.is_spent,
675                false,
676                vtp.assets.clone(),
677            ));
678
679            total_amount += vtp.amount;
680        }
681
682        if vtxo_inputs.is_empty() {
683            continue;
684        }
685
686        let fee = delegator_state.fee;
687        if fee >= total_amount {
688            tracing::warn!(
689                %total_amount, %fee,
690                "Delegator fee exceeds VTXO group value, skipping"
691            );
692            continue;
693        }
694        let net_amount = total_amount - fee;
695
696        if net_amount < client.server_info.dust {
697            tracing::warn!(%net_amount, "Net amount after fee is below dust, skipping");
698            continue;
699        }
700
701        let mut outputs = Vec::new();
702        if fee > Amount::ZERO {
703            outputs.push(intent::Output::Offchain(TxOut {
704                value: fee,
705                script_pubkey: delegator_state.fee_address_script.clone(),
706            }));
707        }
708        outputs.push(intent::Output::Offchain(TxOut {
709            value: net_amount,
710            script_pubkey: dest_script.clone(),
711        }));
712
713        let server_info_forfeit_addr = client.server_info.forfeit_address.clone();
714        let dust = client.server_info.dust;
715        let ds = Arc::clone(&delegator_state);
716
717        let delegator = delegator.clone();
718        let client = Arc::clone(client);
719        handles.push(tokio::spawn(async move {
720            delegate_group(
721                &client,
722                &delegator,
723                vtxo_inputs,
724                outputs,
725                ds.cosigner_pk,
726                &server_info_forfeit_addr,
727                dust,
728                valid_at,
729            )
730            .await;
731        }));
732    }
733
734    for handle in handles {
735        let _ = handle.await;
736    }
737}
738
739/// Prepare, sign, and submit a single delegation group.
740async fn delegate_group<B, W, S, K>(
741    client: &Client<B, W, S, K>,
742    delegator: &DelegatorClient,
743    vtxo_inputs: Vec<intent::Input>,
744    outputs: Vec<intent::Output>,
745    cosigner_pk: PublicKey,
746    forfeit_address: &bitcoin::Address,
747    dust: Amount,
748    valid_at: u64,
749) where
750    B: Blockchain + Send + Sync + 'static,
751    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
752    S: SwapStorage + 'static,
753    K: KeyProvider + Send + Sync + 'static,
754{
755    let input_count = vtxo_inputs.len();
756
757    let mut delegate = match ark_core::batch::prepare_delegate_psbts_at(
758        vtxo_inputs,
759        outputs,
760        cosigner_pk,
761        forfeit_address,
762        dust,
763        Some(valid_at),
764    ) {
765        Ok(d) => d,
766        Err(e) => {
767            tracing::error!("Failed to prepare delegate PSBTs: {e}");
768            return;
769        }
770    };
771
772    if let Err(e) =
773        client.sign_delegate_psbts(&mut delegate.intent.proof, &mut delegate.forfeit_psbts)
774    {
775        tracing::error!("Failed to sign delegate PSBTs: {e}");
776        return;
777    }
778
779    if let Err(e) = delegator
780        .delegate(&delegate.intent, &delegate.forfeit_psbts, None)
781        .await
782    {
783        tracing::error!("Failed to submit delegation: {e}");
784        return;
785    }
786
787    tracing::info!(
788        vtxo_count = input_count,
789        valid_at,
790        "Delegated VTXO group to delegator service"
791    );
792}
793
794/// Fraction of VTXO lifetime remaining at which we self-renew as a safety net.
795const SELF_RENEW_REMAINING_FRACTION: f64 = 0.10;
796
797/// Self-renew VTXOs that are close to expiry.
798///
799/// Only settles VTXOs whose remaining lifetime is less than [`SELF_RENEW_REMAINING_FRACTION`] of
800/// their total lifetime, leaving freshly-received VTXOs alone.
801async fn renew_expiring_vtxos<B, W, S, K>(client: &Client<B, W, S, K>)
802where
803    B: Blockchain + Send + Sync + 'static,
804    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
805    S: SwapStorage + 'static,
806    K: KeyProvider + Send + Sync + 'static,
807{
808    let (vtxo_list, _) = match client.list_vtxos().await {
809        Ok(v) => v,
810        Err(e) => {
811            tracing::warn!("Failed to list VTXOs for renewal check: {e}");
812            return;
813        }
814    };
815
816    let now = std::time::SystemTime::now()
817        .duration_since(std::time::UNIX_EPOCH)
818        .unwrap_or_default()
819        .as_secs() as i64;
820
821    let expiring_outpoints: Vec<OutPoint> = vtxo_list
822        .all_unspent()
823        .filter(|vtp| {
824            if vtp.expires_at <= 0 || vtp.created_at <= 0 {
825                return false;
826            }
827            let total_lifetime = vtp.expires_at - vtp.created_at;
828            let remaining = vtp.expires_at - now;
829            remaining > 0
830                && (remaining as f64) < (total_lifetime as f64 * SELF_RENEW_REMAINING_FRACTION)
831        })
832        .map(|vtp| vtp.outpoint)
833        .collect();
834
835    if expiring_outpoints.is_empty() {
836        return;
837    }
838
839    tracing::info!(
840        count = expiring_outpoints.len(),
841        "Self-renewing expiring VTXOs"
842    );
843
844    let mut rng = OsRng;
845    match client
846        .settle_vtxos(&mut rng, &expiring_outpoints, &[])
847        .await
848    {
849        Ok(Some(txid)) => {
850            tracing::info!(%txid, "Self-renewed expiring VTXOs");
851        }
852        Ok(None) => {}
853        Err(e) => {
854            tracing::warn!("Failed to self-renew VTXOs: {e}");
855        }
856    }
857}
858
#[cfg(test)]
mod tests {
    use super::*;
    use bitcoin::hashes::Hash;
    use bitcoin::key::Secp256k1;
    use bitcoin::Network;
    use bitcoin::Sequence;
    use bitcoin::Txid;
    use bitcoin::XOnlyPublicKey;
    use std::str::FromStr;

    /// Current wall-clock time as whole seconds since the Unix epoch.
    ///
    /// Panics if the system clock reports a time before the epoch.
    fn unix_now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    /// Dust threshold passed to the functions under test. Fixtures with an
    /// amount below it play the role of "recoverable" (sub-dust) VTXOs.
    fn dust() -> Amount {
        Amount::from_sat(500)
    }

    /// Three fixed x-only public keys, returned as `(server, owner, delegator)`.
    fn test_keys() -> (XOnlyPublicKey, XOnlyPublicKey, XOnlyPublicKey) {
        let parse = |hex: &str| XOnlyPublicKey::from_str(hex).unwrap();
        (
            parse("18845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166"),
            parse("28845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166"),
            parse("38845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166"),
        )
    }

    /// Builds a regtest VTXO that includes a delegator key and returns it
    /// together with its Ark address.
    fn delegated_vtxo() -> (ArkAddress, Vtxo) {
        let secp = Secp256k1::new();
        let (server, owner, delegator) = test_keys();
        let sequence = Sequence::from_seconds_ceil(86400).unwrap();
        let vtxo =
            Vtxo::new_with_delegator(&secp, server, owner, delegator, sequence, Network::Regtest)
                .unwrap();
        let address = vtxo.to_ark_address();
        (address, vtxo)
    }

    /// Builds an unspent `VirtualTxOutPoint` fixture.
    ///
    /// The outpoint is `vout` on the all-zero txid, and `created_at` is set
    /// 1000 seconds before `expires_at`. All status flags are `false` and all
    /// optional / list fields are empty.
    fn mk_vtp(script: ScriptBuf, amount_sat: u64, expires_at: i64, vout: u32) -> VirtualTxOutPoint {
        let outpoint = OutPoint::new(Txid::all_zeros(), vout);
        let amount = Amount::from_sat(amount_sat);
        let created_at = expires_at - 1000;

        VirtualTxOutPoint {
            outpoint,
            created_at,
            expires_at,
            amount,
            script,
            is_preconfirmed: false,
            is_swept: false,
            is_unrolled: false,
            is_spent: false,
            spent_by: None,
            commitment_txids: Vec::new(),
            settled_by: None,
            ark_txid: None,
            assets: Vec::new(),
        }
    }

    /// A mid-day timestamp is rounded down to a multiple of `SECONDS_PER_DAY`
    /// that lies less than one day before it.
    #[test]
    fn day_timestamp_normalizes_to_midnight() {
        // 2024-01-15 12:45:00 UTC
        let midday = 1705322700;
        let normalized = day_timestamp(midday);

        assert_eq!(normalized % SECONDS_PER_DAY, 0);
        assert!(normalized <= midday);
        assert!(midday - normalized < SECONDS_PER_DAY);
    }

    /// A timestamp that is already an exact multiple of `SECONDS_PER_DAY` is
    /// returned unchanged.
    #[test]
    fn day_timestamp_already_midnight() {
        let midnight = 19738 * SECONDS_PER_DAY;
        assert_eq!(day_timestamp(midnight), midnight);
    }

    /// One sub-dust VTXO and two regular VTXOs expiring on consecutive days
    /// must yield exactly two groups, with the sub-dust VTXO sharing the
    /// earlier group.
    #[test]
    fn group_by_expiry_day_merges_recoverable_into_earliest_group() {
        let (addr, vtxo) = delegated_vtxo();
        // Derive the script before `addr` is moved into the script map.
        let script = addr.to_p2tr_script_pubkey();
        let script_map = ScriptMap::from_addresses(&[(addr, vtxo)]);

        let now = unix_now() as i64;
        let first_day = day_timestamp(now) + SECONDS_PER_DAY;
        let second_day = first_day + SECONDS_PER_DAY;

        // Deliberately not in expiry order: later day, sub-dust, earlier day.
        let vtxos = [
            mk_vtp(script.clone(), 10_000, second_day + 800, 2),
            mk_vtp(script.clone(), 100, first_day + 500, 0), // sub-dust
            mk_vtp(script, 10_000, first_day + 800, 1),
        ];

        let groups = group_by_expiry_day(&vtxos, &script_map, dust());

        assert_eq!(groups.len(), 2);
        let (earliest, latest) = (&groups[0], &groups[1]);
        assert_eq!(earliest.0, day_timestamp(first_day + 800));
        assert_eq!(latest.0, day_timestamp(second_day + 800));
        assert_eq!(earliest.1.len(), 2);
        assert_eq!(latest.1.len(), 1);
    }

    /// For a group holding an above-dust VTXO, `valid_at` must fall strictly
    /// between the current time and that VTXO's expiry.
    #[test]
    fn calculate_valid_at_for_non_recoverable_group_is_before_expiry() {
        let (_, vtxo) = delegated_vtxo();
        let now = unix_now() as i64;

        let later = mk_vtp(ScriptBuf::new(), 10_000, now + 10_000, 1);
        let group = vec![(&later, &vtxo)];

        let valid_at = calculate_valid_at(&group, dust());

        assert!(valid_at > now as u64);
        assert!(valid_at < later.expires_at as u64);
    }

    /// For a group holding only a sub-dust VTXO, `valid_at` must land about
    /// 60 seconds after the call rather than near the VTXO's expiry.
    #[test]
    fn calculate_valid_at_for_recoverable_only_group_is_soon() {
        let (_, vtxo) = delegated_vtxo();
        let now = unix_now() as i64;

        // Sub-dust at dust=500.
        let recoverable = mk_vtp(ScriptBuf::new(), 100, now + 5_000, 0);
        let group = vec![(&recoverable, &vtxo)];

        // Bracket the call with clock reads so the bounds hold even if the
        // wall clock ticks over while the call runs.
        let before = unix_now();
        let valid_at = calculate_valid_at(&group, dust());
        let after = unix_now();

        assert!(valid_at >= before + 60);
        assert!(valid_at <= after + 61);
    }
}