1use crate::error::ErrorContext;
9use crate::key_provider::KeyProvider;
10use crate::swap_storage::SwapStorage;
11use crate::wallet::BoardingWallet;
12use crate::wallet::OnchainWallet;
13use crate::Blockchain;
14use crate::Client;
15use crate::Error;
16use ark_core::intent;
17use ark_core::server::SubscriptionResponse;
18use ark_core::server::VirtualTxOutPoint;
19use ark_core::ArkAddress;
20use ark_core::Vtxo;
21use ark_delegator::DelegatorClient;
22use bitcoin::secp256k1::PublicKey;
23use bitcoin::Amount;
24use bitcoin::OutPoint;
25use bitcoin::ScriptBuf;
26use bitcoin::TxOut;
27use futures::StreamExt;
28use rand::rngs::OsRng;
29use std::collections::BTreeMap;
30use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33use std::time::Duration;
34use tokio::sync::mpsc;
35use tokio::sync::watch;
36
37pub struct VtxoWatcherHandle {
41 stop_tx: watch::Sender<bool>,
42}
43
44impl VtxoWatcherHandle {
45 pub fn stop(self) {
47 let _ = self.stop_tx.send(true);
48 }
49}
50
51impl Drop for VtxoWatcherHandle {
52 fn drop(&mut self) {
53 let _ = self.stop_tx.send(true);
54 }
55}
56
/// Delay before the first reconnection attempt; the watcher resets to this value after every
/// successful connection.
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Upper bound for the doubling reconnection delay.
const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// How often the watcher re-runs key discovery and extends its script subscription.
const KEY_DISCOVERY_INTERVAL: Duration = Duration::from_secs(10);
/// Gap limit handed to `Client::discover_keys` on every discovery pass.
const KEY_DISCOVERY_GAP_LIMIT: u32 = 20;
64
65struct ScriptMap {
70 vtxo_by_script: HashMap<ScriptBuf, Vtxo>,
71 addr_by_script: HashMap<ScriptBuf, ArkAddress>,
72}
73
74impl ScriptMap {
75 fn from_addresses(addresses: &[(ArkAddress, Vtxo)]) -> Self {
76 let mut vtxo_by_script = HashMap::with_capacity(addresses.len());
77 let mut addr_by_script = HashMap::with_capacity(addresses.len());
78 for (addr, vtxo) in addresses {
79 let script = addr.to_p2tr_script_pubkey();
80 vtxo_by_script.insert(script.clone(), vtxo.clone());
81 addr_by_script.insert(script, *addr);
82 }
83 Self {
84 vtxo_by_script,
85 addr_by_script,
86 }
87 }
88
89 fn addresses_for(&self, vtxos: &[VirtualTxOutPoint]) -> Vec<ArkAddress> {
91 let mut seen = HashSet::new();
92 let mut result = Vec::new();
93 for vtp in vtxos {
94 if let Some(addr) = self.addr_by_script.get(&vtp.script) {
95 if seen.insert(&vtp.script) {
96 result.push(*addr);
97 }
98 }
99 }
100 result
101 }
102}
103
104enum WatcherWork {
105 NewVtxos {
106 vtxos: Vec<VirtualTxOutPoint>,
107 script_map: Arc<ScriptMap>,
108 },
109 RenewTick {
110 script_map: Arc<ScriptMap>,
111 },
112}
113
114impl<B, W, S, K> Client<B, W, S, K>
115where
116 B: Blockchain + Send + Sync + 'static,
117 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
118 S: SwapStorage + 'static,
119 K: KeyProvider + Send + Sync + 'static,
120{
121 pub fn start_vtxo_watcher(
133 self: &Arc<Self>,
134 delegator: Arc<DelegatorClient>,
135 ) -> VtxoWatcherHandle {
136 let (stop_tx, stop_rx) = watch::channel(false);
137
138 let client = Arc::clone(self);
139 tokio::spawn(async move {
140 run_watcher_loop(client, delegator, stop_rx).await;
141 tracing::debug!("VTXO watcher stopped");
142 });
143
144 VtxoWatcherHandle { stop_tx }
145 }
146}
147
148async fn run_watcher_loop<B, W, S, K>(
150 client: Arc<Client<B, W, S, K>>,
151 delegator: Arc<DelegatorClient>,
152 mut stop_rx: watch::Receiver<bool>,
153) where
154 B: Blockchain + Send + Sync + 'static,
155 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
156 S: SwapStorage + 'static,
157 K: KeyProvider + Send + Sync + 'static,
158{
159 let mut backoff = INITIAL_BACKOFF;
160
161 loop {
162 if *stop_rx.borrow() {
163 return;
164 }
165
166 let addresses = match client.get_offchain_addresses() {
168 Ok(a) => a,
169 Err(e) => {
170 tracing::error!("Failed to get offchain addresses: {e}");
171 return;
172 }
173 };
174 let script_map = Arc::new(ScriptMap::from_addresses(&addresses));
175 let ark_addresses: Vec<_> = addresses.iter().map(|(addr, _)| *addr).collect();
176
177 let subscription_id = match client.subscribe_to_scripts(ark_addresses, None).await {
178 Ok(id) => id,
179 Err(e) => {
180 tracing::warn!("Failed to subscribe: {e}, retrying in {backoff:?}");
181 if wait_or_stop(&mut stop_rx, backoff).await {
182 return;
183 }
184 backoff = (backoff * 2).min(MAX_BACKOFF);
185 continue;
186 }
187 };
188
189 let mut stream = match client.get_subscription(subscription_id.clone()).await {
190 Ok(s) => s,
191 Err(e) => {
192 tracing::warn!("Failed to get subscription stream: {e}, retrying in {backoff:?}");
193 if wait_or_stop(&mut stop_rx, backoff).await {
194 return;
195 }
196 backoff = (backoff * 2).min(MAX_BACKOFF);
197 continue;
198 }
199 };
200
201 tracing::info!("VTXO watcher connected");
202 backoff = INITIAL_BACKOFF;
203 let mut subscribed_addrs: HashSet<ArkAddress> =
204 addresses.iter().map(|(addr, _)| *addr).collect();
205 let mut script_map = script_map;
206 let mut renew_interval = tokio::time::interval(Duration::from_secs(60));
207 let mut discovery_interval = tokio::time::interval(KEY_DISCOVERY_INTERVAL);
208 let (work_tx, mut work_rx) = mpsc::channel::<WatcherWork>(128);
209
210 let worker_handle = tokio::spawn({
211 let client = client.clone();
212 let delegator = delegator.clone();
213 async move {
214 let mut seen_unspent_outpoints = HashSet::<OutPoint>::new();
215
216 while let Some(first) = work_rx.recv().await {
217 let (
219 mut pending_vtxos,
220 mut latest_script_map,
221 mut should_renew,
222 mut should_sync,
223 ) = match first {
224 WatcherWork::NewVtxos { vtxos, script_map } => {
225 (vtxos, Some(script_map), true, false)
226 }
227 WatcherWork::RenewTick { script_map } => {
228 (Vec::new(), Some(script_map), true, true)
229 }
230 };
231
232 while let Ok(work) = work_rx.try_recv() {
234 match work {
235 WatcherWork::NewVtxos { vtxos, script_map } => {
236 pending_vtxos.extend(vtxos);
237 latest_script_map = Some(script_map);
238 should_renew = true;
239 }
240 WatcherWork::RenewTick { script_map } => {
241 latest_script_map = Some(script_map);
242 should_renew = true;
243 should_sync = true;
244 }
245 }
246 }
247
248 if let (true, Some(script_map)) = (should_sync, latest_script_map.as_deref()) {
249 match collect_new_delegation_candidates(
250 &client,
251 script_map,
252 &mut seen_unspent_outpoints,
253 )
254 .await
255 {
256 Ok(new_candidates) => {
257 if !new_candidates.is_empty() {
258 tracing::debug!(
259 count = new_candidates.len(),
260 "Found new delegatable VTXOs from failsafe polling"
261 );
262 pending_vtxos.extend(new_candidates);
263 }
264 }
265 Err(e) => {
266 tracing::warn!("Failsafe delegation poll failed: {e}");
267 }
268 }
269 }
270
271 if !pending_vtxos.is_empty() {
272 let mut deduped = Vec::new();
273 let mut seen = HashSet::new();
274 for vtxo in pending_vtxos {
275 if seen.insert(vtxo.outpoint) {
276 deduped.push(vtxo);
277 }
278 }
279
280 tracing::debug!(count = deduped.len(), "Processing VTXOs for delegation");
281 if let Some(script_map) = latest_script_map {
282 delegate_vtxos(&client, &delegator, &deduped, &script_map).await;
283 }
284 }
285
286 if should_renew {
287 renew_expiring_vtxos(&client).await;
288 }
289 }
290 }
291 });
292
293 loop {
294 tokio::select! {
295 _ = stop_rx.changed() => {
296 drop(work_tx);
297 let _ = worker_handle.await;
298 return;
299 }
300 _ = renew_interval.tick() => {
301 if work_tx.send(WatcherWork::RenewTick {
302 script_map: Arc::clone(&script_map),
303 }).await.is_err() {
304 tracing::warn!("VTXO worker channel closed, reconnecting in {backoff:?}");
305 break;
306 }
307 }
308 _ = discovery_interval.tick() => {
309 match refresh_subscription_scripts(
310 client.as_ref(),
311 &subscription_id,
312 &mut subscribed_addrs,
313 )
314 .await
315 {
316 Ok(Some(new_script_map)) => {
317 script_map = new_script_map;
318 }
319 Ok(None) => {}
320 Err(e) => {
321 tracing::warn!("Failed to refresh script subscription: {e}");
322 }
323 }
324 }
325 event = stream.next() => {
326 match event {
327 Some(Ok(SubscriptionResponse::Heartbeat)) => {}
328 Some(Ok(SubscriptionResponse::Event(event))) => {
329 if !event.new_vtxos.is_empty() {
330 tracing::debug!(
331 txid = %event.txid,
332 new_vtxos = event.new_vtxos.len(),
333 "Received subscription event with new VTXOs"
334 );
335
336 if work_tx.send(WatcherWork::NewVtxos {
337 vtxos: event.new_vtxos,
338 script_map: Arc::clone(&script_map),
339 })
340 .await.is_err()
341 {
342 tracing::warn!("VTXO worker channel closed. Reconnecting in {backoff:?}");
343 break;
344 }
345 }
346 }
347 Some(Err(e)) => {
348 tracing::warn!("VTXO subscription error: {e}, reconnecting in {backoff:?}");
349 break;
350 }
351 None => {
352 tracing::debug!("VTXO subscription stream ended, reconnecting in {backoff:?}");
353 break;
354 }
355 }
356 }
357 }
358 }
359
360 drop(work_tx);
361 let _ = worker_handle.await;
362
363 if wait_or_stop(&mut stop_rx, backoff).await {
364 return;
365 }
366 backoff = (backoff * 2).min(MAX_BACKOFF);
367 }
368}
369
370async fn wait_or_stop(stop_rx: &mut watch::Receiver<bool>, duration: Duration) -> bool {
372 tokio::select! {
373 _ = stop_rx.changed() => true,
374 _ = tokio::time::sleep(duration) => false,
375 }
376}
377
378async fn refresh_subscription_scripts<B, W, S, K>(
380 client: &Client<B, W, S, K>,
381 subscription_id: &str,
382 subscribed_addrs: &mut HashSet<ArkAddress>,
383) -> Result<Option<Arc<ScriptMap>>, Error>
384where
385 B: Blockchain + Send + Sync + 'static,
386 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
387 S: SwapStorage + 'static,
388 K: KeyProvider + Send + Sync + 'static,
389{
390 let _discovered = client.discover_keys(KEY_DISCOVERY_GAP_LIMIT).await?;
391
392 let addrs = client.get_offchain_addresses()?;
393 let new_addrs: Vec<_> = addrs
394 .iter()
395 .map(|(addr, _)| *addr)
396 .filter(|addr| !subscribed_addrs.contains(addr))
397 .collect();
398
399 if new_addrs.is_empty() {
400 return Ok(None);
401 }
402
403 client
404 .subscribe_to_scripts(new_addrs.clone(), Some(subscription_id.to_string()))
405 .await?;
406
407 let added = new_addrs.len();
408 subscribed_addrs.extend(new_addrs);
409 tracing::info!(
410 added,
411 "Updated watcher subscription with newly derived addresses"
412 );
413
414 Ok(Some(Arc::new(ScriptMap::from_addresses(&addrs))))
415}
416
417async fn collect_new_delegation_candidates<B, W, S, K>(
421 client: &Client<B, W, S, K>,
422 script_map: &ScriptMap,
423 seen_unspent_outpoints: &mut HashSet<OutPoint>,
424) -> Result<Vec<VirtualTxOutPoint>, Error>
425where
426 B: Blockchain + Send + Sync + 'static,
427 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
428 S: SwapStorage + 'static,
429 K: KeyProvider + Send + Sync + 'static,
430{
431 let (vtxo_list, _) = client.list_vtxos().await?;
432
433 let mut current_outpoints = HashSet::new();
434 let mut newly_seen = Vec::new();
435
436 for vtp in vtxo_list.all_unspent() {
437 let Some(vtxo) = script_map.vtxo_by_script.get(&vtp.script) else {
438 continue;
439 };
440
441 if vtxo.delegator_pk().is_none() {
442 continue;
443 }
444
445 current_outpoints.insert(vtp.outpoint);
446
447 if !seen_unspent_outpoints.contains(&vtp.outpoint) {
448 newly_seen.push(vtp.clone());
449 }
450 }
451
452 *seen_unspent_outpoints = current_outpoints;
453
454 Ok(newly_seen)
455}
456
457struct DelegatorState {
459 cosigner_pk: PublicKey,
460 fee: Amount,
461 fee_address_script: ScriptBuf,
462}
463
464async fn fetch_delegator_state(delegator: &DelegatorClient) -> Result<DelegatorState, Error> {
466 let info = delegator
467 .info()
468 .await
469 .context(Error::ad_hoc("failed to get delegator info"))?;
470
471 let cosigner_pk: PublicKey = info
472 .pubkey
473 .parse::<PublicKey>()
474 .context("failed to parse delegator PK")?;
475
476 let fee = info
477 .fee
478 .parse::<u64>()
479 .map(Amount::from_sat)
480 .context("failed to parse delegator fee")?;
481
482 let fee_address_script = info
483 .delegator_address
484 .parse::<ArkAddress>()
485 .context("failed to parse delegator fee address")?
486 .to_p2tr_script_pubkey();
487
488 Ok(DelegatorState {
489 cosigner_pk,
490 fee,
491 fee_address_script,
492 })
493}
494
/// Number of seconds in one day.
const SECONDS_PER_DAY: i64 = 86_400;

/// Round a Unix timestamp (seconds) down to the start of its day.
///
/// Uses Euclidean division, so negative timestamps are rounded towards negative infinity
/// as well.
fn day_timestamp(ts: i64) -> i64 {
    ts.div_euclid(SECONDS_PER_DAY) * SECONDS_PER_DAY
}
502
503fn group_by_expiry_day<'a>(
508 vtxos: &'a [VirtualTxOutPoint],
509 script_map: &'a ScriptMap,
510 dust: Amount,
511) -> Vec<(i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>)> {
512 let mut groups: BTreeMap<i64, Vec<(&'a VirtualTxOutPoint, &'a Vtxo)>> = BTreeMap::new();
513 let mut recoverable: Vec<(&'a VirtualTxOutPoint, &'a Vtxo)> = Vec::new();
514
515 for vtp in vtxos {
516 if vtp.is_spent {
517 continue;
518 }
519
520 let vtxo = match script_map.vtxo_by_script.get(&vtp.script) {
521 Some(v) => v,
522 None => continue,
523 };
524
525 if vtxo.delegator_pk().is_none() {
526 continue;
527 }
528
529 if vtp.is_recoverable(dust) {
530 recoverable.push((vtp, vtxo));
531 } else if vtp.expires_at > 0 {
532 let day = day_timestamp(vtp.expires_at);
533 groups.entry(day).or_default().push((vtp, vtxo));
534 }
535 }
536
537 if !recoverable.is_empty() {
538 if let Some((&earliest_day, _)) = groups.iter().next() {
539 groups.entry(earliest_day).or_default().extend(recoverable);
540 } else {
541 groups.insert(0, recoverable);
542 }
543 }
544
545 groups.into_iter().collect()
546}
547
548fn calculate_valid_at(group_vtxos: &[(&VirtualTxOutPoint, &Vtxo)], dust: Amount) -> u64 {
556 let now_secs = std::time::SystemTime::now()
557 .duration_since(std::time::UNIX_EPOCH)
558 .unwrap_or_default()
559 .as_secs();
560
561 let earliest_activation = group_vtxos
562 .iter()
563 .filter(|(vtp, _)| {
564 !vtp.is_recoverable(dust)
565 && vtp.created_at > 0
566 && vtp.expires_at > 0
567 && vtp.expires_at > vtp.created_at
568 })
569 .map(|(vtp, _)| {
570 let created_at = vtp.created_at as u64;
571 let lifetime = (vtp.expires_at - vtp.created_at) as u64;
572 created_at + (lifetime * 9 / 10)
573 })
574 .min();
575
576 match earliest_activation {
577 Some(valid_at) if valid_at > now_secs => valid_at,
578 _ => now_secs + 60,
579 }
580}
581
582async fn delegate_vtxos<B, W, S, K>(
587 client: &Arc<Client<B, W, S, K>>,
588 delegator: &DelegatorClient,
589 new_vtxos: &[VirtualTxOutPoint],
590 script_map: &ScriptMap,
591) where
592 B: Blockchain + Send + Sync + 'static,
593 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
594 S: SwapStorage + 'static,
595 K: KeyProvider + Send + Sync + 'static,
596{
597 let affected_addresses = script_map.addresses_for(new_vtxos);
599 if affected_addresses.is_empty() {
600 tracing::debug!("No affected addresses resolved from new VTXOs; skipping delegation");
601 return;
602 }
603
604 let vtxo_list = match client
605 .list_vtxos_for_addresses(affected_addresses.into_iter())
606 .await
607 {
608 Ok(v) => v,
609 Err(e) => {
610 tracing::error!("Failed to list VTXOs for delegation: {e}");
611 return;
612 }
613 };
614
615 let new_outpoints: HashSet<_> = new_vtxos.iter().map(|v| v.outpoint).collect();
618 let enriched: Vec<_> = vtxo_list
619 .all_unspent()
620 .filter(|vtp| new_outpoints.contains(&vtp.outpoint))
621 .cloned()
622 .collect();
623
624 let groups = group_by_expiry_day(&enriched, script_map, client.server_info.dust);
625 if groups.is_empty() {
626 tracing::debug!("No delegate-eligible VTXOs after enrichment/grouping; skipping");
627 return;
628 }
629
630 let delegator_state = match fetch_delegator_state(delegator).await {
631 Ok(s) => Arc::new(s),
632 Err(e) => {
633 tracing::error!("{e}");
634 return;
635 }
636 };
637
638 let (to_address, _) = match client.get_offchain_address() {
639 Ok(v) => v,
640 Err(e) => {
641 tracing::error!("Failed to get offchain address for delegation: {e}");
642 return;
643 }
644 };
645 let dest_script = to_address.to_p2tr_script_pubkey();
646
647 let mut handles = Vec::new();
648
649 for (_day, group_vtxos) in groups {
650 let valid_at = calculate_valid_at(&group_vtxos, client.server_info.dust);
651
652 let mut vtxo_inputs = Vec::new();
653 let mut total_amount = Amount::ZERO;
654
655 for (vtp, vtxo) in &group_vtxos {
656 let spend_info = match vtxo.delegate_spend_info() {
657 Ok(info) => info,
658 Err(e) => {
659 tracing::warn!(outpoint = %vtp.outpoint, "Cannot get delegate spend info: {e}");
660 continue;
661 }
662 };
663
664 vtxo_inputs.push(intent::Input::new(
665 vtp.outpoint,
666 vtxo.exit_delay(),
667 None,
668 TxOut {
669 value: vtp.amount,
670 script_pubkey: vtp.script.clone(),
671 },
672 vtxo.tapscripts(),
673 spend_info,
674 vtp.is_spent,
675 false,
676 vtp.assets.clone(),
677 ));
678
679 total_amount += vtp.amount;
680 }
681
682 if vtxo_inputs.is_empty() {
683 continue;
684 }
685
686 let fee = delegator_state.fee;
687 if fee >= total_amount {
688 tracing::warn!(
689 %total_amount, %fee,
690 "Delegator fee exceeds VTXO group value, skipping"
691 );
692 continue;
693 }
694 let net_amount = total_amount - fee;
695
696 if net_amount < client.server_info.dust {
697 tracing::warn!(%net_amount, "Net amount after fee is below dust, skipping");
698 continue;
699 }
700
701 let mut outputs = Vec::new();
702 if fee > Amount::ZERO {
703 outputs.push(intent::Output::Offchain(TxOut {
704 value: fee,
705 script_pubkey: delegator_state.fee_address_script.clone(),
706 }));
707 }
708 outputs.push(intent::Output::Offchain(TxOut {
709 value: net_amount,
710 script_pubkey: dest_script.clone(),
711 }));
712
713 let server_info_forfeit_addr = client.server_info.forfeit_address.clone();
714 let dust = client.server_info.dust;
715 let ds = Arc::clone(&delegator_state);
716
717 let delegator = delegator.clone();
718 let client = Arc::clone(client);
719 handles.push(tokio::spawn(async move {
720 delegate_group(
721 &client,
722 &delegator,
723 vtxo_inputs,
724 outputs,
725 ds.cosigner_pk,
726 &server_info_forfeit_addr,
727 dust,
728 valid_at,
729 )
730 .await;
731 }));
732 }
733
734 for handle in handles {
735 let _ = handle.await;
736 }
737}
738
739async fn delegate_group<B, W, S, K>(
741 client: &Client<B, W, S, K>,
742 delegator: &DelegatorClient,
743 vtxo_inputs: Vec<intent::Input>,
744 outputs: Vec<intent::Output>,
745 cosigner_pk: PublicKey,
746 forfeit_address: &bitcoin::Address,
747 dust: Amount,
748 valid_at: u64,
749) where
750 B: Blockchain + Send + Sync + 'static,
751 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
752 S: SwapStorage + 'static,
753 K: KeyProvider + Send + Sync + 'static,
754{
755 let input_count = vtxo_inputs.len();
756
757 let mut delegate = match ark_core::batch::prepare_delegate_psbts_at(
758 vtxo_inputs,
759 outputs,
760 cosigner_pk,
761 forfeit_address,
762 dust,
763 Some(valid_at),
764 ) {
765 Ok(d) => d,
766 Err(e) => {
767 tracing::error!("Failed to prepare delegate PSBTs: {e}");
768 return;
769 }
770 };
771
772 if let Err(e) =
773 client.sign_delegate_psbts(&mut delegate.intent.proof, &mut delegate.forfeit_psbts)
774 {
775 tracing::error!("Failed to sign delegate PSBTs: {e}");
776 return;
777 }
778
779 if let Err(e) = delegator
780 .delegate(&delegate.intent, &delegate.forfeit_psbts, None)
781 .await
782 {
783 tracing::error!("Failed to submit delegation: {e}");
784 return;
785 }
786
787 tracing::info!(
788 vtxo_count = input_count,
789 valid_at,
790 "Delegated VTXO group to delegator service"
791 );
792}
793
794const SELF_RENEW_REMAINING_FRACTION: f64 = 0.10;
796
797async fn renew_expiring_vtxos<B, W, S, K>(client: &Client<B, W, S, K>)
802where
803 B: Blockchain + Send + Sync + 'static,
804 W: BoardingWallet + OnchainWallet + Send + Sync + 'static,
805 S: SwapStorage + 'static,
806 K: KeyProvider + Send + Sync + 'static,
807{
808 let (vtxo_list, _) = match client.list_vtxos().await {
809 Ok(v) => v,
810 Err(e) => {
811 tracing::warn!("Failed to list VTXOs for renewal check: {e}");
812 return;
813 }
814 };
815
816 let now = std::time::SystemTime::now()
817 .duration_since(std::time::UNIX_EPOCH)
818 .unwrap_or_default()
819 .as_secs() as i64;
820
821 let expiring_outpoints: Vec<OutPoint> = vtxo_list
822 .all_unspent()
823 .filter(|vtp| {
824 if vtp.expires_at <= 0 || vtp.created_at <= 0 {
825 return false;
826 }
827 let total_lifetime = vtp.expires_at - vtp.created_at;
828 let remaining = vtp.expires_at - now;
829 remaining > 0
830 && (remaining as f64) < (total_lifetime as f64 * SELF_RENEW_REMAINING_FRACTION)
831 })
832 .map(|vtp| vtp.outpoint)
833 .collect();
834
835 if expiring_outpoints.is_empty() {
836 return;
837 }
838
839 tracing::info!(
840 count = expiring_outpoints.len(),
841 "Self-renewing expiring VTXOs"
842 );
843
844 let mut rng = OsRng;
845 match client
846 .settle_vtxos(&mut rng, &expiring_outpoints, &[])
847 .await
848 {
849 Ok(Some(txid)) => {
850 tracing::info!(%txid, "Self-renewed expiring VTXOs");
851 }
852 Ok(None) => {}
853 Err(e) => {
854 tracing::warn!("Failed to self-renew VTXOs: {e}");
855 }
856 }
857}
858
#[cfg(test)]
mod tests {
    use super::*;
    use bitcoin::hashes::Hash;
    use bitcoin::key::Secp256k1;
    use bitcoin::Network;
    use bitcoin::Sequence;
    use bitcoin::Txid;
    use bitcoin::XOnlyPublicKey;
    use std::str::FromStr;

    /// Current Unix time in whole seconds.
    fn unix_now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    /// Fixed `(server, owner, delegator)` x-only keys used by the fixtures.
    fn test_keys() -> (XOnlyPublicKey, XOnlyPublicKey, XOnlyPublicKey) {
        let key = |hex: &str| XOnlyPublicKey::from_str(hex).unwrap();

        (
            key("18845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166"),
            key("28845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166"),
            key("38845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166"),
        )
    }

    /// A regtest VTXO with a delegator key, together with its Ark address.
    fn delegated_vtxo() -> (ArkAddress, Vtxo) {
        let secp = Secp256k1::new();
        let (server, owner, delegator) = test_keys();
        let exit_delay = Sequence::from_seconds_ceil(86400).unwrap();

        let vtxo = Vtxo::new_with_delegator(
            &secp,
            server,
            owner,
            delegator,
            exit_delay,
            Network::Regtest,
        )
        .unwrap();

        (vtxo.to_ark_address(), vtxo)
    }

    /// An unspent virtual outpoint created 1000 seconds before `expires_at`.
    fn mk_vtp(script: ScriptBuf, amount_sat: u64, expires_at: i64, vout: u32) -> VirtualTxOutPoint {
        let outpoint = OutPoint::new(Txid::all_zeros(), vout);
        let amount = Amount::from_sat(amount_sat);

        VirtualTxOutPoint {
            outpoint,
            created_at: expires_at - 1000,
            expires_at,
            amount,
            script,
            is_preconfirmed: false,
            is_swept: false,
            is_unrolled: false,
            is_spent: false,
            spent_by: None,
            commitment_txids: vec![],
            settled_by: None,
            ark_txid: None,
            assets: vec![],
        }
    }

    #[test]
    fn day_timestamp_normalizes_to_midnight() {
        let ts = 1705322700;
        let day = day_timestamp(ts);

        assert_eq!(day % SECONDS_PER_DAY, 0);
        assert!(day <= ts);
        assert!(ts - day < SECONDS_PER_DAY);
    }

    #[test]
    fn day_timestamp_already_midnight() {
        let ts = SECONDS_PER_DAY * 19738;
        assert_eq!(day_timestamp(ts), ts);
    }

    #[test]
    fn group_by_expiry_day_merges_recoverable_into_earliest_group() {
        let (addr, vtxo) = delegated_vtxo();
        let script = addr.to_p2tr_script_pubkey();
        let script_map = ScriptMap::from_addresses(&[(addr, vtxo)]);

        let now = unix_now() as i64;
        let day1_midnight = day_timestamp(now) + SECONDS_PER_DAY;
        let day2_midnight = day1_midnight + SECONDS_PER_DAY;

        // 100 sats is below the 500-sat dust value passed below; the assertions expect this
        // entry to be treated as recoverable and merged into the earliest group.
        let recoverable = mk_vtp(script.clone(), 100, day1_midnight + 500, 0);
        let non_recoverable_day1 = mk_vtp(script.clone(), 10_000, day1_midnight + 800, 1);
        let non_recoverable_day2 = mk_vtp(script, 10_000, day2_midnight + 800, 2);

        let vtxos = [non_recoverable_day2, recoverable, non_recoverable_day1];
        let groups = group_by_expiry_day(&vtxos, &script_map, Amount::from_sat(500));

        assert_eq!(groups.len(), 2);
        assert_eq!(groups[0].0, day_timestamp(day1_midnight + 800));
        assert_eq!(groups[1].0, day_timestamp(day2_midnight + 800));
        assert_eq!(groups[0].1.len(), 2);
        assert_eq!(groups[1].1.len(), 1);
    }

    #[test]
    fn calculate_valid_at_for_non_recoverable_group_is_before_expiry() {
        let (_addr, vtxo) = delegated_vtxo();
        let script = ScriptBuf::new();

        let now = unix_now() as i64;

        let later = mk_vtp(script, 10_000, now + 10_000, 1);
        let group = vec![(&later, &vtxo)];

        let valid_at = calculate_valid_at(&group, Amount::from_sat(500));

        assert!(valid_at > now as u64);
        assert!(valid_at < later.expires_at as u64);
    }

    #[test]
    fn calculate_valid_at_for_recoverable_only_group_is_soon() {
        let (_addr, vtxo) = delegated_vtxo();
        let script = ScriptBuf::new();

        let now = unix_now() as i64;

        // 100 sats is below the 500-sat dust value passed below.
        let recoverable = mk_vtp(script, 100, now + 5_000, 0);
        let group = vec![(&recoverable, &vtxo)];

        // Bracket the call with clock reads so the bounds hold even if a second ticks over.
        let start = unix_now();
        let valid_at = calculate_valid_at(&group, Amount::from_sat(500));
        let end = unix_now();

        assert!(valid_at >= start + 60);
        assert!(valid_at <= end + 61);
    }
}