1use crate::error::ErrorContext;
9use crate::key_provider::KeyProvider;
10use crate::swap_storage::SwapStorage;
11use crate::wallet::BoardingWallet;
12use crate::wallet::OnchainWallet;
13use crate::Blockchain;
14use crate::Client;
15use crate::Error;
16use ark_core::intent;
17use ark_core::server::SubscriptionResponse;
18use ark_core::server::VirtualTxOutPoint;
19use ark_core::ArkAddress;
20use ark_core::Vtxo;
21use ark_delegator::DelegatorClient;
22use bitcoin::secp256k1::PublicKey;
23use bitcoin::Amount;
24use bitcoin::OutPoint;
25use bitcoin::ScriptBuf;
26use bitcoin::TxOut;
27use futures::StreamExt;
28use rand::rngs::OsRng;
29use std::collections::BTreeMap;
30use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33use std::time::Duration;
34use tokio::sync::mpsc;
35use tokio::sync::watch;
36
37pub struct VtxoWatcherHandle {
41 stop_tx: watch::Sender<bool>,
42}
43
44impl VtxoWatcherHandle {
45 pub fn stop(self) {
47 let _ = self.stop_tx.send(true);
48 }
49}
50
51impl Drop for VtxoWatcherHandle {
52 fn drop(&mut self) {
53 let _ = self.stop_tx.send(true);
54 }
55}
56
57const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
59const MAX_BACKOFF: Duration = Duration::from_secs(30);
60
61const KEY_DISCOVERY_INTERVAL: Duration = Duration::from_secs(10);
63const KEY_DISCOVERY_GAP_LIMIT: u32 = 20;
64
65const MIGRATION_INTERVAL: Duration = Duration::from_secs(60);
70
71const MIGRATION_BASE_COOLDOWN: Duration = Duration::from_secs(30);
74const MIGRATION_MAX_COOLDOWN: Duration = Duration::from_secs(300);
75
76#[derive(Debug, Clone, Copy)]
78pub struct VtxoWatcherConfig {
79 pub migrate_deprecated_signers: bool,
86}
87
88impl Default for VtxoWatcherConfig {
89 fn default() -> Self {
90 Self {
91 migrate_deprecated_signers: true,
92 }
93 }
94}
95
96struct ScriptMap {
101 vtxo_by_script: HashMap<ScriptBuf, Vtxo>,
102 addr_by_script: HashMap<ScriptBuf, ArkAddress>,
103}
104
105impl ScriptMap {
106 fn from_addresses(addresses: &[(ArkAddress, Vtxo)]) -> Self {
107 let mut vtxo_by_script = HashMap::with_capacity(addresses.len());
108 let mut addr_by_script = HashMap::with_capacity(addresses.len());
109 for (addr, vtxo) in addresses {
110 let script = addr.to_p2tr_script_pubkey();
111 vtxo_by_script.insert(script.clone(), vtxo.clone());
112 addr_by_script.insert(script, *addr);
113 }
114 Self {
115 vtxo_by_script,
116 addr_by_script,
117 }
118 }
119
120 fn addresses_for(&self, vtxos: &[VirtualTxOutPoint]) -> Vec<ArkAddress> {
122 let mut seen = HashSet::new();
123 let mut result = Vec::new();
124 for vtp in vtxos {
125 if let Some(addr) = self.addr_by_script.get(&vtp.script) {
126 if seen.insert(&vtp.script) {
127 result.push(*addr);
128 }
129 }
130 }
131 result
132 }
133}
134
135enum WatcherWork {
136 NewVtxos {
137 vtxos: Vec<VirtualTxOutPoint>,
138 script_map: Arc<ScriptMap>,
139 },
140 RenewTick {
141 script_map: Arc<ScriptMap>,
142 },
143}
144
145impl<B, W, S, K> Client<B, W, S, K>
146where
147 B: Blockchain + Send + Sync + 'static,
148 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
149 S: SwapStorage + 'static,
150 K: KeyProvider + Send + Sync + 'static,
151{
152 pub fn start_vtxo_watcher(
166 self: &Arc<Self>,
167 delegator: Arc<DelegatorClient>,
168 config: VtxoWatcherConfig,
169 ) -> VtxoWatcherHandle {
170 let (stop_tx, stop_rx) = watch::channel(false);
171
172 let client = Arc::clone(self);
173 tokio::spawn(async move {
174 run_watcher_loop(client, delegator, config, stop_rx).await;
175 tracing::debug!("VTXO watcher stopped");
176 });
177
178 VtxoWatcherHandle { stop_tx }
179 }
180}
181
182async fn run_watcher_loop<B, W, S, K>(
184 client: Arc<Client<B, W, S, K>>,
185 delegator: Arc<DelegatorClient>,
186 config: VtxoWatcherConfig,
187 mut stop_rx: watch::Receiver<bool>,
188) where
189 B: Blockchain + Send + Sync + 'static,
190 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
191 S: SwapStorage + 'static,
192 K: KeyProvider + Send + Sync + 'static,
193{
194 let mut backoff = INITIAL_BACKOFF;
195
196 loop {
197 if *stop_rx.borrow() {
198 return;
199 }
200
201 let addresses = match client.get_offchain_addresses() {
203 Ok(a) => a,
204 Err(e) => {
205 tracing::error!("Failed to get offchain addresses: {e}");
206 return;
207 }
208 };
209 let script_map = Arc::new(ScriptMap::from_addresses(&addresses));
210 let ark_addresses: Vec<_> = addresses.iter().map(|(addr, _)| *addr).collect();
211
212 let subscription_id = match client.subscribe_to_scripts(ark_addresses, None).await {
213 Ok(id) => id,
214 Err(e) => {
215 tracing::warn!("Failed to subscribe: {e}, retrying in {backoff:?}");
216 if wait_or_stop(&mut stop_rx, backoff).await {
217 return;
218 }
219 backoff = (backoff * 2).min(MAX_BACKOFF);
220 continue;
221 }
222 };
223
224 let mut stream = match client.get_subscription(subscription_id.clone()).await {
225 Ok(s) => s,
226 Err(e) => {
227 tracing::warn!("Failed to get subscription stream: {e}, retrying in {backoff:?}");
228 if wait_or_stop(&mut stop_rx, backoff).await {
229 return;
230 }
231 backoff = (backoff * 2).min(MAX_BACKOFF);
232 continue;
233 }
234 };
235
236 tracing::info!("VTXO watcher connected");
237 backoff = INITIAL_BACKOFF;
238 let mut subscribed_addrs: HashSet<ArkAddress> =
239 addresses.iter().map(|(addr, _)| *addr).collect();
240 let mut script_map = script_map;
241 let mut renew_interval = tokio::time::interval(Duration::from_secs(60));
242 let mut discovery_interval = tokio::time::interval(KEY_DISCOVERY_INTERVAL);
243 let (work_tx, mut work_rx) = mpsc::channel::<WatcherWork>(128);
244
245 let worker_handle = tokio::spawn({
246 let client = client.clone();
247 let delegator = delegator.clone();
248 async move {
249 let mut seen_unspent_outpoints = HashSet::<OutPoint>::new();
250
251 while let Some(first) = work_rx.recv().await {
252 let (
254 mut pending_vtxos,
255 mut latest_script_map,
256 mut should_renew,
257 mut should_sync,
258 ) = match first {
259 WatcherWork::NewVtxos { vtxos, script_map } => {
260 (vtxos, Some(script_map), true, false)
261 }
262 WatcherWork::RenewTick { script_map } => {
263 (Vec::new(), Some(script_map), true, true)
264 }
265 };
266
267 while let Ok(work) = work_rx.try_recv() {
269 match work {
270 WatcherWork::NewVtxos { vtxos, script_map } => {
271 pending_vtxos.extend(vtxos);
272 latest_script_map = Some(script_map);
273 should_renew = true;
274 }
275 WatcherWork::RenewTick { script_map } => {
276 latest_script_map = Some(script_map);
277 should_renew = true;
278 should_sync = true;
279 }
280 }
281 }
282
283 if let (true, Some(script_map)) = (should_sync, latest_script_map.as_deref()) {
284 match collect_new_delegation_candidates(
285 &client,
286 script_map,
287 &mut seen_unspent_outpoints,
288 )
289 .await
290 {
291 Ok(new_candidates) => {
292 if !new_candidates.is_empty() {
293 tracing::debug!(
294 count = new_candidates.len(),
295 "Found new delegatable VTXOs from failsafe polling"
296 );
297 pending_vtxos.extend(new_candidates);
298 }
299 }
300 Err(e) => {
301 tracing::warn!("Failsafe delegation poll failed: {e}");
302 }
303 }
304 }
305
306 if !pending_vtxos.is_empty() {
307 let mut deduped = Vec::new();
308 let mut seen = HashSet::new();
309 for vtxo in pending_vtxos {
310 if seen.insert(vtxo.outpoint) {
311 deduped.push(vtxo);
312 }
313 }
314
315 tracing::debug!(count = deduped.len(), "Processing VTXOs for delegation");
316 if let Some(script_map) = latest_script_map {
317 delegate_vtxos(&client, &delegator, &deduped, &script_map).await;
318 }
319 }
320
321 if should_renew {
322 renew_expiring_vtxos(&client).await;
323 }
324 }
325 }
326 });
327
328 let migration_handle = config.migrate_deprecated_signers.then(|| {
334 let client = client.clone();
335 let mut stop_rx = stop_rx.clone();
336 tokio::spawn(async move {
337 run_migration_arm(&client, &mut stop_rx).await;
338 })
339 });
340
341 loop {
342 tokio::select! {
343 _ = stop_rx.changed() => {
344 drop(work_tx);
345 let _ = worker_handle.await;
346 if let Some(handle) = migration_handle {
347 handle.abort();
348 }
349 return;
350 }
351 _ = renew_interval.tick() => {
352 if work_tx.send(WatcherWork::RenewTick {
353 script_map: Arc::clone(&script_map),
354 }).await.is_err() {
355 tracing::warn!("VTXO worker channel closed, reconnecting in {backoff:?}");
356 break;
357 }
358 }
359 _ = discovery_interval.tick() => {
360 match refresh_subscription_scripts(
361 client.as_ref(),
362 &subscription_id,
363 &mut subscribed_addrs,
364 )
365 .await
366 {
367 Ok(Some(new_script_map)) => {
368 script_map = new_script_map;
369 }
370 Ok(None) => {}
371 Err(e) => {
372 tracing::warn!("Failed to refresh script subscription: {e}");
373 }
374 }
375 }
376 event = stream.next() => {
377 match event {
378 Some(Ok(SubscriptionResponse::Heartbeat)) => {}
379 Some(Ok(SubscriptionResponse::Event(event))) => {
380 if !event.new_vtxos.is_empty() {
381 tracing::debug!(
382 txid = %event.txid,
383 new_vtxos = event.new_vtxos.len(),
384 "Received subscription event with new VTXOs"
385 );
386
387 if work_tx.send(WatcherWork::NewVtxos {
388 vtxos: event.new_vtxos,
389 script_map: Arc::clone(&script_map),
390 })
391 .await.is_err()
392 {
393 tracing::warn!("VTXO worker channel closed. Reconnecting in {backoff:?}");
394 break;
395 }
396 }
397 }
398 Some(Err(e)) => {
399 tracing::warn!("VTXO subscription error: {e}, reconnecting in {backoff:?}");
400 break;
401 }
402 None => {
403 tracing::debug!("VTXO subscription stream ended, reconnecting in {backoff:?}");
404 break;
405 }
406 }
407 }
408 }
409 }
410
411 drop(work_tx);
412 let _ = worker_handle.await;
413 if let Some(handle) = migration_handle {
416 handle.abort();
417 }
418
419 if wait_or_stop(&mut stop_rx, backoff).await {
420 return;
421 }
422 backoff = (backoff * 2).min(MAX_BACKOFF);
423 }
424}
425
426async fn run_migration_arm<B, W, S, K>(
438 client: &Client<B, W, S, K>,
439 stop_rx: &mut watch::Receiver<bool>,
440) where
441 B: Blockchain + Send + Sync + 'static,
442 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
443 S: SwapStorage + 'static,
444 K: KeyProvider + Send + Sync + 'static,
445{
446 let mut consecutive_failures: u32 = 0;
449 loop {
450 let delay = migration_delay(consecutive_failures);
451 if wait_or_stop(stop_rx, delay).await {
452 return;
453 }
454
455 let mut rng = OsRng;
456 match client.migrate_deprecated_signer_vtxos(&mut rng).await {
457 Ok(report) => {
458 if report.failed() {
459 consecutive_failures = consecutive_failures.saturating_add(1);
460 let next = migration_delay(consecutive_failures);
461 tracing::warn!(
462 txids = ?report.settle_txids(),
463 vtxo_error = ?report.vtxo.error.as_deref(),
464 boarding_error = ?report.boarding.error.as_deref(),
465 "Background migration pass had leg failure; backing off {next:?}"
466 );
467 } else {
468 if report.rotated() {
469 tracing::info!(
470 txids = ?report.settle_txids(),
471 "Background migration rotated funds off deprecated signer(s)"
472 );
473 } else {
474 tracing::debug!("Background migration pass: nothing to migrate");
475 }
476 consecutive_failures = 0;
478 }
479 }
480 Err(e) => {
481 consecutive_failures = consecutive_failures.saturating_add(1);
483 let next = migration_delay(consecutive_failures);
484 tracing::warn!("Background migration pass failed: {e}; backing off {next:?}");
485 }
486 }
487 }
488}
489
490fn migration_delay(consecutive_failures: u32) -> Duration {
495 if consecutive_failures == 0 {
496 return MIGRATION_INTERVAL;
497 }
498 let shift = consecutive_failures - 1;
499 let scaled = MIGRATION_BASE_COOLDOWN
500 .checked_mul(1u32.checked_shl(shift).unwrap_or(u32::MAX))
501 .unwrap_or(MIGRATION_MAX_COOLDOWN);
502 scaled.min(MIGRATION_MAX_COOLDOWN)
503}
504
505async fn wait_or_stop(stop_rx: &mut watch::Receiver<bool>, duration: Duration) -> bool {
507 tokio::select! {
508 _ = stop_rx.changed() => true,
509 _ = tokio::time::sleep(duration) => false,
510 }
511}
512
513async fn refresh_subscription_scripts<B, W, S, K>(
515 client: &Client<B, W, S, K>,
516 subscription_id: &str,
517 subscribed_addrs: &mut HashSet<ArkAddress>,
518) -> Result<Option<Arc<ScriptMap>>, Error>
519where
520 B: Blockchain + Send + Sync + 'static,
521 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
522 S: SwapStorage + 'static,
523 K: KeyProvider + Send + Sync + 'static,
524{
525 let _discovered = client.discover_keys(KEY_DISCOVERY_GAP_LIMIT).await?;
526
527 let addrs = client.get_offchain_addresses()?;
528 let new_addrs: Vec<_> = addrs
529 .iter()
530 .map(|(addr, _)| *addr)
531 .filter(|addr| !subscribed_addrs.contains(addr))
532 .collect();
533
534 if new_addrs.is_empty() {
535 return Ok(None);
536 }
537
538 client
539 .subscribe_to_scripts(new_addrs.clone(), Some(subscription_id.to_string()))
540 .await?;
541
542 let added = new_addrs.len();
543 subscribed_addrs.extend(new_addrs);
544 tracing::info!(
545 added,
546 "Updated watcher subscription with newly derived addresses"
547 );
548
549 Ok(Some(Arc::new(ScriptMap::from_addresses(&addrs))))
550}
551
552async fn collect_new_delegation_candidates<B, W, S, K>(
556 client: &Client<B, W, S, K>,
557 script_map: &ScriptMap,
558 seen_unspent_outpoints: &mut HashSet<OutPoint>,
559) -> Result<Vec<VirtualTxOutPoint>, Error>
560where
561 B: Blockchain + Send + Sync + 'static,
562 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
563 S: SwapStorage + 'static,
564 K: KeyProvider + Send + Sync + 'static,
565{
566 let (vtxo_list, _) = client.list_vtxos().await?;
567
568 let mut current_outpoints = HashSet::new();
569 let mut newly_seen = Vec::new();
570
571 for vtp in vtxo_list.all_unspent() {
572 let Some(vtxo) = script_map.vtxo_by_script.get(&vtp.script) else {
573 continue;
574 };
575
576 if vtxo.delegator_pk().is_none() {
577 continue;
578 }
579
580 current_outpoints.insert(vtp.outpoint);
581
582 if !seen_unspent_outpoints.contains(&vtp.outpoint) {
583 newly_seen.push(vtp.clone());
584 }
585 }
586
587 *seen_unspent_outpoints = current_outpoints;
588
589 Ok(newly_seen)
590}
591
592struct DelegatorState {
594 cosigner_pk: PublicKey,
595 fee: Amount,
596 fee_address_script: ScriptBuf,
597}
598
599async fn fetch_delegator_state(delegator: &DelegatorClient) -> Result<DelegatorState, Error> {
601 let info = delegator
602 .info()
603 .await
604 .context(Error::ad_hoc("failed to get delegator info"))?;
605
606 let cosigner_pk: PublicKey = info
607 .pubkey
608 .parse::<PublicKey>()
609 .context("failed to parse delegator PK")?;
610
611 let fee = info
612 .fee
613 .parse::<u64>()
614 .map(Amount::from_sat)
615 .context("failed to parse delegator fee")?;
616
617 let fee_address_script = info
618 .delegator_address
619 .parse::<ArkAddress>()
620 .context("failed to parse delegator fee address")?
621 .to_p2tr_script_pubkey();
622
623 Ok(DelegatorState {
624 cosigner_pk,
625 fee,
626 fee_address_script,
627 })
628}
629
630const SECONDS_PER_DAY: i64 = 86_400;
632
633fn day_timestamp(ts: i64) -> i64 {
635 ts - ts.rem_euclid(SECONDS_PER_DAY)
636}
637
638fn group_by_expiry_day<'a>(
643 vtxos: &'a [VirtualTxOutPoint],
644 script_map: &'a ScriptMap,
645 dust: Amount,
646) -> Vec<(i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>)> {
647 let mut groups: BTreeMap<i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>> = BTreeMap::new();
648 let mut recoverable: Vec<(&'a VirtualTxOutPoint, &'a Vtxo)> = Vec::new();
649
650 for vtp in vtxos {
651 if vtp.is_spent {
652 continue;
653 }
654
655 let vtxo = match script_map.vtxo_by_script.get(&vtp.script) {
656 Some(v) => v,
657 None => continue,
658 };
659
660 if vtxo.delegator_pk().is_none() {
661 continue;
662 }
663
664 if vtp.is_recoverable(dust) {
665 recoverable.push((vtp, vtxo));
666 } else if vtp.expires_at > 0 {
667 let day = day_timestamp(vtp.expires_at);
668 groups.entry(day).or_default().push((vtp, vtxo));
669 }
670 }
671
672 if !recoverable.is_empty() {
673 if let Some((&earliest_day, _)) = groups.iter().next() {
674 groups.entry(earliest_day).or_default().extend(recoverable);
675 } else {
676 groups.insert(0, recoverable);
677 }
678 }
679
680 groups.into_iter().collect()
681}
682
683fn calculate_valid_at(group_vtxos: &[(&VirtualTxOutPoint, &Vtxo)], dust: Amount) -> u64 {
691 let now_secs = std::time::SystemTime::now()
692 .duration_since(std::time::UNIX_EPOCH)
693 .unwrap_or_default()
694 .as_secs();
695
696 let earliest_activation = group_vtxos
697 .iter()
698 .filter(|(vtp, _)| {
699 !vtp.is_recoverable(dust)
700 && vtp.created_at > 0
701 && vtp.expires_at > 0
702 && vtp.expires_at > vtp.created_at
703 })
704 .map(|(vtp, _)| {
705 let created_at = vtp.created_at as u64;
706 let lifetime = (vtp.expires_at - vtp.created_at) as u64;
707 created_at + (lifetime * 9 / 10)
708 })
709 .min();
710
711 match earliest_activation {
712 Some(valid_at) if valid_at > now_secs => valid_at,
713 _ => now_secs + 60,
714 }
715}
716
717async fn delegate_vtxos<B, W, S, K>(
722 client: &Arc<Client<B, W, S, K>>,
723 delegator: &DelegatorClient,
724 new_vtxos: &[VirtualTxOutPoint],
725 script_map: &ScriptMap,
726) where
727 B: Blockchain + Send + Sync + 'static,
728 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
729 S: SwapStorage + 'static,
730 K: KeyProvider + Send + Sync + 'static,
731{
732 let affected_addresses = script_map.addresses_for(new_vtxos);
734 if affected_addresses.is_empty() {
735 tracing::debug!("No affected addresses resolved from new VTXOs; skipping delegation");
736 return;
737 }
738
739 let vtxo_list = match client
740 .list_vtxos_for_addresses(affected_addresses.into_iter())
741 .await
742 {
743 Ok(v) => v,
744 Err(e) => {
745 tracing::error!("Failed to list VTXOs for delegation: {e}");
746 return;
747 }
748 };
749
750 let new_outpoints: HashSet<_> = new_vtxos.iter().map(|v| v.outpoint).collect();
753 let enriched: Vec<_> = vtxo_list
754 .all_unspent()
755 .filter(|vtp| new_outpoints.contains(&vtp.outpoint))
756 .cloned()
757 .collect();
758
759 let server_info = match client.server_info() {
760 Ok(server_info) => server_info,
761 Err(e) => {
762 tracing::error!("Failed to read server info for delegation: {e}");
763 return;
764 }
765 };
766
767 let groups = group_by_expiry_day(&enriched, script_map, server_info.dust);
768 if groups.is_empty() {
769 tracing::debug!("No delegate-eligible VTXOs after enrichment/grouping; skipping");
770 return;
771 }
772
773 let delegator_state = match fetch_delegator_state(delegator).await {
774 Ok(s) => Arc::new(s),
775 Err(e) => {
776 tracing::error!("{e}");
777 return;
778 }
779 };
780
781 let (to_address, _) = match client.get_offchain_address() {
782 Ok(v) => v,
783 Err(e) => {
784 tracing::error!("Failed to get offchain address for delegation: {e}");
785 return;
786 }
787 };
788 let dest_script = to_address.to_p2tr_script_pubkey();
789
790 let mut handles = Vec::new();
791
792 for (_day, group_vtxos) in groups {
793 let valid_at = calculate_valid_at(&group_vtxos, server_info.dust);
794
795 let mut vtxo_inputs = Vec::new();
796 let mut total_amount = Amount::ZERO;
797
798 for (vtp, vtxo) in &group_vtxos {
799 let spend_info = match vtxo.delegate_spend_info() {
800 Ok(info) => info,
801 Err(e) => {
802 tracing::warn!(outpoint = %vtp.outpoint, "Cannot get delegate spend info: {e}");
803 continue;
804 }
805 };
806
807 vtxo_inputs.push(intent::Input::new(
808 vtp.outpoint,
809 vtxo.exit_delay(),
810 None,
811 TxOut {
812 value: vtp.amount,
813 script_pubkey: vtp.script.clone(),
814 },
815 vtxo.tapscripts(),
816 spend_info,
817 vtp.is_spent,
818 false,
819 vtp.assets.clone(),
820 ));
821
822 total_amount += vtp.amount;
823 }
824
825 if vtxo_inputs.is_empty() {
826 continue;
827 }
828
829 let fee = delegator_state.fee;
830 if fee >= total_amount {
831 tracing::warn!(
832 %total_amount, %fee,
833 "Delegator fee exceeds VTXO group value, skipping"
834 );
835 continue;
836 }
837 let net_amount = total_amount - fee;
838
839 if net_amount < server_info.dust {
840 tracing::warn!(%net_amount, "Net amount after fee is below dust, skipping");
841 continue;
842 }
843
844 let mut outputs = Vec::new();
845 if fee > Amount::ZERO {
846 outputs.push(intent::Output::Offchain(TxOut {
847 value: fee,
848 script_pubkey: delegator_state.fee_address_script.clone(),
849 }));
850 }
851 outputs.push(intent::Output::Offchain(TxOut {
852 value: net_amount,
853 script_pubkey: dest_script.clone(),
854 }));
855
856 let server_info_forfeit_addr = server_info.forfeit_address.clone();
857 let dust = server_info.dust;
858 let ds = Arc::clone(&delegator_state);
859
860 let delegator = delegator.clone();
861 let client = Arc::clone(client);
862 handles.push(tokio::spawn(async move {
863 delegate_group(
864 &client,
865 &delegator,
866 vtxo_inputs,
867 outputs,
868 ds.cosigner_pk,
869 &server_info_forfeit_addr,
870 dust,
871 valid_at,
872 )
873 .await;
874 }));
875 }
876
877 for handle in handles {
878 let _ = handle.await;
879 }
880}
881
882async fn delegate_group<B, W, S, K>(
884 client: &Client<B, W, S, K>,
885 delegator: &DelegatorClient,
886 vtxo_inputs: Vec<intent::Input>,
887 outputs: Vec<intent::Output>,
888 cosigner_pk: PublicKey,
889 forfeit_address: &bitcoin::Address,
890 dust: Amount,
891 valid_at: u64,
892) where
893 B: Blockchain + Send + Sync + 'static,
894 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
895 S: SwapStorage + 'static,
896 K: KeyProvider + Send + Sync + 'static,
897{
898 let input_count = vtxo_inputs.len();
899
900 let mut delegate = match ark_core::batch::prepare_delegate_psbts_at(
901 vtxo_inputs,
902 outputs,
903 cosigner_pk,
904 forfeit_address,
905 dust,
906 Some(valid_at),
907 ) {
908 Ok(d) => d,
909 Err(e) => {
910 tracing::error!("Failed to prepare delegate PSBTs: {e}");
911 return;
912 }
913 };
914
915 if let Err(e) =
916 client.sign_delegate_psbts(&mut delegate.intent.proof, &mut delegate.forfeit_psbts)
917 {
918 tracing::error!("Failed to sign delegate PSBTs: {e}");
919 return;
920 }
921
922 if let Err(e) = delegator
923 .delegate(&delegate.intent, &delegate.forfeit_psbts, None)
924 .await
925 {
926 tracing::error!("Failed to submit delegation: {e}");
927 return;
928 }
929
930 tracing::info!(
931 vtxo_count = input_count,
932 valid_at,
933 "Delegated VTXO group to delegator service"
934 );
935}
936
937const SELF_RENEW_REMAINING_FRACTION: f64 = 0.10;
939
940async fn renew_expiring_vtxos<B, W, S, K>(client: &Client<B, W, S, K>)
945where
946 B: Blockchain + Send + Sync + 'static,
947 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
948 S: SwapStorage + 'static,
949 K: KeyProvider + Send + Sync + 'static,
950{
951 let (vtxo_list, _) = match client.list_vtxos().await {
952 Ok(v) => v,
953 Err(e) => {
954 tracing::warn!("Failed to list VTXOs for renewal check: {e}");
955 return;
956 }
957 };
958
959 let now = std::time::SystemTime::now()
960 .duration_since(std::time::UNIX_EPOCH)
961 .unwrap_or_default()
962 .as_secs() as i64;
963
964 let expiring_outpoints: Vec<OutPoint> = vtxo_list
965 .all_unspent()
966 .filter(|vtp| {
967 if vtp.expires_at <= 0 || vtp.created_at <= 0 {
968 return false;
969 }
970 let total_lifetime = vtp.expires_at - vtp.created_at;
971 let remaining = vtp.expires_at - now;
972 remaining > 0
973 && (remaining as f64) < (total_lifetime as f64 * SELF_RENEW_REMAINING_FRACTION)
974 })
975 .map(|vtp| vtp.outpoint)
976 .collect();
977
978 if expiring_outpoints.is_empty() {
979 return;
980 }
981
982 tracing::info!(
983 count = expiring_outpoints.len(),
984 "Self-renewing expiring VTXOs"
985 );
986
987 let mut rng = OsRng;
988 match client
989 .settle_vtxos(&mut rng, &expiring_outpoints, &[])
990 .await
991 {
992 Ok(Some(txid)) => {
993 tracing::info!(%txid, "Self-renewed expiring VTXOs");
994 }
995 Ok(None) => {}
996 Err(e) => {
997 tracing::warn!("Failed to self-renew VTXOs: {e}");
998 }
999 }
1000}
1001
1002#[cfg(test)]
1003mod tests {
1004 use super::*;
1005 use bitcoin::hashes::Hash;
1006 use bitcoin::key::Secp256k1;
1007 use bitcoin::Network;
1008 use bitcoin::Sequence;
1009 use bitcoin::Txid;
1010 use bitcoin::XOnlyPublicKey;
1011 use std::str::FromStr;
1012
1013 fn test_keys() -> (XOnlyPublicKey, XOnlyPublicKey, XOnlyPublicKey) {
1014 let server = XOnlyPublicKey::from_str(
1015 "18845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
1016 )
1017 .unwrap();
1018 let owner = XOnlyPublicKey::from_str(
1019 "28845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
1020 )
1021 .unwrap();
1022 let delegator = XOnlyPublicKey::from_str(
1023 "38845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",
1024 )
1025 .unwrap();
1026 (server, owner, delegator)
1027 }
1028
1029 fn delegated_vtxo() -> (ArkAddress, Vtxo) {
1030 let secp = Secp256k1::new();
1031 let (server, owner, delegator) = test_keys();
1032 let vtxo = Vtxo::new_with_delegator(
1033 &secp,
1034 server,
1035 owner,
1036 delegator,
1037 Sequence::from_seconds_ceil(86400).unwrap(),
1038 Network::Regtest,
1039 )
1040 .unwrap();
1041 (vtxo.to_ark_address(), vtxo)
1042 }
1043
1044 fn mk_vtp(script: ScriptBuf, amount_sat: u64, expires_at: i64, vout: u32) -> VirtualTxOutPoint {
1045 VirtualTxOutPoint {
1046 outpoint: OutPoint::new(Txid::all_zeros(), vout),
1047 created_at: expires_at - 1000,
1048 expires_at,
1049 amount: Amount::from_sat(amount_sat),
1050 script,
1051 is_preconfirmed: false,
1052 is_swept: false,
1053 is_unrolled: false,
1054 is_spent: false,
1055 spent_by: None,
1056 commitment_txids: vec![],
1057 settled_by: None,
1058 ark_txid: None,
1059 assets: vec![],
1060 }
1061 }
1062
1063 #[test]
1064 fn migration_delay_uses_base_interval_when_healthy() {
1065 assert_eq!(migration_delay(0), MIGRATION_INTERVAL);
1066 }
1067
1068 #[test]
1069 fn migration_delay_backs_off_exponentially_and_caps() {
1070 assert_eq!(migration_delay(1), MIGRATION_BASE_COOLDOWN);
1072 assert_eq!(migration_delay(2), MIGRATION_BASE_COOLDOWN * 2);
1073 assert_eq!(migration_delay(3), MIGRATION_BASE_COOLDOWN * 4);
1074 assert_eq!(migration_delay(4), MIGRATION_BASE_COOLDOWN * 8);
1075 assert_eq!(migration_delay(5), MIGRATION_MAX_COOLDOWN);
1076 assert_eq!(migration_delay(100), MIGRATION_MAX_COOLDOWN);
1078 assert_eq!(migration_delay(u32::MAX), MIGRATION_MAX_COOLDOWN);
1079 }
1080
1081 #[test]
1082 fn day_timestamp_normalizes_to_midnight() {
1083 let ts = 1705322700; let day = day_timestamp(ts);
1085 assert_eq!(day % SECONDS_PER_DAY, 0);
1086 assert!(day <= ts);
1087 assert!(ts - day < SECONDS_PER_DAY);
1088 }
1089
1090 #[test]
1091 fn day_timestamp_already_midnight() {
1092 let ts = SECONDS_PER_DAY * 19738;
1093 assert_eq!(day_timestamp(ts), ts);
1094 }
1095
1096 #[test]
1097 fn group_by_expiry_day_merges_recoverable_into_earliest_group() {
1098 let (addr, vtxo) = delegated_vtxo();
1099 let script = addr.to_p2tr_script_pubkey();
1100 let script_map = ScriptMap::from_addresses(&[(addr, vtxo)]);
1101
1102 let now = std::time::SystemTime::now()
1103 .duration_since(std::time::UNIX_EPOCH)
1104 .unwrap()
1105 .as_secs() as i64;
1106 let day1_midnight = day_timestamp(now) + SECONDS_PER_DAY;
1107 let day2_midnight = day1_midnight + SECONDS_PER_DAY;
1108
1109 let recoverable = mk_vtp(script.clone(), 100, day1_midnight + 500, 0); let non_recoverable_day1 = mk_vtp(script.clone(), 10_000, day1_midnight + 800, 1);
1111 let non_recoverable_day2 = mk_vtp(script, 10_000, day2_midnight + 800, 2);
1112
1113 let vtxos = [non_recoverable_day2, recoverable, non_recoverable_day1];
1114 let groups = group_by_expiry_day(&vtxos, &script_map, Amount::from_sat(500));
1115
1116 assert_eq!(groups.len(), 2);
1117 assert_eq!(groups[0].0, day_timestamp(day1_midnight + 800));
1118 assert_eq!(groups[1].0, day_timestamp(day2_midnight + 800));
1119 assert_eq!(groups[0].1.len(), 2);
1120 assert_eq!(groups[1].1.len(), 1);
1121 }
1122
1123 #[test]
1124 fn calculate_valid_at_for_non_recoverable_group_is_before_expiry() {
1125 let (_addr, vtxo) = delegated_vtxo();
1126 let script = ScriptBuf::new();
1127
1128 let now = std::time::SystemTime::now()
1129 .duration_since(std::time::UNIX_EPOCH)
1130 .unwrap()
1131 .as_secs() as i64;
1132
1133 let later = mk_vtp(script, 10_000, now + 10_000, 1);
1134 let group = vec![(&later, &vtxo)];
1135
1136 let valid_at = calculate_valid_at(&group, Amount::from_sat(500));
1137
1138 assert!(valid_at > now as u64);
1139 assert!(valid_at < later.expires_at as u64);
1140 }
1141
1142 #[test]
1143 fn calculate_valid_at_for_recoverable_only_group_is_soon() {
1144 let (_addr, vtxo) = delegated_vtxo();
1145 let script = ScriptBuf::new();
1146
1147 let now = std::time::SystemTime::now()
1148 .duration_since(std::time::UNIX_EPOCH)
1149 .unwrap()
1150 .as_secs() as i64;
1151
1152 let recoverable = mk_vtp(script, 100, now + 5_000, 0); let group = vec![(&recoverable, &vtxo)];
1154
1155 let start = std::time::SystemTime::now()
1156 .duration_since(std::time::UNIX_EPOCH)
1157 .unwrap()
1158 .as_secs();
1159 let valid_at = calculate_valid_at(&group, Amount::from_sat(500));
1160 let end = std::time::SystemTime::now()
1161 .duration_since(std::time::UNIX_EPOCH)
1162 .unwrap()
1163 .as_secs();
1164
1165 assert!(valid_at >= start + 60);
1166 assert!(valid_at <= end + 61);
1167 }
1168}