Skip to main content

ark_client/
vtxo_watcher.rs

1//! Background VTXO watcher that auto-delegates and auto-renews VTXOs.
2//!
3//! Full behavior:
4//! - On new VTXOs received: submit them to the delegator service for future renewal
5//! - On new VTXOs received: self-renew VTXOs that are close to expiry (safety net)
6//! - On stream error: reconnect with exponential backoff
7
8use crate::error::ErrorContext;
9use crate::key_provider::KeyProvider;
10use crate::swap_storage::SwapStorage;
11use crate::wallet::BoardingWallet;
12use crate::wallet::OnchainWallet;
13use crate::Blockchain;
14use crate::Client;
15use crate::Error;
16use ark_core::intent;
17use ark_core::server::SubscriptionResponse;
18use ark_core::server::VirtualTxOutPoint;
19use ark_core::ArkAddress;
20use ark_core::Vtxo;
21use ark_delegator::DelegatorClient;
22use bitcoin::secp256k1::PublicKey;
23use bitcoin::Amount;
24use bitcoin::OutPoint;
25use bitcoin::ScriptBuf;
26use bitcoin::TxOut;
27use futures::StreamExt;
28use rand::rngs::OsRng;
29use std::collections::BTreeMap;
30use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33use std::time::Duration;
34use tokio::sync::mpsc;
35use tokio::sync::watch;
36
37/// Handle to stop the background VTXO watcher.
38///
39/// Dropping the handle will also stop the watcher.
40pub struct VtxoWatcherHandle {
41    stop_tx: watch::Sender<bool>,
42}
43
44impl VtxoWatcherHandle {
45    /// Stop the background watcher.
46    pub fn stop(self) {
47        let _ = self.stop_tx.send(true);
48    }
49}
50
51impl Drop for VtxoWatcherHandle {
52    fn drop(&mut self) {
53        let _ = self.stop_tx.send(true);
54    }
55}
56
57/// Backoff parameters for reconnection.
58const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
59const MAX_BACKOFF: Duration = Duration::from_secs(30);
60
61/// Periodic key discovery settings for keeping script subscriptions fresh.
62const KEY_DISCOVERY_INTERVAL: Duration = Duration::from_secs(10);
63const KEY_DISCOVERY_GAP_LIMIT: u32 = 20;
64
65/// How often the background migration arm fires when healthy. The frequent cadence is safe
66/// because [`Client::migrate_deprecated_signer_vtxos`] short-circuits to a no-op
67/// `NothingMigratable` report when the server advertises no deprecated signers or the wallet holds
68/// no pre-cutoff deprecated-signer outputs.
69const MIGRATION_INTERVAL: Duration = Duration::from_secs(60);
70
71/// Exponential-backoff bounds for the migration arm after a failing pass. The cooldown doubles per
72/// consecutive failure, caps at five minutes, and resets to the base on a successful or no-op pass.
73const MIGRATION_BASE_COOLDOWN: Duration = Duration::from_secs(30);
74const MIGRATION_MAX_COOLDOWN: Duration = Duration::from_secs(300);
75
76/// Configuration for [`Client::start_vtxo_watcher`].
77#[derive(Debug, Clone, Copy)]
78pub struct VtxoWatcherConfig {
79    /// When `true` (the default), the watcher runs a periodic
80    /// [`Client::migrate_deprecated_signer_vtxos`] pass that rotates funds off any deprecated
81    /// server signer the wallet still holds pre-cutoff outputs under. Errors are logged and
82    /// swallowed (never killing the loop), and a persistently failing pass backs off
83    /// exponentially. Set to `false` to disable the migration arm entirely; renewal and delegation
84    /// behavior are unaffected either way.
85    pub migrate_deprecated_signers: bool,
86}
87
88impl Default for VtxoWatcherConfig {
89    fn default() -> Self {
90        Self {
91            migrate_deprecated_signers: true,
92        }
93    }
94}
95
96/// Pre-computed mapping from script pubkeys to their Vtxo metadata and ArkAddress.
97///
98/// Built once per (re)connection from `get_offchain_addresses()`. Used both for the subscription
99/// and for resolving VTXO metadata from subscription events, so they can never diverge.
100struct ScriptMap {
101    vtxo_by_script: HashMap<ScriptBuf, Vtxo>,
102    addr_by_script: HashMap<ScriptBuf, ArkAddress>,
103}
104
105impl ScriptMap {
106    fn from_addresses(addresses: &[(ArkAddress, Vtxo)]) -> Self {
107        let mut vtxo_by_script = HashMap::with_capacity(addresses.len());
108        let mut addr_by_script = HashMap::with_capacity(addresses.len());
109        for (addr, vtxo) in addresses {
110            let script = addr.to_p2tr_script_pubkey();
111            vtxo_by_script.insert(script.clone(), vtxo.clone());
112            addr_by_script.insert(script, *addr);
113        }
114        Self {
115            vtxo_by_script,
116            addr_by_script,
117        }
118    }
119
120    /// Get the unique ArkAddresses that appear in the given VTXO outpoints.
121    fn addresses_for(&self, vtxos: &[VirtualTxOutPoint]) -> Vec<ArkAddress> {
122        let mut seen = HashSet::new();
123        let mut result = Vec::new();
124        for vtp in vtxos {
125            if let Some(addr) = self.addr_by_script.get(&vtp.script) {
126                if seen.insert(&vtp.script) {
127                    result.push(*addr);
128                }
129            }
130        }
131        result
132    }
133}
134
135enum WatcherWork {
136    NewVtxos {
137        vtxos: Vec<VirtualTxOutPoint>,
138        script_map: Arc<ScriptMap>,
139    },
140    RenewTick {
141        script_map: Arc<ScriptMap>,
142    },
143}
144
145impl<B, W, S, K> Client<B, W, S, K>
146where
147    B: Blockchain + Send + Sync + 'static,
148    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
149    S: SwapStorage + 'static,
150    K: KeyProvider + Send + Sync + 'static,
151{
152    /// Start a background task that watches for new VTXOs and:
153    ///
154    /// 1. **Delegates** them to the configured delegator service for future auto-renewal
155    /// 2. **Self-renews** VTXOs that are close to expiry (safety net)
156    /// 3. **Migrates** funds off deprecated server signers on a periodic, backed-off pass (unless
157    ///    disabled via [`VtxoWatcherConfig::migrate_deprecated_signers`])
158    ///
159    /// Reconnects automatically with exponential backoff (1s → 2s → … → 30s) on stream errors.
160    ///
161    /// Requires the client to be wrapped in an `Arc` for shared ownership with the background
162    /// task.
163    ///
164    /// Returns a [`VtxoWatcherHandle`] that stops the watcher when dropped.
165    pub fn start_vtxo_watcher(
166        self: &Arc<Self>,
167        delegator: Arc<DelegatorClient>,
168        config: VtxoWatcherConfig,
169    ) -> VtxoWatcherHandle {
170        let (stop_tx, stop_rx) = watch::channel(false);
171
172        let client = Arc::clone(self);
173        tokio::spawn(async move {
174            run_watcher_loop(client, delegator, config, stop_rx).await;
175            tracing::debug!("VTXO watcher stopped");
176        });
177
178        VtxoWatcherHandle { stop_tx }
179    }
180}
181
182/// Outer loop that reconnects on stream errors with exponential backoff.
183async fn run_watcher_loop<B, W, S, K>(
184    client: Arc<Client<B, W, S, K>>,
185    delegator: Arc<DelegatorClient>,
186    config: VtxoWatcherConfig,
187    mut stop_rx: watch::Receiver<bool>,
188) where
189    B: Blockchain + Send + Sync + 'static,
190    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
191    S: SwapStorage + 'static,
192    K: KeyProvider + Send + Sync + 'static,
193{
194    let mut backoff = INITIAL_BACKOFF;
195
196    loop {
197        if *stop_rx.borrow() {
198            return;
199        }
200
201        // Build the script map and subscription from the same address set.
202        let addresses = match client.get_offchain_addresses() {
203            Ok(a) => a,
204            Err(e) => {
205                tracing::error!("Failed to get offchain addresses: {e}");
206                return;
207            }
208        };
209        let script_map = Arc::new(ScriptMap::from_addresses(&addresses));
210        let ark_addresses: Vec<_> = addresses.iter().map(|(addr, _)| *addr).collect();
211
212        let subscription_id = match client.subscribe_to_scripts(ark_addresses, None).await {
213            Ok(id) => id,
214            Err(e) => {
215                tracing::warn!("Failed to subscribe: {e}, retrying in {backoff:?}");
216                if wait_or_stop(&mut stop_rx, backoff).await {
217                    return;
218                }
219                backoff = (backoff * 2).min(MAX_BACKOFF);
220                continue;
221            }
222        };
223
224        let mut stream = match client.get_subscription(subscription_id.clone()).await {
225            Ok(s) => s,
226            Err(e) => {
227                tracing::warn!("Failed to get subscription stream: {e}, retrying in {backoff:?}");
228                if wait_or_stop(&mut stop_rx, backoff).await {
229                    return;
230                }
231                backoff = (backoff * 2).min(MAX_BACKOFF);
232                continue;
233            }
234        };
235
236        tracing::info!("VTXO watcher connected");
237        backoff = INITIAL_BACKOFF;
238        let mut subscribed_addrs: HashSet<ArkAddress> =
239            addresses.iter().map(|(addr, _)| *addr).collect();
240        let mut script_map = script_map;
241        let mut renew_interval = tokio::time::interval(Duration::from_secs(60));
242        let mut discovery_interval = tokio::time::interval(KEY_DISCOVERY_INTERVAL);
243        let (work_tx, mut work_rx) = mpsc::channel::<WatcherWork>(128);
244
245        let worker_handle = tokio::spawn({
246            let client = client.clone();
247            let delegator = delegator.clone();
248            async move {
249                let mut seen_unspent_outpoints = HashSet::<OutPoint>::new();
250
251                while let Some(first) = work_rx.recv().await {
252                    // Handle one guaranteed message to start the batch.
253                    let (
254                        mut pending_vtxos,
255                        mut latest_script_map,
256                        mut should_renew,
257                        mut should_sync,
258                    ) = match first {
259                        WatcherWork::NewVtxos { vtxos, script_map } => {
260                            (vtxos, Some(script_map), true, false)
261                        }
262                        WatcherWork::RenewTick { script_map } => {
263                            (Vec::new(), Some(script_map), true, true)
264                        }
265                    };
266
267                    // Drain whatever else is already queued without waiting.
268                    while let Ok(work) = work_rx.try_recv() {
269                        match work {
270                            WatcherWork::NewVtxos { vtxos, script_map } => {
271                                pending_vtxos.extend(vtxos);
272                                latest_script_map = Some(script_map);
273                                should_renew = true;
274                            }
275                            WatcherWork::RenewTick { script_map } => {
276                                latest_script_map = Some(script_map);
277                                should_renew = true;
278                                should_sync = true;
279                            }
280                        }
281                    }
282
283                    if let (true, Some(script_map)) = (should_sync, latest_script_map.as_deref()) {
284                        match collect_new_delegation_candidates(
285                            &client,
286                            script_map,
287                            &mut seen_unspent_outpoints,
288                        )
289                        .await
290                        {
291                            Ok(new_candidates) => {
292                                if !new_candidates.is_empty() {
293                                    tracing::debug!(
294                                        count = new_candidates.len(),
295                                        "Found new delegatable VTXOs from failsafe polling"
296                                    );
297                                    pending_vtxos.extend(new_candidates);
298                                }
299                            }
300                            Err(e) => {
301                                tracing::warn!("Failsafe delegation poll failed: {e}");
302                            }
303                        }
304                    }
305
306                    if !pending_vtxos.is_empty() {
307                        let mut deduped = Vec::new();
308                        let mut seen = HashSet::new();
309                        for vtxo in pending_vtxos {
310                            if seen.insert(vtxo.outpoint) {
311                                deduped.push(vtxo);
312                            }
313                        }
314
315                        tracing::debug!(count = deduped.len(), "Processing VTXOs for delegation");
316                        if let Some(script_map) = latest_script_map {
317                            delegate_vtxos(&client, &delegator, &deduped, &script_map).await;
318                        }
319                    }
320
321                    if should_renew {
322                        renew_expiring_vtxos(&client).await;
323                    }
324                }
325            }
326        });
327
328        // Independent migration arm: rotates funds off deprecated server signers on its own
329        // self-paced cooldown loop, separate from the renewal/delegation worker and the
330        // subscription stream (it polls wallet state, like renewal). Spawned per connection and
331        // aborted on reconnect/stop so passes never overlap. Disabled entirely when the config
332        // flag is off.
333        let migration_handle = config.migrate_deprecated_signers.then(|| {
334            let client = client.clone();
335            let mut stop_rx = stop_rx.clone();
336            tokio::spawn(async move {
337                run_migration_arm(&client, &mut stop_rx).await;
338            })
339        });
340
341        loop {
342            tokio::select! {
343                _ = stop_rx.changed() => {
344                    drop(work_tx);
345                    let _ = worker_handle.await;
346                    if let Some(handle) = migration_handle {
347                        handle.abort();
348                    }
349                    return;
350                }
351                _ = renew_interval.tick() => {
352                    if work_tx.send(WatcherWork::RenewTick {
353                        script_map: Arc::clone(&script_map),
354                    }).await.is_err() {
355                        tracing::warn!("VTXO worker channel closed, reconnecting in {backoff:?}");
356                        break;
357                    }
358                }
359                _ = discovery_interval.tick() => {
360                    match refresh_subscription_scripts(
361                        client.as_ref(),
362                        &subscription_id,
363                        &mut subscribed_addrs,
364                    )
365                    .await
366                    {
367                        Ok(Some(new_script_map)) => {
368                            script_map = new_script_map;
369                        }
370                        Ok(None) => {}
371                        Err(e) => {
372                            tracing::warn!("Failed to refresh script subscription: {e}");
373                        }
374                    }
375                }
376                event = stream.next() => {
377                    match event {
378                        Some(Ok(SubscriptionResponse::Heartbeat)) => {}
379                        Some(Ok(SubscriptionResponse::Event(event))) => {
380                            if !event.new_vtxos.is_empty() {
381                                tracing::debug!(
382                                    txid = %event.txid,
383                                    new_vtxos = event.new_vtxos.len(),
384                                    "Received subscription event with new VTXOs"
385                                );
386
387                                if work_tx.send(WatcherWork::NewVtxos {
388                                    vtxos: event.new_vtxos,
389                                    script_map: Arc::clone(&script_map),
390                                })
391                                .await.is_err()
392                                {
393                                    tracing::warn!("VTXO worker channel closed. Reconnecting in {backoff:?}");
394                                    break;
395                                }
396                            }
397                        }
398                        Some(Err(e)) => {
399                            tracing::warn!("VTXO subscription error: {e}, reconnecting in {backoff:?}");
400                            break;
401                        }
402                        None => {
403                            tracing::debug!("VTXO subscription stream ended, reconnecting in {backoff:?}");
404                            break;
405                        }
406                    }
407                }
408            }
409        }
410
411        drop(work_tx);
412        let _ = worker_handle.await;
413        // Abort the per-connection migration arm; the next iteration spawns a fresh one. This
414        // prevents two migration loops racing across a reconnect.
415        if let Some(handle) = migration_handle {
416            handle.abort();
417        }
418
419        if wait_or_stop(&mut stop_rx, backoff).await {
420            return;
421        }
422        backoff = (backoff * 2).min(MAX_BACKOFF);
423    }
424}
425
426/// Background migration arm: periodically rotate funds off deprecated server signers.
427///
428/// On each fire it runs one [`Client::migrate_deprecated_signer_vtxos`] pass and logs the outcome.
429/// Errors are swallowed (never propagated — this must never kill the watcher). Cadence is
430/// [`MIGRATION_INTERVAL`] while healthy; a failing pass backs off exponentially between
431/// [`MIGRATION_BASE_COOLDOWN`] and [`MIGRATION_MAX_COOLDOWN`], resetting to the base interval on a
432/// fully successful or no-op pass. The frequent base cadence is cheap because the migration call is
433/// a no-op (`NothingMigratable`) whenever there is nothing to rotate.
434///
435/// When [`Client::refresh_server_info`] updates the cached `deprecated_signers`, this arm picks up
436/// the freshly advertised deprecated signers on its next pass and migrates.
437async fn run_migration_arm<B, W, S, K>(
438    client: &Client<B, W, S, K>,
439    stop_rx: &mut watch::Receiver<bool>,
440) where
441    B: Blockchain + Send + Sync + 'static,
442    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
443    S: SwapStorage + 'static,
444    K: KeyProvider + Send + Sync + 'static,
445{
446    // Consecutive-failure count drives the exponential cooldown; `0` means healthy (use the base
447    // interval). Reset to `0` on any fully successful or no-op pass.
448    let mut consecutive_failures: u32 = 0;
449    loop {
450        let delay = migration_delay(consecutive_failures);
451        if wait_or_stop(stop_rx, delay).await {
452            return;
453        }
454
455        let mut rng = OsRng;
456        match client.migrate_deprecated_signer_vtxos(&mut rng).await {
457            Ok(report) => {
458                if report.failed() {
459                    consecutive_failures = consecutive_failures.saturating_add(1);
460                    let next = migration_delay(consecutive_failures);
461                    tracing::warn!(
462                        txids = ?report.settle_txids(),
463                        vtxo_error = ?report.vtxo.error.as_deref(),
464                        boarding_error = ?report.boarding.error.as_deref(),
465                        "Background migration pass had leg failure; backing off {next:?}"
466                    );
467                } else {
468                    if report.rotated() {
469                        tracing::info!(
470                            txids = ?report.settle_txids(),
471                            "Background migration rotated funds off deprecated signer(s)"
472                        );
473                    } else {
474                        tracing::debug!("Background migration pass: nothing to migrate");
475                    }
476                    // Success or no-op: back to the healthy cadence.
477                    consecutive_failures = 0;
478                }
479            }
480            Err(e) => {
481                // Back off so a persistently failing migration does not retry every interval.
482                consecutive_failures = consecutive_failures.saturating_add(1);
483                let next = migration_delay(consecutive_failures);
484                tracing::warn!("Background migration pass failed: {e}; backing off {next:?}");
485            }
486        }
487    }
488}
489
490/// Cooldown before the next migration pass given the consecutive-failure count.
491///
492/// `0` failures → the healthy [`MIGRATION_INTERVAL`]. Otherwise an exponential backoff of
493/// `MIGRATION_BASE_COOLDOWN * 2^(failures - 1)`, saturating at [`MIGRATION_MAX_COOLDOWN`].
494fn migration_delay(consecutive_failures: u32) -> Duration {
495    if consecutive_failures == 0 {
496        return MIGRATION_INTERVAL;
497    }
498    let shift = consecutive_failures - 1;
499    let scaled = MIGRATION_BASE_COOLDOWN
500        .checked_mul(1u32.checked_shl(shift).unwrap_or(u32::MAX))
501        .unwrap_or(MIGRATION_MAX_COOLDOWN);
502    scaled.min(MIGRATION_MAX_COOLDOWN)
503}
504
505/// Wait for the given duration or until stop is signalled. Returns `true` if stopped.
506async fn wait_or_stop(stop_rx: &mut watch::Receiver<bool>, duration: Duration) -> bool {
507    tokio::select! {
508        _ = stop_rx.changed() => true,
509        _ = tokio::time::sleep(duration) => false,
510    }
511}
512
513/// Discover keys and add newly derived scripts to an existing subscription.
514async fn refresh_subscription_scripts<B, W, S, K>(
515    client: &Client<B, W, S, K>,
516    subscription_id: &str,
517    subscribed_addrs: &mut HashSet<ArkAddress>,
518) -> Result<Option<Arc<ScriptMap>>, Error>
519where
520    B: Blockchain + Send + Sync + 'static,
521    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
522    S: SwapStorage + 'static,
523    K: KeyProvider + Send + Sync + 'static,
524{
525    let _discovered = client.discover_keys(KEY_DISCOVERY_GAP_LIMIT).await?;
526
527    let addrs = client.get_offchain_addresses()?;
528    let new_addrs: Vec<_> = addrs
529        .iter()
530        .map(|(addr, _)| *addr)
531        .filter(|addr| !subscribed_addrs.contains(addr))
532        .collect();
533
534    if new_addrs.is_empty() {
535        return Ok(None);
536    }
537
538    client
539        .subscribe_to_scripts(new_addrs.clone(), Some(subscription_id.to_string()))
540        .await?;
541
542    let added = new_addrs.len();
543    subscribed_addrs.extend(new_addrs);
544    tracing::info!(
545        added,
546        "Updated watcher subscription with newly derived addresses"
547    );
548
549    Ok(Some(Arc::new(ScriptMap::from_addresses(&addrs))))
550}
551
552/// Enumerate newly seen unspent delegate-eligible VTXOs from wallet state.
553///
554/// This is a failsafe path to catch outputs that may have been missed by subscription timing.
555async fn collect_new_delegation_candidates<B, W, S, K>(
556    client: &Client<B, W, S, K>,
557    script_map: &ScriptMap,
558    seen_unspent_outpoints: &mut HashSet<OutPoint>,
559) -> Result<Vec<VirtualTxOutPoint>, Error>
560where
561    B: Blockchain + Send + Sync + 'static,
562    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
563    S: SwapStorage + 'static,
564    K: KeyProvider + Send + Sync + 'static,
565{
566    let (vtxo_list, _) = client.list_vtxos().await?;
567
568    let mut current_outpoints = HashSet::new();
569    let mut newly_seen = Vec::new();
570
571    for vtp in vtxo_list.all_unspent() {
572        let Some(vtxo) = script_map.vtxo_by_script.get(&vtp.script) else {
573            continue;
574        };
575
576        if vtxo.delegator_pk().is_none() {
577            continue;
578        }
579
580        current_outpoints.insert(vtp.outpoint);
581
582        if !seen_unspent_outpoints.contains(&vtp.outpoint) {
583            newly_seen.push(vtp.clone());
584        }
585    }
586
587    *seen_unspent_outpoints = current_outpoints;
588
589    Ok(newly_seen)
590}
591
592/// Delegator info cached per delegation batch.
593struct DelegatorState {
594    cosigner_pk: PublicKey,
595    fee: Amount,
596    fee_address_script: ScriptBuf,
597}
598
599/// Fetch and parse delegator info into a usable form.
600async fn fetch_delegator_state(delegator: &DelegatorClient) -> Result<DelegatorState, Error> {
601    let info = delegator
602        .info()
603        .await
604        .context(Error::ad_hoc("failed to get delegator info"))?;
605
606    let cosigner_pk: PublicKey = info
607        .pubkey
608        .parse::<PublicKey>()
609        .context("failed to parse delegator PK")?;
610
611    let fee = info
612        .fee
613        .parse::<u64>()
614        .map(Amount::from_sat)
615        .context("failed to parse delegator fee")?;
616
617    let fee_address_script = info
618        .delegator_address
619        .parse::<ArkAddress>()
620        .context("failed to parse delegator fee address")?
621        .to_p2tr_script_pubkey();
622
623    Ok(DelegatorState {
624        cosigner_pk,
625        fee,
626        fee_address_script,
627    })
628}
629
630/// Number of seconds in a UTC day.
631const SECONDS_PER_DAY: i64 = 86_400;
632
633/// Normalize a unix timestamp (seconds) to UTC midnight of that day.
634fn day_timestamp(ts: i64) -> i64 {
635    ts - ts.rem_euclid(SECONDS_PER_DAY)
636}
637
638/// Group VTXOs by their expiry day (UTC midnight), returning groups sorted by expiry.
639///
640/// Recoverable VTXOs (expired or sub-dust) are collected separately and merged into the earliest
641/// non-recoverable group.
642fn group_by_expiry_day<'a>(
643    vtxos: &'a [VirtualTxOutPoint],
644    script_map: &'a ScriptMap,
645    dust: Amount,
646) -> Vec<(i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>)> {
647    let mut groups: BTreeMap<i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>> = BTreeMap::new();
648    let mut recoverable: Vec<(&'a VirtualTxOutPoint, &'a Vtxo)> = Vec::new();
649
650    for vtp in vtxos {
651        if vtp.is_spent {
652            continue;
653        }
654
655        let vtxo = match script_map.vtxo_by_script.get(&vtp.script) {
656            Some(v) => v,
657            None => continue,
658        };
659
660        if vtxo.delegator_pk().is_none() {
661            continue;
662        }
663
664        if vtp.is_recoverable(dust) {
665            recoverable.push((vtp, vtxo));
666        } else if vtp.expires_at > 0 {
667            let day = day_timestamp(vtp.expires_at);
668            groups.entry(day).or_default().push((vtp, vtxo));
669        }
670    }
671
672    if !recoverable.is_empty() {
673        if let Some((&earliest_day, _)) = groups.iter().next() {
674            groups.entry(earliest_day).or_default().extend(recoverable);
675        } else {
676            groups.insert(0, recoverable);
677        }
678    }
679
680    groups.into_iter().collect()
681}
682
683/// Calculate the `valid_at` timestamp for a delegation group.
684///
685/// For each non-recoverable VTXO, compute activation at 90% of its full lifetime:
686/// `created_at + (expires_at - created_at) * 0.9`. The earliest of those activations is used.
687///
688/// If the group only contains recoverable/expired VTXOs (or activation is already in the past),
689/// schedule soon (`now + 60s`).
690fn calculate_valid_at(group_vtxos: &[(&VirtualTxOutPoint, &Vtxo)], dust: Amount) -> u64 {
691    let now_secs = std::time::SystemTime::now()
692        .duration_since(std::time::UNIX_EPOCH)
693        .unwrap_or_default()
694        .as_secs();
695
696    let earliest_activation = group_vtxos
697        .iter()
698        .filter(|(vtp, _)| {
699            !vtp.is_recoverable(dust)
700                && vtp.created_at > 0
701                && vtp.expires_at > 0
702                && vtp.expires_at > vtp.created_at
703        })
704        .map(|(vtp, _)| {
705            let created_at = vtp.created_at as u64;
706            let lifetime = (vtp.expires_at - vtp.created_at) as u64;
707            created_at + (lifetime * 9 / 10)
708        })
709        .min();
710
711    match earliest_activation {
712        Some(valid_at) if valid_at > now_secs => valid_at,
713        _ => now_secs + 60,
714    }
715}
716
717/// Submit newly received VTXOs to the delegator service for future auto-renewal.
718///
719/// The `script_map` provides VTXO metadata (tapscripts, spend info) without a network call.
720/// Only the affected addresses are queried for expiry data.
721async fn delegate_vtxos<B, W, S, K>(
722    client: &Arc<Client<B, W, S, K>>,
723    delegator: &DelegatorClient,
724    new_vtxos: &[VirtualTxOutPoint],
725    script_map: &ScriptMap,
726) where
727    B: Blockchain + Send + Sync + 'static,
728    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
729    S: SwapStorage + 'static,
730    K: KeyProvider + Send + Sync + 'static,
731{
732    // Query only the addresses that appear in the event, not all wallet addresses.
733    let affected_addresses = script_map.addresses_for(new_vtxos);
734    if affected_addresses.is_empty() {
735        tracing::debug!("No affected addresses resolved from new VTXOs; skipping delegation");
736        return;
737    }
738
739    let vtxo_list = match client
740        .list_vtxos_for_addresses(affected_addresses.into_iter())
741        .await
742    {
743        Ok(v) => v,
744        Err(e) => {
745            tracing::error!("Failed to list VTXOs for delegation: {e}");
746            return;
747        }
748    };
749
750    // The subscription event tells us which outpoints are new, but we need the full
751    // VirtualTxOutPoint (with expires_at, created_at) from the server for grouping.
752    let new_outpoints: HashSet<_> = new_vtxos.iter().map(|v| v.outpoint).collect();
753    let enriched: Vec<_> = vtxo_list
754        .all_unspent()
755        .filter(|vtp| new_outpoints.contains(&vtp.outpoint))
756        .cloned()
757        .collect();
758
759    let server_info = match client.server_info() {
760        Ok(server_info) => server_info,
761        Err(e) => {
762            tracing::error!("Failed to read server info for delegation: {e}");
763            return;
764        }
765    };
766
767    let groups = group_by_expiry_day(&enriched, script_map, server_info.dust);
768    if groups.is_empty() {
769        tracing::debug!("No delegate-eligible VTXOs after enrichment/grouping; skipping");
770        return;
771    }
772
773    let delegator_state = match fetch_delegator_state(delegator).await {
774        Ok(s) => Arc::new(s),
775        Err(e) => {
776            tracing::error!("{e}");
777            return;
778        }
779    };
780
781    let (to_address, _) = match client.get_offchain_address() {
782        Ok(v) => v,
783        Err(e) => {
784            tracing::error!("Failed to get offchain address for delegation: {e}");
785            return;
786        }
787    };
788    let dest_script = to_address.to_p2tr_script_pubkey();
789
790    let mut handles = Vec::new();
791
792    for (_day, group_vtxos) in groups {
793        let valid_at = calculate_valid_at(&group_vtxos, server_info.dust);
794
795        let mut vtxo_inputs = Vec::new();
796        let mut total_amount = Amount::ZERO;
797
798        for (vtp, vtxo) in &group_vtxos {
799            let spend_info = match vtxo.delegate_spend_info() {
800                Ok(info) => info,
801                Err(e) => {
802                    tracing::warn!(outpoint = %vtp.outpoint, "Cannot get delegate spend info: {e}");
803                    continue;
804                }
805            };
806
807            vtxo_inputs.push(intent::Input::new(
808                vtp.outpoint,
809                vtxo.exit_delay(),
810                None,
811                TxOut {
812                    value: vtp.amount,
813                    script_pubkey: vtp.script.clone(),
814                },
815                vtxo.tapscripts(),
816                spend_info,
817                vtp.is_spent,
818                false,
819                vtp.assets.clone(),
820            ));
821
822            total_amount += vtp.amount;
823        }
824
825        if vtxo_inputs.is_empty() {
826            continue;
827        }
828
829        let fee = delegator_state.fee;
830        if fee >= total_amount {
831            tracing::warn!(
832                %total_amount, %fee,
833                "Delegator fee exceeds VTXO group value, skipping"
834            );
835            continue;
836        }
837        let net_amount = total_amount - fee;
838
839        if net_amount < server_info.dust {
840            tracing::warn!(%net_amount, "Net amount after fee is below dust, skipping");
841            continue;
842        }
843
844        let mut outputs = Vec::new();
845        if fee > Amount::ZERO {
846            outputs.push(intent::Output::Offchain(TxOut {
847                value: fee,
848                script_pubkey: delegator_state.fee_address_script.clone(),
849            }));
850        }
851        outputs.push(intent::Output::Offchain(TxOut {
852            value: net_amount,
853            script_pubkey: dest_script.clone(),
854        }));
855
856        let server_info_forfeit_addr = server_info.forfeit_address.clone();
857        let dust = server_info.dust;
858        let ds = Arc::clone(&delegator_state);
859
860        let delegator = delegator.clone();
861        let client = Arc::clone(client);
862        handles.push(tokio::spawn(async move {
863            delegate_group(
864                &client,
865                &delegator,
866                vtxo_inputs,
867                outputs,
868                ds.cosigner_pk,
869                &server_info_forfeit_addr,
870                dust,
871                valid_at,
872            )
873            .await;
874        }));
875    }
876
877    for handle in handles {
878        let _ = handle.await;
879    }
880}
881
882/// Prepare, sign, and submit a single delegation group.
883async fn delegate_group<B, W, S, K>(
884    client: &Client<B, W, S, K>,
885    delegator: &DelegatorClient,
886    vtxo_inputs: Vec<intent::Input>,
887    outputs: Vec<intent::Output>,
888    cosigner_pk: PublicKey,
889    forfeit_address: &bitcoin::Address,
890    dust: Amount,
891    valid_at: u64,
892) where
893    B: Blockchain + Send + Sync + 'static,
894    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
895    S: SwapStorage + 'static,
896    K: KeyProvider + Send + Sync + 'static,
897{
898    let input_count = vtxo_inputs.len();
899
900    let mut delegate = match ark_core::batch::prepare_delegate_psbts_at(
901        vtxo_inputs,
902        outputs,
903        cosigner_pk,
904        forfeit_address,
905        dust,
906        Some(valid_at),
907    ) {
908        Ok(d) => d,
909        Err(e) => {
910            tracing::error!("Failed to prepare delegate PSBTs: {e}");
911            return;
912        }
913    };
914
915    if let Err(e) =
916        client.sign_delegate_psbts(&mut delegate.intent.proof, &mut delegate.forfeit_psbts)
917    {
918        tracing::error!("Failed to sign delegate PSBTs: {e}");
919        return;
920    }
921
922    if let Err(e) = delegator
923        .delegate(&delegate.intent, &delegate.forfeit_psbts, None)
924        .await
925    {
926        tracing::error!("Failed to submit delegation: {e}");
927        return;
928    }
929
930    tracing::info!(
931        vtxo_count = input_count,
932        valid_at,
933        "Delegated VTXO group to delegator service"
934    );
935}
936
937/// Fraction of VTXO lifetime remaining at which we self-renew as a safety net.
938const SELF_RENEW_REMAINING_FRACTION: f64 = 0.10;
939
940/// Self-renew VTXOs that are close to expiry.
941///
942/// Only settles VTXOs whose remaining lifetime is less than [`SELF_RENEW_REMAINING_FRACTION`] of
943/// their total lifetime, leaving freshly-received VTXOs alone.
944async fn renew_expiring_vtxos<B, W, S, K>(client: &Client<B, W, S, K>)
945where
946    B: Blockchain + Send + Sync + 'static,
947    W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
948    S: SwapStorage + 'static,
949    K: KeyProvider + Send + Sync + 'static,
950{
951    let (vtxo_list, _) = match client.list_vtxos().await {
952        Ok(v) => v,
953        Err(e) => {
954            tracing::warn!("Failed to list VTXOs for renewal check: {e}");
955            return;
956        }
957    };
958
959    let now = std::time::SystemTime::now()
960        .duration_since(std::time::UNIX_EPOCH)
961        .unwrap_or_default()
962        .as_secs() as i64;
963
964    let expiring_outpoints: Vec<OutPoint> = vtxo_list
965        .all_unspent()
966        .filter(|vtp| {
967            if vtp.expires_at <= 0 || vtp.created_at <= 0 {
968                return false;
969            }
970            let total_lifetime = vtp.expires_at - vtp.created_at;
971            let remaining = vtp.expires_at - now;
972            remaining > 0
973                && (remaining as f64) < (total_lifetime as f64 * SELF_RENEW_REMAINING_FRACTION)
974        })
975        .map(|vtp| vtp.outpoint)
976        .collect();
977
978    if expiring_outpoints.is_empty() {
979        return;
980    }
981
982    tracing::info!(
983        count = expiring_outpoints.len(),
984        "Self-renewing expiring VTXOs"
985    );
986
987    let mut rng = OsRng;
988    match client
989        .settle_vtxos(&mut rng, &expiring_outpoints, &[])
990        .await
991    {
992        Ok(Some(txid)) => {
993            tracing::info!(%txid, "Self-renewed expiring VTXOs");
994        }
995        Ok(None) => {}
996        Err(e) => {
997            tracing::warn!("Failed to self-renew VTXOs: {e}");
998        }
999    }
1000}
1001
1002#[cfg(test)]
1003mod tests {
1004    use super::*;
1005    use bitcoin::hashes::Hash;
1006    use bitcoin::key::Secp256k1;
1007    use bitcoin::Network;
1008    use bitcoin::Sequence;
1009    use bitcoin::Txid;
1010    use bitcoin::XOnlyPublicKey;
1011    use std::str::FromStr;
1012
1013    fn test_keys() -> (XOnlyPublicKey, XOnlyPublicKey, XOnlyPublicKey) {
1014        let server = XOnlyPublicKey::from_str(
1015            "18845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
1016        )
1017        .unwrap();
1018        let owner = XOnlyPublicKey::from_str(
1019            "28845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
1020        )
1021        .unwrap();
1022        let delegator = XOnlyPublicKey::from_str(
1023            "38845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
1024        )
1025        .unwrap();
1026        (server, owner, delegator)
1027    }
1028
1029    fn delegated_vtxo() -> (ArkAddress, Vtxo) {
1030        let secp = Secp256k1::new();
1031        let (server, owner, delegator) = test_keys();
1032        let vtxo = Vtxo::new_with_delegator(
1033            &secp,
1034            server,
1035            owner,
1036            delegator,
1037            Sequence::from_seconds_ceil(86400).unwrap(),
1038            Network::Regtest,
1039        )
1040        .unwrap();
1041        (vtxo.to_ark_address(), vtxo)
1042    }
1043
1044    fn mk_vtp(script: ScriptBuf, amount_sat: u64, expires_at: i64, vout: u32) -> VirtualTxOutPoint {
1045        VirtualTxOutPoint {
1046            outpoint: OutPoint::new(Txid::all_zeros(), vout),
1047            created_at: expires_at - 1000,
1048            expires_at,
1049            amount: Amount::from_sat(amount_sat),
1050            script,
1051            is_preconfirmed: false,
1052            is_swept: false,
1053            is_unrolled: false,
1054            is_spent: false,
1055            spent_by: None,
1056            commitment_txids: vec![],
1057            settled_by: None,
1058            ark_txid: None,
1059            assets: vec![],
1060        }
1061    }
1062
1063    #[test]
1064    fn migration_delay_uses_base_interval_when_healthy() {
1065        assert_eq!(migration_delay(0), MIGRATION_INTERVAL);
1066    }
1067
1068    #[test]
1069    fn migration_delay_backs_off_exponentially_and_caps() {
1070        // 30s * 2^(failures-1): 30s, 60s, 120s, 240s, then saturates at the 5min cap.
1071        assert_eq!(migration_delay(1), MIGRATION_BASE_COOLDOWN);
1072        assert_eq!(migration_delay(2), MIGRATION_BASE_COOLDOWN * 2);
1073        assert_eq!(migration_delay(3), MIGRATION_BASE_COOLDOWN * 4);
1074        assert_eq!(migration_delay(4), MIGRATION_BASE_COOLDOWN * 8);
1075        assert_eq!(migration_delay(5), MIGRATION_MAX_COOLDOWN);
1076        // Large failure counts must not panic (no shift overflow) and stay at the cap.
1077        assert_eq!(migration_delay(100), MIGRATION_MAX_COOLDOWN);
1078        assert_eq!(migration_delay(u32::MAX), MIGRATION_MAX_COOLDOWN);
1079    }
1080
1081    #[test]
1082    fn day_timestamp_normalizes_to_midnight() {
1083        let ts = 1705322700; // 2024-01-15 13:45:00 UTC
1084        let day = day_timestamp(ts);
1085        assert_eq!(day % SECONDS_PER_DAY, 0);
1086        assert!(day <= ts);
1087        assert!(ts - day < SECONDS_PER_DAY);
1088    }
1089
1090    #[test]
1091    fn day_timestamp_already_midnight() {
1092        let ts = SECONDS_PER_DAY * 19738;
1093        assert_eq!(day_timestamp(ts), ts);
1094    }
1095
1096    #[test]
1097    fn group_by_expiry_day_merges_recoverable_into_earliest_group() {
1098        let (addr, vtxo) = delegated_vtxo();
1099        let script = addr.to_p2tr_script_pubkey();
1100        let script_map = ScriptMap::from_addresses(&[(addr, vtxo)]);
1101
1102        let now = std::time::SystemTime::now()
1103            .duration_since(std::time::UNIX_EPOCH)
1104            .unwrap()
1105            .as_secs() as i64;
1106        let day1_midnight = day_timestamp(now) + SECONDS_PER_DAY;
1107        let day2_midnight = day1_midnight + SECONDS_PER_DAY;
1108
1109        let recoverable = mk_vtp(script.clone(), 100, day1_midnight + 500, 0); // sub-dust
1110        let non_recoverable_day1 = mk_vtp(script.clone(), 10_000, day1_midnight + 800, 1);
1111        let non_recoverable_day2 = mk_vtp(script, 10_000, day2_midnight + 800, 2);
1112
1113        let vtxos = [non_recoverable_day2, recoverable, non_recoverable_day1];
1114        let groups = group_by_expiry_day(&vtxos, &script_map, Amount::from_sat(500));
1115
1116        assert_eq!(groups.len(), 2);
1117        assert_eq!(groups[0].0, day_timestamp(day1_midnight + 800));
1118        assert_eq!(groups[1].0, day_timestamp(day2_midnight + 800));
1119        assert_eq!(groups[0].1.len(), 2);
1120        assert_eq!(groups[1].1.len(), 1);
1121    }
1122
1123    #[test]
1124    fn calculate_valid_at_for_non_recoverable_group_is_before_expiry() {
1125        let (_addr, vtxo) = delegated_vtxo();
1126        let script = ScriptBuf::new();
1127
1128        let now = std::time::SystemTime::now()
1129            .duration_since(std::time::UNIX_EPOCH)
1130            .unwrap()
1131            .as_secs() as i64;
1132
1133        let later = mk_vtp(script, 10_000, now + 10_000, 1);
1134        let group = vec![(&later, &vtxo)];
1135
1136        let valid_at = calculate_valid_at(&group, Amount::from_sat(500));
1137
1138        assert!(valid_at > now as u64);
1139        assert!(valid_at < later.expires_at as u64);
1140    }
1141
1142    #[test]
1143    fn calculate_valid_at_for_recoverable_only_group_is_soon() {
1144        let (_addr, vtxo) = delegated_vtxo();
1145        let script = ScriptBuf::new();
1146
1147        let now = std::time::SystemTime::now()
1148            .duration_since(std::time::UNIX_EPOCH)
1149            .unwrap()
1150            .as_secs() as i64;
1151
1152        let recoverable = mk_vtp(script, 100, now + 5_000, 0); // sub-dust at dust=500
1153        let group = vec![(&recoverable, &vtxo)];
1154
1155        let start = std::time::SystemTime::now()
1156            .duration_since(std::time::UNIX_EPOCH)
1157            .unwrap()
1158            .as_secs();
1159        let valid_at = calculate_valid_at(&group, Amount::from_sat(500));
1160        let end = std::time::SystemTime::now()
1161            .duration_since(std::time::UNIX_EPOCH)
1162            .unwrap()
1163            .as_secs();
1164
1165        assert!(valid_at >= start + 60);
1166        assert!(valid_at <= end + 61);
1167    }
1168}