Skip to main content

ark_client/
batch.rs

1use crate::error::ErrorContext as _;
2use crate::swap_storage::SwapStorage;
3use crate::utils::sleep;
4use crate::utils::timeout_op;
5use crate::wallet::BoardingWallet;
6use crate::wallet::OnchainWallet;
7use crate::Blockchain;
8use crate::Client;
9use crate::Error;
10use ark_core::batch;
11use ark_core::batch::aggregate_nonces;
12use ark_core::batch::complete_delegate_forfeit_txs;
13use ark_core::batch::create_and_sign_forfeit_txs;
14use ark_core::batch::create_asset_preservation_packet;
15use ark_core::batch::generate_nonce_tree;
16use ark_core::batch::sign_batch_tree_tx;
17use ark_core::batch::sign_commitment_psbt;
18use ark_core::batch::Delegate;
19use ark_core::batch::NonceKps;
20use ark_core::intent;
21use ark_core::script::extract_checksig_pubkeys;
22use ark_core::server::BatchTreeEventType;
23use ark_core::server::PartialSigTree;
24use ark_core::server::StreamEvent;
25use ark_core::ArkAddress;
26use ark_core::ArkNote;
27use ark_core::ExplorerUtxo;
28use ark_core::TxGraph;
29use backon::ExponentialBuilder;
30use backon::Retryable;
31use bitcoin::hashes::sha256;
32use bitcoin::hashes::Hash;
33use bitcoin::hex::DisplayHex;
34use bitcoin::key::Keypair;
35use bitcoin::key::Secp256k1;
36use bitcoin::psbt;
37use bitcoin::secp256k1;
38use bitcoin::secp256k1::schnorr;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Address;
41use bitcoin::Amount;
42use bitcoin::OutPoint;
43use bitcoin::Psbt;
44use bitcoin::TxOut;
45use bitcoin::Txid;
46use bitcoin::XOnlyPublicKey;
47use futures::StreamExt;
48use rand::CryptoRng;
49use rand::Rng;
50use std::collections::HashMap;
51use std::collections::HashSet;
52
53impl<B, W, S, K> Client<B, W, S, K>
54where
55    B: Blockchain,
56    W: BoardingWallet + OnchainWallet,
57    S: SwapStorage + 'static,
58    K: crate::KeyProvider,
59{
60    /// Settle _all_ prior VTXOs and boarding outputs into the next batch, generating new confirmed
61    /// VTXOs.
62    ///
63    /// Most callers should prefer [`Self::settle`], which only renews VTXOs that have actually
64    /// expired. Settling unexpired VTXOs is rarely necessary.
65    pub async fn settle_all<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
66    where
67        R: Rng + CryptoRng + Clone,
68    {
69        self.settle_at(crate::utils::unix_now()?, rng).await
70    }
71
72    pub(crate) async fn settle_at<R>(&self, now: i64, rng: &mut R) -> Result<Option<Txid>, Error>
73    where
74        R: Rng + CryptoRng + Clone,
75    {
76        // Get off-chain address and send all funds to this address, no change output 🦄
77        let (to_address, _) = self.get_offchain_address()?;
78
79        let (boarding_inputs, vtxo_inputs, total_amount) =
80            self.fetch_commitment_transaction_inputs(now).await?;
81
82        tracing::debug!(
83            offchain_adress = %to_address.encode(),
84            ?boarding_inputs,
85            ?vtxo_inputs,
86            "Attempting to settle outputs"
87        );
88
89        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
90            tracing::debug!("No inputs to board with");
91            return Ok(None);
92        }
93
94        let join_next_batch = || async {
95            self.join_next_batch(
96                &mut rng.clone(),
97                boarding_inputs.clone(),
98                vtxo_inputs.clone(),
99                BatchOutputType::Board {
100                    to_address,
101                    to_amount: total_amount,
102                },
103            )
104            .await
105        };
106
107        // Joining a batch can fail depending on the timing, so we try a few times.
108        let commitment_txid = join_next_batch
109            .retry(ExponentialBuilder::default().with_max_times(0))
110            .sleep(sleep)
111            // TODO: Use `when` to only retry certain errors.
112            .notify(|err: &Error, dur: std::time::Duration| {
113                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
114            })
115            .await
116            .context("Failed to join batch")?;
117
118        tracing::info!(%commitment_txid, "Settlement success");
119
120        Ok(Some(commitment_txid))
121    }
122
123    /// Settle prior VTXOs that have expired or are recoverable, together with all available
124    /// boarding outputs, into the next batch, generating new confirmed VTXOs.
125    ///
126    /// Healthy (unexpired) VTXOs are left untouched. This is the path callers typically want when
127    /// periodically renewing their wallet: healthy VTXOs do not need to be touched, and including
128    /// them would only inflate batch fees. Boarding outputs are always included because callers
129    /// generally want freshly funded coins to enter the Ark.
130    ///
131    /// NOTE: sub-dust recoverable VTXOs can only be rescued when their combined value exceeds the
132    /// server's dust threshold; otherwise the batch protocol rejects the settlement with a
133    /// `cannot settle into sub-dust VTXO` error. When the wallet holds isolated sub-dust amounts,
134    /// fall back to [`Self::settle_all`], which can roll them in alongside healthy VTXOs that
135    /// act as carrier value.
136    pub async fn settle<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
137    where
138        R: Rng + CryptoRng + Clone,
139    {
140        let (vtxo_list, _) = self.list_vtxos().await?;
141        let vtxo_outpoints: Vec<OutPoint> = vtxo_list.recoverable().map(|v| v.outpoint).collect();
142
143        let (boarding_inputs, _, _) = self
144            .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
145            .await?;
146        let boarding_outpoints: Vec<OutPoint> =
147            boarding_inputs.iter().map(|i| i.outpoint()).collect();
148
149        if vtxo_outpoints.is_empty() && boarding_outpoints.is_empty() {
150            tracing::debug!("No expired/recoverable VTXOs or boarding outputs to settle");
151            return Ok(None);
152        }
153
154        tracing::debug!(
155            num_vtxos = vtxo_outpoints.len(),
156            num_boarding = boarding_outpoints.len(),
157            "Attempting to settle expired/recoverable VTXOs and boarding outputs"
158        );
159
160        self.settle_vtxos(rng, &vtxo_outpoints, &boarding_outpoints)
161            .await
162    }
163
164    /// Settle _all_ prior VTXOs, boarding outputs, and the provided ArkNotes into the next batch.
165    ///
166    /// ArkNotes are bearer tokens that can be redeemed by revealing their preimage.
167    /// This method combines them with regular VTXOs and boarding outputs into a single
168    /// settlement transaction.
169    pub async fn settle_with_notes<R>(
170        &self,
171        rng: &mut R,
172        notes: Vec<ArkNote>,
173    ) -> Result<Option<Txid>, Error>
174    where
175        R: Rng + CryptoRng + Clone,
176    {
177        let (to_address, _) = self.get_offchain_address()?;
178
179        let (boarding_inputs, vtxo_inputs, mut total_amount) = self
180            .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
181            .await?;
182
183        // Convert arknotes to intent inputs and add their value to total
184        let note_inputs: Vec<intent::Input> = notes
185            .iter()
186            .map(|note| {
187                total_amount += note.value();
188                note.to_intent_input()
189            })
190            .collect::<Result<Vec<_>, _>>()?;
191
192        // Combine VTXO inputs with note inputs
193        let all_vtxo_inputs: Vec<intent::Input> =
194            vtxo_inputs.into_iter().chain(note_inputs).collect();
195
196        tracing::debug!(
197            offchain_address = %to_address.encode(),
198            ?boarding_inputs,
199            num_vtxo_inputs = all_vtxo_inputs.len(),
200            num_notes = notes.len(),
201            %total_amount,
202            "Attempting to settle outputs with notes"
203        );
204
205        if boarding_inputs.is_empty() && all_vtxo_inputs.is_empty() {
206            tracing::debug!("No inputs to settle");
207            return Ok(None);
208        }
209
210        let join_next_batch = || async {
211            self.join_next_batch(
212                &mut rng.clone(),
213                boarding_inputs.clone(),
214                all_vtxo_inputs.clone(),
215                BatchOutputType::Board {
216                    to_address,
217                    to_amount: total_amount,
218                },
219            )
220            .await
221        };
222
223        let commitment_txid = join_next_batch
224            .retry(ExponentialBuilder::default().with_max_times(0))
225            .sleep(sleep)
226            .notify(|err: &Error, dur: std::time::Duration| {
227                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
228            })
229            .await
230            .context("Failed to join batch")?;
231
232        tracing::info!(%commitment_txid, num_notes = notes.len(), "Settlement with notes success");
233
234        Ok(Some(commitment_txid))
235    }
236
237    /// Settle specific VTXOs and boarding outputs by outpoint into the next batch, generating new
238    /// confirmed VTXOs.
239    ///
240    /// Unlike [`Self::settle`], this method allows the caller to specify exactly which VTXOs and
241    /// boarding outputs to settle by providing their outpoints.
242    pub async fn settle_vtxos<R>(
243        &self,
244        rng: &mut R,
245        vtxo_outpoints: &[OutPoint],
246        boarding_outpoints: &[OutPoint],
247    ) -> Result<Option<Txid>, Error>
248    where
249        R: Rng + CryptoRng + Clone,
250    {
251        // Get off-chain address and send all funds to this address, no change output.
252        let (to_address, _) = self.get_offchain_address()?;
253
254        let (all_boarding_inputs, all_vtxo_inputs, _) = self
255            .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
256            .await?;
257
258        // Filter boarding inputs to only those specified.
259        let boarding_inputs: Vec<_> = all_boarding_inputs
260            .into_iter()
261            .filter(|input| boarding_outpoints.contains(&input.outpoint()))
262            .collect();
263
264        // Filter VTXO inputs to only those specified.
265        let vtxo_inputs: Vec<_> = all_vtxo_inputs
266            .into_iter()
267            .filter(|input| vtxo_outpoints.contains(&input.outpoint()))
268            .collect();
269
270        // Recalculate total amount from filtered inputs.
271        let total_amount = boarding_inputs
272            .iter()
273            .map(|i| i.amount())
274            .chain(vtxo_inputs.iter().map(|i| i.amount()))
275            .fold(Amount::ZERO, |acc, a| acc + a);
276
277        tracing::debug!(
278            offchain_address = %to_address.encode(),
279            ?boarding_inputs,
280            ?vtxo_inputs,
281            %total_amount,
282            "Attempting to settle specific outputs"
283        );
284
285        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
286            tracing::debug!("No matching inputs to settle");
287            return Ok(None);
288        }
289
290        let join_next_batch = || async {
291            self.join_next_batch(
292                &mut rng.clone(),
293                boarding_inputs.clone(),
294                vtxo_inputs.clone(),
295                BatchOutputType::Board {
296                    to_address,
297                    to_amount: total_amount,
298                },
299            )
300            .await
301        };
302
303        // Joining a batch can fail depending on the timing, so we try a few times.
304        let commitment_txid = join_next_batch
305            .retry(ExponentialBuilder::default().with_max_times(0))
306            .sleep(sleep)
307            .notify(|err: &Error, dur: std::time::Duration| {
308                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
309            })
310            .await
311            .context("Failed to join batch")?;
312
313        tracing::info!(%commitment_txid, "Settlement of specific VTXOs success");
314
315        Ok(Some(commitment_txid))
316    }
317
318    /// Settle _some_ prior VTXOs and boarding outputs into the next batch, generating UTXOs as
319    /// outputs to a new commitment transaction.
320    pub async fn collaborative_redeem<R>(
321        &self,
322        rng: &mut R,
323        to_address: Address,
324        to_amount: Amount,
325    ) -> Result<Txid, Error>
326    where
327        R: Rng + CryptoRng + Clone,
328    {
329        let (change_address, _) = self.get_offchain_address()?;
330
331        let (boarding_inputs, vtxo_inputs, total_amount) = self
332            .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
333            .await?;
334
335        let onchain_fee = self.eval_onchain_output_fee(ark_fees::Output {
336            amount: to_amount.to_sat(),
337            script: to_address.script_pubkey().to_string(),
338        })?;
339
340        // Fee comes out of change, not the send amount.
341        let change_amount = total_amount
342            .checked_sub(to_amount)
343            .and_then(|a| a.checked_sub(onchain_fee))
344            .ok_or_else(|| {
345                Error::coin_select(format!(
346                    "insufficient balance: {total_amount} < {to_amount} (send) + {onchain_fee} (fee)"
347                ))
348            })?;
349
350        tracing::info!(
351            %to_address,
352            send_amount = %to_amount,
353            fee = %onchain_fee,
354            change_address = %change_address.encode(),
355            %change_amount,
356            ?boarding_inputs,
357            "Attempting to collaboratively redeem outputs"
358        );
359
360        let join_next_batch = || async {
361            self.join_next_batch(
362                &mut rng.clone(),
363                boarding_inputs.clone(),
364                vtxo_inputs.clone(),
365                BatchOutputType::OffBoard {
366                    to_address: to_address.clone(),
367                    to_amount,
368                    change_address,
369                    change_amount,
370                },
371            )
372            .await
373        };
374
375        // Joining a batch can fail depending on the timing, so we try a few times.
376        let commitment_txid = join_next_batch
377            .retry(ExponentialBuilder::default().with_max_times(3))
378            .sleep(sleep)
379            // TODO: Use `when` to only retry certain errors.
380            .notify(|err: &Error, dur: std::time::Duration| {
381                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
382            })
383            .await
384            .context("Failed to join batch")?;
385
386        tracing::info!(%commitment_txid, "Collaborative redeem success");
387
388        Ok(commitment_txid)
389    }
390
391    /// Settle a selection of VTXOs into the next batch, generating UTXOs as
392    /// outputs to a new commitment transaction.
393    pub async fn collaborative_redeem_vtxo_selection<R>(
394        &self,
395        rng: &mut R,
396        input_vtxos: impl Iterator<Item = OutPoint> + Clone,
397        to_address: Address,
398        to_amount: Amount,
399    ) -> Result<Txid, Error>
400    where
401        R: Rng + CryptoRng + Clone,
402    {
403        let (change_address, _) = self.get_offchain_address()?;
404
405        let vtxo_inputs = self
406            .selected_batch_settleable_vtxo_inputs(input_vtxos)
407            .await?;
408
409        if vtxo_inputs.is_empty() {
410            return Err(Error::ad_hoc("no matching VTXO outpoints found"));
411        }
412
413        // Check that total amount is sufficient
414        let total_input_amount = vtxo_inputs
415            .iter()
416            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount());
417
418        let onchain_fee = self.eval_onchain_output_fee(ark_fees::Output {
419            amount: to_amount.to_sat(),
420            script: to_address.script_pubkey().to_string(),
421        })?;
422
423        // Fee comes out of change, not the send amount.
424        let change_amount = total_input_amount
425            .checked_sub(to_amount)
426            .and_then(|a| a.checked_sub(onchain_fee))
427            .ok_or_else(|| {
428                Error::coin_select(format!(
429                    "insufficient VTXO amount: {total_input_amount} < {to_amount} (send) + {onchain_fee} (fee)"
430                ))
431            })?;
432
433        tracing::info!(
434            %to_address,
435            send_amount = %to_amount,
436            fee = %onchain_fee,
437            change_address = %change_address.encode(),
438            %change_amount,
439            "Attempting to collaboratively redeem outputs"
440        );
441
442        let join_next_batch = || async {
443            self.join_next_batch(
444                &mut rng.clone(),
445                Vec::new(),
446                vtxo_inputs.clone(),
447                BatchOutputType::OffBoard {
448                    to_address: to_address.clone(),
449                    to_amount,
450                    change_address,
451                    change_amount,
452                },
453            )
454            .await
455        };
456
457        // Joining a batch can fail depending on the timing, so we try a few times.
458        let commitment_txid = join_next_batch
459            .retry(ExponentialBuilder::default().with_max_times(3))
460            .sleep(sleep)
461            // TODO: Use `when` to only retry certain errors.
462            .notify(|err: &Error, dur: std::time::Duration| {
463                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
464            })
465            .await
466            .context("Failed to join batch")?;
467
468        tracing::info!(%commitment_txid, "Collaborative redeem success");
469
470        Ok(commitment_txid)
471    }
472
473    pub(crate) async fn selected_batch_settleable_vtxo_inputs(
474        &self,
475        input_vtxos: impl IntoIterator<Item = OutPoint>,
476    ) -> Result<Vec<intent::Input>, Error> {
477        let requested: HashSet<OutPoint> = input_vtxos.into_iter().collect();
478
479        let (vtxo_list, script_pubkey_to_vtxo_map) =
480            self.list_vtxos().await.context("failed to get VTXO list")?;
481        let server_info = self.server_info()?;
482        let now = crate::utils::unix_now()?;
483
484        let matching_unspent = vtxo_list
485            .all_unspent()
486            .filter(|v| requested.contains(&v.outpoint))
487            .collect::<Vec<_>>();
488
489        let settleable_outpoints = vtxo_list
490            .batch_settleable_at(&server_info, now, |script| {
491                script_pubkey_to_vtxo_map
492                    .get(script)
493                    .map(|vtxo| vtxo.server_pk())
494            })
495            .filter(|v| requested.contains(&v.outpoint))
496            .map(|v| v.outpoint)
497            .collect::<HashSet<_>>();
498
499        let blocked = matching_unspent
500            .iter()
501            .filter(|v| !settleable_outpoints.contains(&v.outpoint))
502            .map(|v| v.outpoint.to_string())
503            .collect::<Vec<_>>();
504        if !blocked.is_empty() {
505            return Err(Error::ad_hoc(format!(
506                "selected VTXO outpoints are not batch-settleable because their signer cutoff has passed: {}",
507                blocked.join(", ")
508            )));
509        }
510
511        matching_unspent
512            .into_iter()
513            .filter(|v| settleable_outpoints.contains(&v.outpoint))
514            .map(|v| {
515                let vtxo = script_pubkey_to_vtxo_map.get(&v.script).ok_or_else(|| {
516                    ark_core::Error::ad_hoc(format!("missing VTXO for script pubkey: {}", v.script))
517                })?;
518                let spend_info = vtxo.forfeit_spend_info()?;
519
520                Ok(intent::Input::new(
521                    v.outpoint,
522                    vtxo.exit_delay(),
523                    // NOTE: This only works with default VTXOs (single-sig).
524                    None,
525                    TxOut {
526                        value: v.amount,
527                        script_pubkey: vtxo.script_pubkey(),
528                    },
529                    vtxo.tapscripts(),
530                    spend_info,
531                    false,
532                    v.is_swept,
533                    v.assets.clone(),
534                ))
535            })
536            .collect::<Result<Vec<_>, Error>>()
537    }
538
539    /// Generate a delegate for settling VTXOs on behalf of the owner.
540    ///
541    /// The owner pre-signs the intent and forfeit transactions, allowing another party to complete
542    /// the settlement at a later time using the provided `delegate_cosigner_pk`.
543    ///
544    /// # Arguments
545    ///
546    /// * `delegate_cosigner_pk` - The cosigner public key that the delegate will use
547    /// * `select_recoverable_vtxos` - Whether to include recoverable VTXOs
548    ///
549    /// # Returns
550    ///
551    /// A [`Delegate`] struct containing all the pre-signed data needed for settlement.
552    pub async fn generate_delegate(
553        &self,
554        delegate_cosigner_pk: PublicKey,
555    ) -> Result<Delegate, Error> {
556        // Get off-chain address and send all funds to this address.
557        let (to_address, _) = self.get_offchain_address()?;
558
559        // Simply collect all VTXOs that can be settled.
560        let (_, vtxo_inputs, _) = self
561            .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
562            .await?;
563
564        let total_amount = vtxo_inputs
565            .iter()
566            .fold(Amount::ZERO, |acc, v| acc + v.amount());
567
568        if vtxo_inputs.is_empty() {
569            return Err(Error::ad_hoc("no inputs to settle via delegate"));
570        }
571
572        let server_info = &self.server_info()?;
573
574        let mut outputs = vec![intent::Output::Offchain(TxOut {
575            value: total_amount,
576            script_pubkey: to_address.to_p2tr_script_pubkey(),
577        })];
578
579        if let Some(packet) = create_asset_preservation_packet(&vtxo_inputs, &outputs)? {
580            outputs.push(intent::Output::AssetPacket(packet.to_txout()));
581        }
582
583        let delegate = batch::prepare_delegate_psbts(
584            vtxo_inputs,
585            outputs,
586            delegate_cosigner_pk,
587            &server_info.forfeit_address,
588            server_info.dust,
589        )?;
590
591        Ok(delegate)
592    }
593
594    /// Sign a set of delegate PSBTs, including the intent PSBT and the forfeit PSBTs.
595    pub fn sign_delegate_psbts(
596        &self,
597        intent_psbt: &mut Psbt,
598        forfeit_psbts: &mut [Psbt],
599    ) -> Result<(), Error> {
600        let sign_fn =
601            |input: &mut psbt::Input,
602             msg: secp256k1::Message|
603             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
604                match &input.witness_script {
605                    None => Err(ark_core::Error::ad_hoc(
606                        "Missing witness script for psbt::Input",
607                    )),
608                    Some(script) => {
609                        let mut res = vec![];
610                        let pks = extract_checksig_pubkeys(script);
611                        for pk in pks {
612                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
613                                let sig = Secp256k1::new().sign_schnorr_no_aux_rand(&msg, &keypair);
614                                let pk = keypair.x_only_public_key().0;
615                                res.push((sig, pk));
616                            }
617                        }
618                        Ok(res)
619                    }
620                }
621            };
622
623        batch::sign_delegate_psbts(sign_fn, intent_psbt, forfeit_psbts)?;
624
625        Ok(())
626    }
627
628    /// Settle a delegate by completing the batch protocol using pre-signed data.
629    ///
630    /// This method allows Bob to settle Alice's VTXOs using the pre-signed intent and forfeit
631    /// transactions from the [`Delegate`] struct.
632    ///
633    /// # Arguments
634    ///
635    /// * `rng` - Random number generator for nonce generation
636    /// * `delegate` - The delegate struct containing pre-signed data
637    /// * `own_cosigner_kp` - Bob's cosigner keypair (must match the delegate_cosigner_pk)
638    ///
639    /// # Returns
640    ///
641    /// The commitment transaction ID if successful.
642    pub async fn settle_delegate<R>(
643        &self,
644        rng: &mut R,
645        delegate: Delegate,
646        own_cosigner_kp: Keypair,
647    ) -> Result<Txid, Error>
648    where
649        R: Rng + CryptoRng,
650    {
651        // Verify the cosigner key matches
652        if own_cosigner_kp.public_key() != delegate.delegate_cosigner_pk {
653            return Err(Error::ad_hoc(
654                "provided cosigner keypair does not match delegate_cosigner_pk",
655            ));
656        }
657
658        // Register the pre-signed intent
659        let intent_id = timeout_op(
660            self.inner.timeout,
661            self.network_client()
662                .register_intent(delegate.intent.clone()),
663        )
664        .await
665        .context("failed to register delegated intent")??;
666
667        tracing::debug!(intent_id, "Registered delegated intent");
668
669        let network_client = self.network_client();
670        let server_info = &self.server_info()?;
671
672        #[derive(Debug, PartialEq, Eq)]
673        enum Step {
674            Start,
675            BatchStarted,
676            BatchSigningStarted,
677            Finalized,
678        }
679
680        impl Step {
681            fn next(&self) -> Step {
682                match self {
683                    Step::Start => Step::BatchStarted,
684                    Step::BatchStarted => Step::BatchSigningStarted,
685                    Step::BatchSigningStarted => Step::Finalized,
686                    Step::Finalized => Step::Finalized,
687                }
688            }
689        }
690
691        let mut step = Step::Start;
692
693        let own_cosigner_kps = [own_cosigner_kp];
694        let own_cosigner_pks = own_cosigner_kps
695            .iter()
696            .map(|k| k.public_key())
697            .collect::<Vec<_>>();
698
699        let mut batch_id: Option<String> = None;
700
701        let vtxo_input_outpoints = delegate
702            .forfeit_psbts
703            .iter()
704            .map(|psbt| psbt.unsigned_tx.input[0].previous_output)
705            .collect::<Vec<_>>();
706
707        let topics = vtxo_input_outpoints
708            .iter()
709            .map(ToString::to_string)
710            .chain(
711                own_cosigner_pks
712                    .iter()
713                    .map(|pk| pk.serialize().to_lower_hex_string()),
714            )
715            .collect();
716
717        let mut stream = network_client.get_event_stream(topics).await?;
718
719        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
720
721        let mut unsigned_commitment_tx = None;
722
723        let mut vtxo_batch_tree_graph_chunks = Some(Vec::new());
724        let mut vtxo_batch_tree_graph: Option<TxGraph> = None;
725
726        let mut connectors_graph_chunks = Some(Vec::new());
727        let mut batch_expiry = None;
728
729        let mut agg_nonce_pks = HashMap::new();
730
731        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
732
733        loop {
734            match timeout_op(self.inner.timeout, stream.next())
735                .await
736                .context("timed out waiting for batch event")?
737            {
738                Some(Ok(event)) => match event {
739                    StreamEvent::BatchStarted(e) => {
740                        if step != Step::Start {
741                            continue;
742                        }
743
744                        let hash = sha256::Hash::hash(intent_id.as_bytes());
745                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
746
747                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
748                            timeout_op(
749                                self.inner.timeout,
750                                self.network_client()
751                                    .confirm_registration(intent_id.clone()),
752                            )
753                            .await
754                            .context("failed to confirm intent registration")??;
755
756                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
757
758                            batch_id = Some(e.id);
759
760                            step = Step::BatchStarted;
761
762                            batch_expiry = Some(e.batch_expiry);
763                        } else {
764                            tracing::debug!(
765                                batch_id = e.id,
766                                intent_id,
767                                "Intent ID not found for batch"
768                            );
769                        }
770                    }
771                    StreamEvent::TreeTx(e) => {
772                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
773                            continue;
774                        }
775
776                        match e.batch_tree_event_type {
777                            BatchTreeEventType::Vtxo => {
778                                match &mut vtxo_batch_tree_graph_chunks {
779                                    Some(vtxo_batch_tree_graph_chunks) => {
780                                        tracing::debug!("Got new VTXO batch-tree graph chunk");
781
782                                        vtxo_batch_tree_graph_chunks.push(e.tx_graph_chunk)
783                                    }
784                                    None => {
785                                        return Err(Error::ark_server(
786                                            "received unexpected VTXO batch-tree graph chunk",
787                                        ));
788                                    }
789                                };
790                            }
791                            BatchTreeEventType::Connector => {
792                                match connectors_graph_chunks {
793                                    Some(ref mut connectors_graph_chunks) => {
794                                        tracing::debug!("Got new connectors graph chunk");
795
796                                        connectors_graph_chunks.push(e.tx_graph_chunk)
797                                    }
798                                    None => {
799                                        return Err(Error::ark_server(
800                                            "received unexpected connectors graph chunk",
801                                        ));
802                                    }
803                                };
804                            }
805                        }
806                    }
807                    StreamEvent::TreeSignature(e) => {
808                        if step != Step::BatchSigningStarted {
809                            continue;
810                        }
811
812                        match e.batch_tree_event_type {
813                            BatchTreeEventType::Vtxo => {
814                                match vtxo_batch_tree_graph {
815                                    Some(ref mut vtxo_batch_tree_graph) => {
816                                        vtxo_batch_tree_graph.apply(|graph| {
817                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
818                                                Ok(true)
819                                            } else {
820                                                graph.set_signature(e.signature);
821
822                                                Ok(false)
823                                            }
824                                        })?;
825                                    }
826                                    None => {
827                                        return Err(Error::ark_server(
828                                            "received batch-tree signature without transaction graph",
829                                        ));
830                                    }
831                                };
832                            }
833                            BatchTreeEventType::Connector => {
834                                return Err(Error::ark_server(
835                                    "received batch-tree signature for connector tree",
836                                ));
837                            }
838                        }
839                    }
840                    StreamEvent::TreeSigningStarted(e) => {
841                        if step != Step::BatchStarted {
842                            continue;
843                        }
844
845                        let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
846                            "received batch-tree signing started event without VTXO batch-tree graph chunks",
847                        ))?;
848                        vtxo_batch_tree_graph =
849                            Some(TxGraph::new(chunks).map_err(Error::from).context(
850                                "failed to build VTXO batch-tree graph before generating nonces",
851                            )?);
852
853                        tracing::info!(batch_id = e.id, "Batch signing started");
854
855                        for own_cosigner_pk in own_cosigner_pks.iter() {
856                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
857                                return Err(Error::ark_server(format!(
858                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
859                                )));
860                            }
861                        }
862
863                        let mut our_nonce_tree_map = HashMap::new();
864                        for own_cosigner_kp in own_cosigner_kps {
865                            let own_cosigner_pk = own_cosigner_kp.public_key();
866                            let nonce_tree = generate_nonce_tree(
867                                rng,
868                                vtxo_batch_tree_graph
869                                    .as_ref()
870                                    .expect("VTXO batch-tree graph"),
871                                own_cosigner_pk,
872                                &e.unsigned_commitment_tx,
873                            )
874                            .map_err(Error::from)
875                            .context("failed to generate VTXO nonce tree")?;
876
877                            tracing::info!(
878                                cosigner_pk = %own_cosigner_pk,
879                                "Submitting nonce tree for cosigner PK"
880                            );
881
882                            network_client
883                                .submit_tree_nonces(
884                                    &e.id,
885                                    own_cosigner_pk,
886                                    nonce_tree.to_nonce_pks(),
887                                )
888                                .await
889                                .map_err(Error::ark_server)
890                                .context("failed to submit VTXO nonce tree")?;
891
892                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
893                        }
894
895                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
896                        our_nonce_trees = Some(our_nonce_tree_map);
897
898                        step = step.next();
899                    }
900                    StreamEvent::TreeNonces(e) => {
901                        if step != Step::BatchSigningStarted {
902                            continue;
903                        }
904
905                        let tree_tx_nonce_pks = e.nonces;
906
907                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
908                            own_cosigner_pks
909                                .iter()
910                                .any(|p| &&p.x_only_public_key().0 == pk)
911                        }) {
912                            Some((pk, _)) => *pk,
913                            None => {
914                                tracing::debug!(
915                                    batch_id = e.id,
916                                    txid = %e.txid,
917                                    "Received irrelevant TreeNonces event"
918                                );
919
920                                continue;
921                            }
922                        };
923
924                        tracing::debug!(
925                            batch_id = e.id,
926                            txid = %e.txid,
927                            %cosigner_pk,
928                            "Received TreeNonces event"
929                        );
930
931                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
932
933                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
934
935                        if vtxo_batch_tree_graph.is_none() {
936                            let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
937                                "received batch-tree nonces event without VTXO batch-tree graph chunks",
938                            ))?;
939                            vtxo_batch_tree_graph = Some(
940                                TxGraph::new(chunks)
941                                    .map_err(Error::from)
942                                    .context("failed to build VTXO batch-tree graph before batch-tree signing")?,
943                            );
944                        }
945                        let vtxo_batch_tree_graph_ref =
946                            vtxo_batch_tree_graph.as_ref().expect("just populated");
947
948                        if agg_nonce_pks.len() == vtxo_batch_tree_graph_ref.nb_of_nodes() {
949                            let cosigner_kp = own_cosigner_kps
950                                .iter()
951                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
952                                .ok_or_else(|| {
953                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
954                                })?;
955
956                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
957                                Error::ark_server("missing nonce trees during batch protocol"),
958                            )?;
959
960                            let our_nonce_tree =
961                                our_nonce_trees
962                                    .get_mut(cosigner_kp)
963                                    .ok_or(Error::ark_server(
964                                        "missing nonce tree during batch protocol",
965                                    ))?;
966
967                            let unsigned_commitment_tx = unsigned_commitment_tx
968                                .as_ref()
969                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
970
971                            let batch_expiry = batch_expiry
972                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
973
974                            let mut partial_sig_tree = PartialSigTree::default();
975                            for (txid, _) in vtxo_batch_tree_graph_ref.as_map() {
976                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
977                                    Error::ad_hoc(format!(
978                                        "missing aggregated nonce PK for TX {txid}"
979                                    ))
980                                })?;
981
982                                let sigs = sign_batch_tree_tx(
983                                    txid,
984                                    batch_expiry,
985                                    ark_forfeit_pk,
986                                    cosigner_kp,
987                                    *agg_nonce_pk,
988                                    vtxo_batch_tree_graph_ref,
989                                    unsigned_commitment_tx,
990                                    our_nonce_tree,
991                                )
992                                .map_err(Error::from)
993                                .context("failed to sign VTXO batch-tree transactions")?;
994
995                                partial_sig_tree.0.extend(sigs.0);
996                            }
997
998                            network_client
999                                .submit_tree_signatures(
1000                                    &e.id,
1001                                    cosigner_kp.public_key(),
1002                                    partial_sig_tree,
1003                                )
1004                                .await
1005                                .map_err(Error::ark_server)
1006                                .context("failed to submit VTXO batch-tree signatures")?;
1007                        }
1008                    }
1009                    StreamEvent::TreeNoncesAggregated(e) => {
1010                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
1011                    }
1012                    StreamEvent::BatchFinalization(e) => {
1013                        if step != Step::BatchSigningStarted {
1014                            continue;
1015                        }
1016
1017                        tracing::debug!(
1018                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
1019                            "Batch finalization started (delegate)"
1020                        );
1021
1022                        let chunks = connectors_graph_chunks.take().ok_or(Error::ark_server(
1023                            "received batch finalization event without connectors",
1024                        ))?;
1025
1026                        if chunks.is_empty() {
1027                            tracing::debug!(batch_id = e.id, "No delegated forfeit transactions");
1028                        } else {
1029                            let connectors_graph = TxGraph::new(chunks)
1030                                .map_err(Error::from)
1031                                .context(
1032                                "failed to build connectors graph before completing forfeit TXs",
1033                            )?;
1034
1035                            tracing::debug!(
1036                                batch_id = e.id,
1037                                "Completing delegated forfeit transactions"
1038                            );
1039
1040                            let signed_forfeit_psbts = complete_delegate_forfeit_txs(
1041                                &delegate.forfeit_psbts,
1042                                &connectors_graph.leaves(),
1043                            )?;
1044
1045                            network_client
1046                                .submit_signed_forfeit_txs(signed_forfeit_psbts, None)
1047                                .await?;
1048                        }
1049
1050                        step = step.next();
1051                    }
1052                    StreamEvent::BatchFinalized(e) => {
1053                        if step != Step::Finalized {
1054                            continue;
1055                        }
1056
1057                        let commitment_txid = e.commitment_txid;
1058
1059                        tracing::info!(batch_id = e.id, %commitment_txid, "Delegated batch finalized");
1060
1061                        return Ok(commitment_txid);
1062                    }
1063                    StreamEvent::BatchFailed(ref e) => {
1064                        if Some(&e.id) == batch_id.as_ref() {
1065                            return Err(Error::ark_server(format!(
1066                                "batch failed {}: {}",
1067                                e.id, e.reason
1068                            )));
1069                        }
1070
1071                        tracing::debug!("Unrelated batch failed: {e:?}");
1072                    }
1073                    StreamEvent::Heartbeat => {}
1074                    StreamEvent::StreamStarted(_) => {}
1075                },
1076                Some(Err(e)) => {
1077                    tracing::error!("Got error from event stream");
1078
1079                    return Err(Error::ark_server(e));
1080                }
1081                None => {
1082                    return Err(Error::ark_server("dropped batch event stream"));
1083                }
1084            }
1085        }
1086    }
1087
1088    /// Get all the [`batch::OnChainInput`]s and [`batch::VtxoInput`]s that can be used to join an
1089    /// upcoming batch.
1090    pub(crate) async fn fetch_commitment_transaction_inputs(
1091        &self,
1092        now: i64,
1093    ) -> Result<(Vec<batch::OnChainInput>, Vec<intent::Input>, Amount), Error> {
1094        let now = u64::try_from(now).map_err(|_| Error::ad_hoc("negative timestamp"))?;
1095
1096        // Get all known boarding outputs.
1097        let boarding_outputs = self.inner.wallet.get_boarding_outputs()?;
1098
1099        let mut boarding_inputs: Vec<batch::OnChainInput> = Vec::new();
1100        let mut total_amount = Amount::ZERO;
1101
1102        // To track unique outpoints and prevent duplicates
1103        let mut seen_outpoints = HashSet::new();
1104
1105        // Snapshot once; reused for the boarding cutoff filter and the VTXO dust/cutoff logic
1106        // below.
1107        let server_info = self.server_info()?;
1108
1109        // Find outpoints for each boarding output.
1110        for boarding_output in boarding_outputs {
1111            let outpoints = timeout_op(
1112                self.inner.timeout,
1113                self.blockchain().find_outpoints(boarding_output.address()),
1114            )
1115            .await
1116            .context("failed to find outpoints")??;
1117
1118            for o in outpoints.iter() {
1119                if let ExplorerUtxo {
1120                    outpoint,
1121                    amount,
1122                    confirmation_blocktime: Some(confirmation_blocktime),
1123                    confirmations,
1124                    is_spent: false,
1125                } = o
1126                {
1127                    // Check for duplicate outpoints
1128                    if seen_outpoints.contains(outpoint) {
1129                        continue;
1130                    }
1131
1132                    // Skip boarding outputs whose server key is past its cooperative-sign
1133                    // cutoff — the operator won't co-sign the old key's forfeit path.
1134                    // These must be recovered via unilateral exit (send_on_chain).
1135                    if server_info
1136                        .signer_requires_recovery_at(boarding_output.server_pk(), now as i64)
1137                    {
1138                        continue;
1139                    }
1140
1141                    // Only include confirmed boarding outputs with an _inactive_ exit path.
1142                    if !boarding_output.can_be_claimed_unilaterally_by_owner(
1143                        std::time::Duration::from_secs(now),
1144                        std::time::Duration::from_secs(*confirmation_blocktime),
1145                        *confirmations,
1146                    ) {
1147                        // Mark this outpoint as seen
1148                        seen_outpoints.insert(*outpoint);
1149
1150                        boarding_inputs.push(batch::OnChainInput::new(
1151                            boarding_output.clone(),
1152                            *amount,
1153                            *outpoint,
1154                        ));
1155                        total_amount += *amount;
1156                    }
1157                }
1158            }
1159        }
1160
1161        let (vtxo_list, script_pubkey_to_vtxo_map) = self.list_vtxos().await?;
1162        // Reuse the caller-supplied timestamp (not a fresh wall-clock) so the VTXO cutoff filter
1163        // below is evaluated against the same instant as the boarding filter above, and so a
1164        // test-injected `now` deterministically controls both.
1165        let settleable_vtxos: Vec<_> = vtxo_list
1166            .batch_settleable_at(&server_info, now as i64, |script| {
1167                script_pubkey_to_vtxo_map
1168                    .get(script)
1169                    .map(|vtxo| vtxo.server_pk())
1170            })
1171            .collect();
1172
1173        total_amount += settleable_vtxos
1174            .iter()
1175            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount);
1176
1177        let vtxo_inputs = settleable_vtxos
1178            .into_iter()
1179            .map(|virtual_tx_outpoint| {
1180                let vtxo = script_pubkey_to_vtxo_map
1181                    .get(&virtual_tx_outpoint.script)
1182                    .ok_or_else(|| {
1183                        ark_core::Error::ad_hoc(format!(
1184                            "missing VTXO for script pubkey: {}",
1185                            virtual_tx_outpoint.script
1186                        ))
1187                    })?;
1188                let spend_info = vtxo.forfeit_spend_info()?;
1189
1190                Ok(intent::Input::new(
1191                    virtual_tx_outpoint.outpoint,
1192                    vtxo.exit_delay(),
1193                    None,
1194                    TxOut {
1195                        value: virtual_tx_outpoint.amount,
1196                        script_pubkey: vtxo.script_pubkey(),
1197                    },
1198                    vtxo.tapscripts(),
1199                    spend_info,
1200                    false,
1201                    virtual_tx_outpoint.is_swept,
1202                    virtual_tx_outpoint.assets.clone(),
1203                ))
1204            })
1205            .collect::<Result<Vec<_>, ark_core::Error>>()?;
1206
1207        Ok((boarding_inputs, vtxo_inputs, total_amount))
1208    }
1209
1210    /// Prepare an intent for batch registration or fee estimation.
1211    ///
1212    /// This creates a signed intent PSBT along with all the data needed to participate
1213    /// in the batch protocol.
1214    pub(crate) fn prepare_intent<R>(
1215        &self,
1216        rng: &mut R,
1217        onchain_inputs: Vec<batch::OnChainInput>,
1218        vtxo_inputs: Vec<intent::Input>,
1219        output_type: BatchOutputType,
1220        intent_kind: PrepareIntentKind,
1221    ) -> Result<PreparedIntent, Error>
1222    where
1223        R: Rng + CryptoRng,
1224    {
1225        if onchain_inputs.is_empty() && vtxo_inputs.is_empty() {
1226            return Err(Error::ad_hoc("cannot prepare intent without inputs"));
1227        }
1228
1229        // Generate an (ephemeral) cosigner keypair.
1230        let cosigner_keypair = Keypair::new(self.secp(), rng);
1231
1232        let vtxo_input_outpoints = vtxo_inputs.iter().map(|i| i.outpoint()).collect::<Vec<_>>();
1233
1234        let inputs = {
1235            let boarding_inputs = onchain_inputs.clone().into_iter().map(|o| {
1236                intent::Input::new(
1237                    o.outpoint(),
1238                    o.boarding_output().exit_delay(),
1239                    None,
1240                    TxOut {
1241                        value: o.amount(),
1242                        script_pubkey: o.boarding_output().script_pubkey(),
1243                    },
1244                    o.boarding_output().tapscripts(),
1245                    o.boarding_output().forfeit_spend_info(),
1246                    true,
1247                    false,
1248                    Vec::new(),
1249                )
1250            });
1251
1252            boarding_inputs
1253                .chain(vtxo_inputs.clone())
1254                .collect::<Vec<_>>()
1255        };
1256
1257        let dust = self.server_info()?.dust;
1258
1259        let mut outputs = vec![];
1260
1261        match output_type {
1262            BatchOutputType::Board {
1263                to_address,
1264                to_amount,
1265            } => {
1266                if to_amount < dust {
1267                    return Err(Error::ad_hoc(format!(
1268                        "cannot settle into sub-dust VTXO: {to_amount} < {dust}"
1269                    )));
1270                }
1271
1272                outputs.push(intent::Output::Offchain(TxOut {
1273                    value: to_amount,
1274                    script_pubkey: to_address.to_p2tr_script_pubkey(),
1275                }));
1276            }
1277            BatchOutputType::OffBoard {
1278                to_address,
1279                to_amount,
1280                change_amount,
1281                ..
1282            } if change_amount == Amount::ZERO => {
1283                outputs.push(intent::Output::Onchain(TxOut {
1284                    value: to_amount,
1285                    script_pubkey: to_address.script_pubkey(),
1286                }));
1287            }
1288            BatchOutputType::OffBoard {
1289                to_address,
1290                to_amount,
1291                change_address,
1292                change_amount,
1293            } => {
1294                if change_amount < dust {
1295                    return Err(Error::ad_hoc(format!(
1296                        "cannot settle with sub-dust change VTXO: {change_amount} < {dust}"
1297                    )));
1298                }
1299
1300                outputs.push(intent::Output::Onchain(TxOut {
1301                    value: to_amount,
1302                    script_pubkey: to_address.script_pubkey(),
1303                }));
1304
1305                outputs.push(intent::Output::Offchain(TxOut {
1306                    value: change_amount,
1307                    script_pubkey: change_address.to_p2tr_script_pubkey(),
1308                }));
1309            }
1310        }
1311
1312        let cosigner_pk = cosigner_keypair.public_key();
1313
1314        let secp = Secp256k1::new();
1315
1316        let sign_for_vtxo_fn =
1317            |input: &mut psbt::Input,
1318             msg: secp256k1::Message|
1319             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
1320                match &input.witness_script {
1321                    None => Err(ark_core::Error::ad_hoc(
1322                        "Missing witness script in psbt::Input when signing intent",
1323                    )),
1324                    Some(script) => {
1325                        let pks = extract_checksig_pubkeys(script);
1326                        let mut res = vec![];
1327                        for pk in pks {
1328                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
1329                                let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1330                                res.push((sig, keypair.public_key().into()))
1331                            }
1332                        }
1333                        Ok(res)
1334                    }
1335                }
1336            };
1337
1338        let sign_for_onchain_fn =
1339            |input: &mut psbt::Input,
1340             msg: secp256k1::Message|
1341             -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
1342                let onchain_input = onchain_inputs
1343                    .iter()
1344                    .find(|o| {
1345                        Some(o.boarding_output().script_pubkey())
1346                            == input.witness_utxo.clone().map(|w| w.script_pubkey)
1347                    })
1348                    .ok_or_else(|| {
1349                        ark_core::Error::ad_hoc(
1350                            "could not find signing key for onchain input: {input:?}",
1351                        )
1352                    })?;
1353
1354                let owner_pk = onchain_input.boarding_output().owner_pk();
1355                let sig = self
1356                    .inner
1357                    .wallet
1358                    .sign_for_pk(&owner_pk, &msg)
1359                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))?;
1360
1361                Ok((sig, owner_pk))
1362            };
1363
1364        let now = std::time::SystemTime::now()
1365            .duration_since(std::time::UNIX_EPOCH)
1366            .map_err(|e| Error::ad_hoc(e.to_string()))
1367            .context("failed to compute now timestamp")?;
1368        let now = now.as_secs();
1369        let expire_at = now + (2 * 60);
1370
1371        if let Some(packet) = create_asset_preservation_packet(&inputs, &outputs)? {
1372            outputs.push(intent::Output::AssetPacket(packet.to_txout()));
1373        }
1374
1375        let mut onchain_output_indexes = Vec::new();
1376        for (i, output) in outputs.iter().enumerate() {
1377            if matches!(output, intent::Output::Onchain(_)) {
1378                onchain_output_indexes.push(i);
1379            }
1380        }
1381
1382        let message = match intent_kind {
1383            PrepareIntentKind::EstimateFee => intent::IntentMessage::EstimateIntentFee {
1384                onchain_output_indexes,
1385                valid_at: now,
1386                expire_at,
1387                own_cosigner_pks: vec![cosigner_pk],
1388            },
1389            PrepareIntentKind::Register => intent::IntentMessage::Register {
1390                onchain_output_indexes,
1391                valid_at: now,
1392                expire_at,
1393                own_cosigner_pks: vec![cosigner_pk],
1394            },
1395        };
1396
1397        let intent = intent::make_intent(
1398            sign_for_vtxo_fn,
1399            sign_for_onchain_fn,
1400            inputs,
1401            outputs.clone(),
1402            message,
1403        )?;
1404
1405        Ok(PreparedIntent {
1406            intent,
1407            cosigner_keypair,
1408            vtxo_input_outpoints,
1409            outputs,
1410            onchain_inputs,
1411            vtxo_inputs,
1412        })
1413    }
1414
1415    pub(crate) async fn join_next_batch<R>(
1416        &self,
1417        rng: &mut R,
1418        onchain_inputs: Vec<batch::OnChainInput>,
1419        vtxo_inputs: Vec<intent::Input>,
1420        output_type: BatchOutputType,
1421    ) -> Result<Txid, Error>
1422    where
1423        R: Rng + CryptoRng,
1424    {
1425        let prepared = self.prepare_intent(
1426            rng,
1427            onchain_inputs,
1428            vtxo_inputs,
1429            output_type,
1430            PrepareIntentKind::Register,
1431        )?;
1432
1433        let PreparedIntent {
1434            intent,
1435            cosigner_keypair,
1436            vtxo_input_outpoints,
1437            outputs,
1438            onchain_inputs,
1439            vtxo_inputs,
1440        } = prepared;
1441
1442        let onchain_input_outpoints = onchain_inputs
1443            .iter()
1444            .map(|i| i.outpoint())
1445            .collect::<Vec<_>>();
1446
1447        let server_info = &self.server_info()?;
1448
1449        let own_cosigner_kps = [cosigner_keypair];
1450        let own_cosigner_pks = own_cosigner_kps
1451            .iter()
1452            .map(|k| k.public_key())
1453            .collect::<Vec<_>>();
1454
1455        let secp = Secp256k1::new();
1456
1457        let mut step = Step::Start;
1458
1459        let intent_id = timeout_op(
1460            self.inner.timeout,
1461            self.network_client().register_intent(intent),
1462        )
1463        .await
1464        .context("failed to register intent")??;
1465
1466        tracing::debug!(
1467            intent_id,
1468            ?onchain_input_outpoints,
1469            ?vtxo_input_outpoints,
1470            ?outputs,
1471            "Registered intent for batch"
1472        );
1473
1474        let network_client = self.network_client();
1475
1476        let mut batch_id: Option<String> = None;
1477
1478        let topics = vtxo_input_outpoints
1479            .iter()
1480            .map(ToString::to_string)
1481            .chain(
1482                own_cosigner_pks
1483                    .iter()
1484                    .map(|pk| pk.serialize().to_lower_hex_string()),
1485            )
1486            .collect();
1487
1488        let mut stream = network_client.get_event_stream(topics).await?;
1489
1490        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
1491
1492        let mut unsigned_commitment_tx = None;
1493
1494        let mut vtxo_batch_tree_graph_chunks = Some(Vec::new());
1495        let mut vtxo_batch_tree_graph: Option<TxGraph> = None;
1496
1497        let mut connectors_graph_chunks = Some(Vec::new());
1498        let mut batch_expiry = None;
1499
1500        let mut agg_nonce_pks = HashMap::new();
1501
1502        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
1503        loop {
1504            match timeout_op(self.inner.timeout, stream.next())
1505                .await
1506                .context("timed out waiting for batch event")?
1507            {
1508                Some(Ok(event)) => match event {
1509                    StreamEvent::BatchStarted(e) => {
1510                        if step != Step::Start {
1511                            continue;
1512                        }
1513
1514                        let hash = sha256::Hash::hash(intent_id.as_bytes());
1515                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
1516
1517                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
1518                            timeout_op(
1519                                self.inner.timeout,
1520                                self.network_client()
1521                                    .confirm_registration(intent_id.clone()),
1522                            )
1523                            .await
1524                            .context("failed to confirm intent registration")??;
1525
1526                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
1527
1528                            batch_id = Some(e.id);
1529
1530                            // Depending on whether we are generating new VTXOs or not, we continue
1531                            // with a different step in the state machine.
1532                            step = match outputs
1533                                .iter()
1534                                .any(|o| matches!(o, intent::Output::Offchain(_)))
1535                            {
1536                                true => Step::BatchStarted,
1537                                false => Step::BatchSigningStarted,
1538                            };
1539
1540                            batch_expiry = Some(e.batch_expiry);
1541                        } else {
1542                            tracing::debug!(
1543                                batch_id = e.id,
1544                                intent_id,
1545                                "Intent ID not found for batch"
1546                            );
1547                        }
1548                    }
1549                    StreamEvent::TreeTx(e) => {
1550                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
1551                            continue;
1552                        }
1553
1554                        match e.batch_tree_event_type {
1555                            BatchTreeEventType::Vtxo => {
1556                                match &mut vtxo_batch_tree_graph_chunks {
1557                                    Some(vtxo_batch_tree_graph_chunks) => {
1558                                        tracing::debug!("Got new VTXO batch-tree graph chunk");
1559
1560                                        vtxo_batch_tree_graph_chunks.push(e.tx_graph_chunk)
1561                                    }
1562                                    None => {
1563                                        return Err(Error::ark_server(
1564                                            "received unexpected VTXO batch-tree graph chunk",
1565                                        ));
1566                                    }
1567                                };
1568                            }
1569                            BatchTreeEventType::Connector => {
1570                                match connectors_graph_chunks {
1571                                    Some(ref mut connectors_graph_chunks) => {
1572                                        tracing::debug!("Got new connectors graph chunk");
1573
1574                                        connectors_graph_chunks.push(e.tx_graph_chunk)
1575                                    }
1576                                    None => {
1577                                        return Err(Error::ark_server(
1578                                            "received unexpected connectors graph chunk",
1579                                        ));
1580                                    }
1581                                };
1582                            }
1583                        }
1584                    }
1585                    StreamEvent::TreeSignature(e) => {
1586                        if step != Step::BatchSigningStarted {
1587                            continue;
1588                        }
1589
1590                        match e.batch_tree_event_type {
1591                            BatchTreeEventType::Vtxo => {
1592                                match vtxo_batch_tree_graph {
1593                                    Some(ref mut vtxo_batch_tree_graph) => {
1594                                        vtxo_batch_tree_graph.apply(|graph| {
1595                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
1596                                                Ok(true)
1597                                            } else {
1598                                                graph.set_signature(e.signature);
1599
1600                                                Ok(false)
1601                                            }
1602                                        })?;
1603                                    }
1604                                    None => {
1605                                        return Err(Error::ark_server(
1606                                            "received batch-tree signature without transaction graph",
1607                                        ));
1608                                    }
1609                                };
1610                            }
1611                            BatchTreeEventType::Connector => {
1612                                return Err(Error::ark_server(
1613                                    "received batch-tree signature for connector tree",
1614                                ));
1615                            }
1616                        }
1617                    }
1618                    StreamEvent::TreeSigningStarted(e) => {
1619                        if step != Step::BatchStarted {
1620                            continue;
1621                        }
1622
1623                        let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
1624                            "received batch-tree signing started event without VTXO batch-tree graph chunks",
1625                        ))?;
1626                        vtxo_batch_tree_graph =
1627                            Some(TxGraph::new(chunks).map_err(Error::from).context(
1628                                "failed to build VTXO batch-tree graph before generating nonces",
1629                            )?);
1630
1631                        tracing::info!(batch_id = e.id, "Batch signing started");
1632
1633                        for own_cosigner_pk in own_cosigner_pks.iter() {
1634                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
1635                                return Err(Error::ark_server(format!(
1636                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
1637                                )));
1638                            }
1639                        }
1640
1641                        // We generate and submit a nonce tree for every cosigner key we provide.
1642                        let mut our_nonce_tree_map = HashMap::new();
1643                        for own_cosigner_kp in own_cosigner_kps {
1644                            let own_cosigner_pk = own_cosigner_kp.public_key();
1645                            let nonce_tree = generate_nonce_tree(
1646                                rng,
1647                                vtxo_batch_tree_graph
1648                                    .as_ref()
1649                                    .expect("VTXO batch-tree graph"),
1650                                own_cosigner_pk,
1651                                &e.unsigned_commitment_tx,
1652                            )
1653                            .map_err(Error::from)
1654                            .context("failed to generate VTXO nonce tree")?;
1655
1656                            tracing::info!(
1657                                cosigner_pk = %own_cosigner_pk,
1658                                "Submitting nonce tree for cosigner PK"
1659                            );
1660
1661                            network_client
1662                                .submit_tree_nonces(
1663                                    &e.id,
1664                                    own_cosigner_pk,
1665                                    nonce_tree.to_nonce_pks(),
1666                                )
1667                                .await
1668                                .map_err(Error::ark_server)
1669                                .context("failed to submit VTXO nonce tree")?;
1670
1671                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
1672                        }
1673
1674                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
1675                        our_nonce_trees = Some(our_nonce_tree_map);
1676
1677                        step = step.next();
1678                    }
1679                    StreamEvent::TreeNonces(e) => {
1680                        if step != Step::BatchSigningStarted {
1681                            continue;
1682                        }
1683
1684                        let tree_tx_nonce_pks = e.nonces;
1685
1686                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
1687                            own_cosigner_pks
1688                                .iter()
1689                                .any(|p| &&p.x_only_public_key().0 == pk)
1690                        }) {
1691                            Some((pk, _)) => *pk,
1692                            None => {
1693                                tracing::debug!(
1694                                    batch_id = e.id,
1695                                    txid = %e.txid,
1696                                    "Received irrelevant TreeNonces event"
1697                                );
1698
1699                                continue;
1700                            }
1701                        };
1702
1703                        tracing::debug!(
1704                            batch_id = e.id,
1705                            txid = %e.txid,
1706                            %cosigner_pk,
1707                            "Received TreeNonces event"
1708                        );
1709
1710                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
1711
1712                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
1713
1714                        if vtxo_batch_tree_graph.is_none() {
1715                            let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
1716                                "received batch-tree nonces event without VTXO batch-tree graph chunks",
1717                            ))?;
1718                            vtxo_batch_tree_graph = Some(
1719                                TxGraph::new(chunks)
1720                                    .map_err(Error::from)
1721                                    .context("failed to build VTXO batch-tree graph before batch-tree signing")?,
1722                            );
1723                        }
1724                        let vtxo_batch_tree_graph_ref =
1725                            vtxo_batch_tree_graph.as_ref().expect("just populated");
1726
1727                        // Once we collect an aggregated nonce per transaction in our VTXO
1728                        // batch-tree graph, we can sign and submit our partial signatures.
1729                        if agg_nonce_pks.len() == vtxo_batch_tree_graph_ref.nb_of_nodes() {
1730                            let cosigner_kp = own_cosigner_kps
1731                                .iter()
1732                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
1733                                .ok_or_else(|| {
1734                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
1735                                })?;
1736
1737                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
1738                                Error::ark_server("missing nonce trees during batch protocol"),
1739                            )?;
1740
1741                            let our_nonce_tree =
1742                                our_nonce_trees
1743                                    .get_mut(cosigner_kp)
1744                                    .ok_or(Error::ark_server(
1745                                        "missing nonce tree during batch protocol",
1746                                    ))?;
1747
1748                            let unsigned_commitment_tx = unsigned_commitment_tx
1749                                .as_ref()
1750                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
1751
1752                            let batch_expiry = batch_expiry
1753                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
1754
1755                            let mut partial_sig_tree = PartialSigTree::default();
1756                            for (txid, _) in vtxo_batch_tree_graph_ref.as_map() {
1757                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
1758                                    Error::ad_hoc(format!(
1759                                        "missing aggregated nonce PK for TX {txid}"
1760                                    ))
1761                                })?;
1762
1763                                let sigs = sign_batch_tree_tx(
1764                                    txid,
1765                                    batch_expiry,
1766                                    ark_forfeit_pk,
1767                                    cosigner_kp,
1768                                    *agg_nonce_pk,
1769                                    vtxo_batch_tree_graph_ref,
1770                                    unsigned_commitment_tx,
1771                                    our_nonce_tree,
1772                                )
1773                                .map_err(Error::from)
1774                                .context("failed to sign VTXO batch-tree transactions")?;
1775
1776                                partial_sig_tree.0.extend(sigs.0);
1777                            }
1778
1779                            network_client
1780                                .submit_tree_signatures(
1781                                    &e.id,
1782                                    cosigner_kp.public_key(),
1783                                    partial_sig_tree,
1784                                )
1785                                .await
1786                                .map_err(Error::ark_server)
1787                                .context("failed to submit VTXO batch-tree signatures")?;
1788                        }
1789                    }
1790                    StreamEvent::TreeNoncesAggregated(e) => {
1791                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
1792                    }
1793                    StreamEvent::BatchFinalization(e) => {
1794                        if step != Step::BatchSigningStarted {
1795                            continue;
1796                        }
1797
1798                        tracing::debug!(
1799                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
1800                            "Batch finalization started"
1801                        );
1802
1803                        let signed_forfeit_psbts = if !vtxo_inputs.is_empty() {
1804                            let chunks =
1805                                connectors_graph_chunks.take().ok_or(Error::ark_server(
1806                                    "received batch finalization event without connectors",
1807                                ))?;
1808
1809                            if chunks.is_empty() {
1810                                tracing::debug!(batch_id = e.id, "No forfeit transactions");
1811
1812                                Vec::new()
1813                            } else {
1814                                let connectors_graph = TxGraph::new(chunks)
1815                                    .map_err(Error::from)
1816                                    .context(
1817                                    "failed to build connectors graph before signing forfeit TXs",
1818                                )?;
1819
1820                                tracing::debug!(batch_id = e.id, "Batch finalization started");
1821
1822                                create_and_sign_forfeit_txs(
1823                                    |input: &mut psbt::Input, msg: secp256k1::Message| match &input
1824                                    .witness_script
1825                                {
1826                                    None => Err(ark_core::Error::ad_hoc(
1827                                        "Missing witness script in psbt::Input when signing forfeit",
1828                                    )),
1829                                    Some(script) => {
1830                                        let pks = extract_checksig_pubkeys(script);
1831                                        let mut res = vec![];
1832                                        for pk in pks {
1833                                            if let Ok(keypair) =
1834                                            self.keypair_by_pk(&pk) {
1835                                                let sig =
1836                                                    secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1837                                                res.push((sig, keypair.public_key().into()))
1838                                            }
1839                                        }
1840                                        Ok(res)
1841                                    }
1842                                    },
1843                                    vtxo_inputs.as_slice(),
1844                                    &connectors_graph.leaves(),
1845                                    &server_info.forfeit_address,
1846                                    server_info.dust,
1847                                )
1848                                .map_err(Error::from)?
1849                            }
1850                        } else {
1851                            Vec::new()
1852                        };
1853
1854                        let commitment_psbt = if onchain_inputs.is_empty() {
1855                            None
1856                        } else {
1857                            let mut commitment_psbt = e.commitment_tx;
1858
1859                            let sign_for_pk_fn = |pk: &XOnlyPublicKey,
1860                                                  msg: &secp256k1::Message|
1861                             -> Result<
1862                                schnorr::Signature,
1863                                ark_core::Error,
1864                            > {
1865                                self.inner
1866                                    .wallet
1867                                    .sign_for_pk(pk, msg)
1868                                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))
1869                            };
1870
1871                            sign_commitment_psbt(
1872                                sign_for_pk_fn,
1873                                &mut commitment_psbt,
1874                                &onchain_inputs,
1875                            )
1876                            .map_err(Error::from)?;
1877
1878                            Some(commitment_psbt)
1879                        };
1880
1881                        if !signed_forfeit_psbts.is_empty() || commitment_psbt.is_some() {
1882                            network_client
1883                                .submit_signed_forfeit_txs(signed_forfeit_psbts, commitment_psbt)
1884                                .await?;
1885                        }
1886
1887                        step = step.next();
1888                    }
1889                    StreamEvent::BatchFinalized(e) => {
1890                        if step != Step::Finalized {
1891                            continue;
1892                        }
1893
1894                        let commitment_txid = e.commitment_txid;
1895
1896                        tracing::info!(batch_id = e.id, %commitment_txid, "Batch finalized");
1897
1898                        return Ok(commitment_txid);
1899                    }
1900                    StreamEvent::BatchFailed(ref e) => {
1901                        if Some(&e.id) == batch_id.as_ref() {
1902                            return Err(Error::ark_server(format!(
1903                                "batch failed {}: {}",
1904                                e.id, e.reason
1905                            )));
1906                        }
1907
1908                        tracing::debug!("Unrelated batch failed: {e:?}");
1909                    }
1910                    StreamEvent::Heartbeat => {}
1911                    StreamEvent::StreamStarted(_) => {}
1912                },
1913                Some(Err(e)) => {
1914                    tracing::error!("Got error from event stream");
1915
1916                    return Err(Error::ark_server(e));
1917                }
1918                None => {
1919                    return Err(Error::ark_server("dropped batch event stream"));
1920                }
1921            }
1922        }
1923
1924        #[derive(Debug, PartialEq, Eq)]
1925        enum Step {
1926            Start,
1927            BatchStarted,
1928            BatchSigningStarted,
1929            Finalized,
1930        }
1931
1932        impl Step {
1933            fn next(&self) -> Step {
1934                match self {
1935                    Step::Start => Step::BatchStarted,
1936                    Step::BatchStarted => Step::BatchSigningStarted,
1937                    Step::BatchSigningStarted => Step::Finalized,
1938                    Step::Finalized => Step::Finalized, // we can't go further
1939                }
1940            }
1941        }
1942    }
1943}
1944
1945#[derive(Debug, Clone)]
1946pub(crate) enum PrepareIntentKind {
1947    Register,
1948    EstimateFee,
1949}
1950
1951#[derive(Debug, Clone)]
1952pub(crate) enum BatchOutputType {
1953    Board {
1954        to_address: ArkAddress,
1955        to_amount: Amount,
1956    },
1957    OffBoard {
1958        to_address: Address,
1959        to_amount: Amount,
1960        change_address: ArkAddress,
1961        change_amount: Amount,
1962    },
1963}
1964
1965/// Prepared intent data ready for batch registration.
1966pub(crate) struct PreparedIntent {
1967    /// The signed intent.
1968    pub intent: intent::Intent,
1969    /// The ephemeral cosigner keypair.
1970    pub cosigner_keypair: Keypair,
1971    /// VTXO input outpoints (used for event stream topics).
1972    pub vtxo_input_outpoints: Vec<OutPoint>,
1973    /// Intent outputs (used to determine batch protocol steps).
1974    pub outputs: Vec<intent::Output>,
1975    /// The original onchain inputs (needed for commitment signing).
1976    pub onchain_inputs: Vec<batch::OnChainInput>,
1977    /// The original VTXO inputs (needed for forfeit signing).
1978    pub vtxo_inputs: Vec<intent::Input>,
1979}