Skip to main content

ark_client/
batch.rs

1use crate::error::ErrorContext as _;
2use crate::swap_storage::SwapStorage;
3use crate::utils::sleep;
4use crate::utils::timeout_op;
5use crate::wallet::BoardingWallet;
6use crate::wallet::OnchainWallet;
7use crate::Blockchain;
8use crate::Client;
9use crate::Error;
10use ark_core::batch;
11use ark_core::batch::aggregate_nonces;
12use ark_core::batch::complete_delegate_forfeit_txs;
13use ark_core::batch::create_and_sign_forfeit_txs;
14use ark_core::batch::generate_nonce_tree;
15use ark_core::batch::sign_batch_tree_tx;
16use ark_core::batch::sign_commitment_psbt;
17use ark_core::batch::Delegate;
18use ark_core::batch::NonceKps;
19use ark_core::intent;
20use ark_core::script::extract_checksig_pubkeys;
21use ark_core::server::BatchTreeEventType;
22use ark_core::server::PartialSigTree;
23use ark_core::server::StreamEvent;
24use ark_core::ArkAddress;
25use ark_core::ArkNote;
26use ark_core::ExplorerUtxo;
27use ark_core::TxGraph;
28use backon::ExponentialBuilder;
29use backon::Retryable;
30use bitcoin::hashes::sha256;
31use bitcoin::hashes::Hash;
32use bitcoin::hex::DisplayHex;
33use bitcoin::key::Keypair;
34use bitcoin::key::Secp256k1;
35use bitcoin::psbt;
36use bitcoin::secp256k1;
37use bitcoin::secp256k1::schnorr;
38use bitcoin::secp256k1::PublicKey;
39use bitcoin::Address;
40use bitcoin::Amount;
41use bitcoin::OutPoint;
42use bitcoin::Psbt;
43use bitcoin::TxOut;
44use bitcoin::Txid;
45use bitcoin::XOnlyPublicKey;
46use futures::StreamExt;
47use jiff::Timestamp;
48use rand::CryptoRng;
49use rand::Rng;
50use std::collections::HashMap;
51
52impl<B, W, S, K> Client<B, W, S, K>
53where
54    B: Blockchain,
55    W: BoardingWallet + OnchainWallet,
56    S: SwapStorage + 'static,
57    K: crate::KeyProvider,
58{
59    /// Settle _all_ prior VTXOs and boarding outputs into the next batch, generating new confirmed
60    /// VTXOs.
61    pub async fn settle<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
62    where
63        R: Rng + CryptoRng + Clone,
64    {
65        // Get off-chain address and send all funds to this address, no change output 🦄
66        let (to_address, _) = self.get_offchain_address()?;
67
68        let (boarding_inputs, vtxo_inputs, total_amount) =
69            self.fetch_commitment_transaction_inputs().await?;
70
71        tracing::debug!(
72            offchain_adress = %to_address.encode(),
73            ?boarding_inputs,
74            ?vtxo_inputs,
75            "Attempting to settle outputs"
76        );
77
78        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
79            tracing::debug!("No inputs to board with");
80            return Ok(None);
81        }
82
83        let join_next_batch = || async {
84            self.join_next_batch(
85                &mut rng.clone(),
86                boarding_inputs.clone(),
87                vtxo_inputs.clone(),
88                BatchOutputType::Board {
89                    to_address,
90                    to_amount: total_amount,
91                },
92            )
93            .await
94        };
95
96        // Joining a batch can fail depending on the timing, so we try a few times.
97        let commitment_txid = join_next_batch
98            .retry(ExponentialBuilder::default().with_max_times(0))
99            .sleep(sleep)
100            // TODO: Use `when` to only retry certain errors.
101            .notify(|err: &Error, dur: std::time::Duration| {
102                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
103            })
104            .await
105            .context("Failed to join batch")?;
106
107        tracing::info!(%commitment_txid, "Settlement success");
108
109        Ok(Some(commitment_txid))
110    }
111
112    /// Settle _all_ prior VTXOs, boarding outputs, and the provided ArkNotes into the next batch.
113    ///
114    /// ArkNotes are bearer tokens that can be redeemed by revealing their preimage.
115    /// This method combines them with regular VTXOs and boarding outputs into a single
116    /// settlement transaction.
117    pub async fn settle_with_notes<R>(
118        &self,
119        rng: &mut R,
120        notes: Vec<ArkNote>,
121    ) -> Result<Option<Txid>, Error>
122    where
123        R: Rng + CryptoRng + Clone,
124    {
125        let (to_address, _) = self.get_offchain_address()?;
126
127        let (boarding_inputs, vtxo_inputs, mut total_amount) =
128            self.fetch_commitment_transaction_inputs().await?;
129
130        // Convert arknotes to intent inputs and add their value to total
131        let note_inputs: Vec<intent::Input> = notes
132            .iter()
133            .map(|note| {
134                total_amount += note.value();
135                note.to_intent_input()
136            })
137            .collect::<Result<Vec<_>, _>>()?;
138
139        // Combine VTXO inputs with note inputs
140        let all_vtxo_inputs: Vec<intent::Input> =
141            vtxo_inputs.into_iter().chain(note_inputs).collect();
142
143        tracing::debug!(
144            offchain_address = %to_address.encode(),
145            ?boarding_inputs,
146            num_vtxo_inputs = all_vtxo_inputs.len(),
147            num_notes = notes.len(),
148            %total_amount,
149            "Attempting to settle outputs with notes"
150        );
151
152        if boarding_inputs.is_empty() && all_vtxo_inputs.is_empty() {
153            tracing::debug!("No inputs to settle");
154            return Ok(None);
155        }
156
157        let join_next_batch = || async {
158            self.join_next_batch(
159                &mut rng.clone(),
160                boarding_inputs.clone(),
161                all_vtxo_inputs.clone(),
162                BatchOutputType::Board {
163                    to_address,
164                    to_amount: total_amount,
165                },
166            )
167            .await
168        };
169
170        let commitment_txid = join_next_batch
171            .retry(ExponentialBuilder::default().with_max_times(0))
172            .sleep(sleep)
173            .notify(|err: &Error, dur: std::time::Duration| {
174                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
175            })
176            .await
177            .context("Failed to join batch")?;
178
179        tracing::info!(%commitment_txid, num_notes = notes.len(), "Settlement with notes success");
180
181        Ok(Some(commitment_txid))
182    }
183
184    /// Settle specific VTXOs and boarding outputs by outpoint into the next batch, generating new
185    /// confirmed VTXOs.
186    ///
187    /// Unlike [`Self::settle`], this method allows the caller to specify exactly which VTXOs and
188    /// boarding outputs to settle by providing their outpoints.
189    pub async fn settle_vtxos<R>(
190        &self,
191        rng: &mut R,
192        vtxo_outpoints: &[OutPoint],
193        boarding_outpoints: &[OutPoint],
194    ) -> Result<Option<Txid>, Error>
195    where
196        R: Rng + CryptoRng + Clone,
197    {
198        // Get off-chain address and send all funds to this address, no change output.
199        let (to_address, _) = self.get_offchain_address()?;
200
201        let (all_boarding_inputs, all_vtxo_inputs, _) =
202            self.fetch_commitment_transaction_inputs().await?;
203
204        // Filter boarding inputs to only those specified.
205        let boarding_inputs: Vec<_> = all_boarding_inputs
206            .into_iter()
207            .filter(|input| boarding_outpoints.contains(&input.outpoint()))
208            .collect();
209
210        // Filter VTXO inputs to only those specified.
211        let vtxo_inputs: Vec<_> = all_vtxo_inputs
212            .into_iter()
213            .filter(|input| vtxo_outpoints.contains(&input.outpoint()))
214            .collect();
215
216        // Recalculate total amount from filtered inputs.
217        let total_amount = boarding_inputs
218            .iter()
219            .map(|i| i.amount())
220            .chain(vtxo_inputs.iter().map(|i| i.amount()))
221            .fold(Amount::ZERO, |acc, a| acc + a);
222
223        tracing::debug!(
224            offchain_address = %to_address.encode(),
225            ?boarding_inputs,
226            ?vtxo_inputs,
227            %total_amount,
228            "Attempting to settle specific outputs"
229        );
230
231        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
232            tracing::debug!("No matching inputs to settle");
233            return Ok(None);
234        }
235
236        let join_next_batch = || async {
237            self.join_next_batch(
238                &mut rng.clone(),
239                boarding_inputs.clone(),
240                vtxo_inputs.clone(),
241                BatchOutputType::Board {
242                    to_address,
243                    to_amount: total_amount,
244                },
245            )
246            .await
247        };
248
249        // Joining a batch can fail depending on the timing, so we try a few times.
250        let commitment_txid = join_next_batch
251            .retry(ExponentialBuilder::default().with_max_times(0))
252            .sleep(sleep)
253            .notify(|err: &Error, dur: std::time::Duration| {
254                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
255            })
256            .await
257            .context("Failed to join batch")?;
258
259        tracing::info!(%commitment_txid, "Settlement of specific VTXOs success");
260
261        Ok(Some(commitment_txid))
262    }
263
264    /// Settle _some_ prior VTXOs and boarding outputs into the next batch, generating UTXOs as
265    /// outputs to a new commitment transaction.
266    pub async fn collaborative_redeem<R>(
267        &self,
268        rng: &mut R,
269        to_address: Address,
270        to_amount: Amount,
271    ) -> Result<Txid, Error>
272    where
273        R: Rng + CryptoRng + Clone,
274    {
275        let (change_address, _) = self.get_offchain_address()?;
276
277        let (boarding_inputs, vtxo_inputs, total_amount) =
278            self.fetch_commitment_transaction_inputs().await?;
279
280        let onchain_fee = self
281            .fee_estimator
282            .eval_onchain_output(ark_fees::Output {
283                amount: to_amount.to_sat(),
284                script: to_address.script_pubkey().to_string(),
285            })
286            .map_err(Error::ad_hoc)?;
287        let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
288
289        // Deduct fee from the requested amount.
290        let net_to_amount = to_amount.checked_sub(onchain_fee).ok_or_else(|| {
291            Error::coin_select(
292                "cannot deduct fees from offboard amount ({onchain_fee} > {to_amount})",
293            )
294        })?;
295
296        let change_amount = total_amount.checked_sub(to_amount).ok_or_else(|| {
297            Error::coin_select(format!(
298                "cannot afford to send {to_amount}, only have {total_amount}"
299            ))
300        })?;
301
302        tracing::info!(
303            %to_address,
304            gross_amount = %to_amount,
305            net_amount = %net_to_amount,
306            fee = %onchain_fee,
307            change_address = %change_address.encode(),
308            %change_amount,
309            ?boarding_inputs,
310            "Attempting to collaboratively redeem outputs"
311        );
312
313        let join_next_batch = || async {
314            self.join_next_batch(
315                &mut rng.clone(),
316                boarding_inputs.clone(),
317                vtxo_inputs.clone(),
318                BatchOutputType::OffBoard {
319                    to_address: to_address.clone(),
320                    to_amount: net_to_amount,
321                    change_address,
322                    change_amount,
323                },
324            )
325            .await
326        };
327
328        // Joining a batch can fail depending on the timing, so we try a few times.
329        let commitment_txid = join_next_batch
330            .retry(ExponentialBuilder::default().with_max_times(3))
331            .sleep(sleep)
332            // TODO: Use `when` to only retry certain errors.
333            .notify(|err: &Error, dur: std::time::Duration| {
334                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
335            })
336            .await
337            .context("Failed to join batch")?;
338
339        tracing::info!(%commitment_txid, "Collaborative redeem success");
340
341        Ok(commitment_txid)
342    }
343
344    /// Settle a selection of VTXOs into the next batch, generating UTXOs as
345    /// outputs to a new commitment transaction.
346    pub async fn collaborative_redeem_vtxo_selection<R>(
347        &self,
348        rng: &mut R,
349        input_vtxos: impl Iterator<Item = OutPoint> + Clone,
350        to_address: Address,
351        to_amount: Amount,
352    ) -> Result<Txid, Error>
353    where
354        R: Rng + CryptoRng + Clone,
355    {
356        let (change_address, _) = self.get_offchain_address()?;
357
358        let (vtxo_list, script_pubkey_to_vtxo_map) =
359            self.list_vtxos().await.context("failed to get VTXO list")?;
360
361        let vtxo_inputs = vtxo_list
362            .all_unspent()
363            .filter(|v| input_vtxos.clone().any(|outpoint| outpoint == v.outpoint))
364            .map(|v| {
365                let vtxo = script_pubkey_to_vtxo_map.get(&v.script).ok_or_else(|| {
366                    ark_core::Error::ad_hoc(format!("missing VTXO for script pubkey: {}", v.script))
367                })?;
368                let spend_info = vtxo.forfeit_spend_info()?;
369
370                Ok(intent::Input::new(
371                    v.outpoint,
372                    vtxo.exit_delay(),
373                    // NOTE: This only works with default VTXOs (single-sig).
374                    None,
375                    TxOut {
376                        value: v.amount,
377                        script_pubkey: vtxo.script_pubkey(),
378                    },
379                    vtxo.tapscripts(),
380                    spend_info,
381                    false,
382                    v.is_swept,
383                ))
384            })
385            .collect::<Result<Vec<_>, Error>>()?;
386
387        if vtxo_inputs.is_empty() {
388            return Err(Error::ad_hoc("no matching VTXO outpoints found"));
389        }
390
391        // Check that total amount is sufficient
392        let total_input_amount = vtxo_inputs
393            .iter()
394            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount());
395
396        let onchain_fee = self
397            .fee_estimator
398            .eval_onchain_output(ark_fees::Output {
399                amount: to_amount.to_sat(),
400                script: to_address.script_pubkey().to_string(),
401            })
402            .map_err(Error::ad_hoc)?;
403        let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
404
405        // Deduct fee from the requested amount.
406        let net_to_amount = to_amount.checked_sub(onchain_fee).ok_or_else(|| {
407            Error::coin_select(
408                "cannot deduct fees from offboard amount ({onchain_fee} > {to_amount})",
409            )
410        })?;
411
412        // Check that inputs can cover output + fee.
413        let change_amount = total_input_amount
414            .checked_sub(to_amount)
415            .and_then(|a| a.checked_sub(onchain_fee))
416            .ok_or_else(|| {
417                Error::coin_select(format!(
418                "insufficient VTXO amount: {total_input_amount} (input) < {to_amount} (to_amount) + {onchain_fee} (fee)",
419            ))
420            })?;
421
422        tracing::info!(
423            %to_address,
424            gross_amount = %to_amount,
425            net_amount = %net_to_amount,
426            fee = %onchain_fee,
427            change_address = %change_address.encode(),
428            %change_amount,
429            "Attempting to collaboratively redeem outputs"
430        );
431
432        let join_next_batch = || async {
433            self.join_next_batch(
434                &mut rng.clone(),
435                Vec::new(),
436                vtxo_inputs.clone(),
437                BatchOutputType::OffBoard {
438                    to_address: to_address.clone(),
439                    to_amount: net_to_amount,
440                    change_address,
441                    change_amount,
442                },
443            )
444            .await
445        };
446
447        // Joining a batch can fail depending on the timing, so we try a few times.
448        let commitment_txid = join_next_batch
449            .retry(ExponentialBuilder::default().with_max_times(3))
450            .sleep(sleep)
451            // TODO: Use `when` to only retry certain errors.
452            .notify(|err: &Error, dur: std::time::Duration| {
453                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
454            })
455            .await
456            .context("Failed to join batch")?;
457
458        tracing::info!(%commitment_txid, "Collaborative redeem success");
459
460        Ok(commitment_txid)
461    }
462
463    /// Generate a delegate for settling VTXOs on behalf of the owner.
464    ///
465    /// The owner pre-signs the intent and forfeit transactions, allowing another party to complete
466    /// the settlement at a later time using the provided `delegate_cosigner_pk`.
467    ///
468    /// # Arguments
469    ///
470    /// * `delegate_cosigner_pk` - The cosigner public key that the delegate will use
471    /// * `select_recoverable_vtxos` - Whether to include recoverable VTXOs
472    ///
473    /// # Returns
474    ///
475    /// A [`Delegate`] struct containing all the pre-signed data needed for settlement.
476    pub async fn generate_delegate(
477        &self,
478        delegate_cosigner_pk: PublicKey,
479    ) -> Result<Delegate, Error> {
480        // Get off-chain address and send all funds to this address.
481        let (to_address, _) = self.get_offchain_address()?;
482
483        // Simply collect all VTXOs that can be settled.
484        let (_, vtxo_inputs, _) = self.fetch_commitment_transaction_inputs().await?;
485
486        let total_amount = vtxo_inputs
487            .iter()
488            .fold(Amount::ZERO, |acc, v| acc + v.amount());
489
490        if vtxo_inputs.is_empty() {
491            return Err(Error::ad_hoc("no inputs to settle via delegate"));
492        }
493
494        let server_info = &self.server_info;
495
496        let outputs = vec![intent::Output::Offchain(TxOut {
497            value: total_amount,
498            script_pubkey: to_address.to_p2tr_script_pubkey(),
499        })];
500
501        let delegate = batch::prepare_delegate_psbts(
502            vtxo_inputs,
503            outputs,
504            delegate_cosigner_pk,
505            &server_info.forfeit_address,
506            server_info.dust,
507        )?;
508
509        Ok(delegate)
510    }
511
512    /// Sign a set of delegate PSBTs, including the intent PSBT and the forfeit PSBTs.
513    pub fn sign_delegate_psbts(
514        &self,
515        intent_psbt: &mut Psbt,
516        forfeit_psbts: &mut [Psbt],
517    ) -> Result<(), Error> {
518        let sign_fn =
519            |input: &mut psbt::Input,
520             msg: secp256k1::Message|
521             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
522                match &input.witness_script {
523                    None => Err(ark_core::Error::ad_hoc(
524                        "Missing witness script for psbt::Input",
525                    )),
526                    Some(script) => {
527                        let mut res = vec![];
528                        let pks = extract_checksig_pubkeys(script);
529                        for pk in pks {
530                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
531                                let sig = Secp256k1::new().sign_schnorr_no_aux_rand(&msg, &keypair);
532                                let pk = keypair.x_only_public_key().0;
533                                res.push((sig, pk));
534                            }
535                        }
536                        Ok(res)
537                    }
538                }
539            };
540
541        batch::sign_delegate_psbts(sign_fn, intent_psbt, forfeit_psbts)?;
542
543        Ok(())
544    }
545
546    /// Settle a delegate by completing the batch protocol using pre-signed data.
547    ///
548    /// This method allows Bob to settle Alice's VTXOs using the pre-signed intent and forfeit
549    /// transactions from the [`Delegate`] struct.
550    ///
551    /// # Arguments
552    ///
553    /// * `rng` - Random number generator for nonce generation
554    /// * `delegate` - The delegate struct containing pre-signed data
555    /// * `own_cosigner_kp` - Bob's cosigner keypair (must match the delegate_cosigner_pk)
556    ///
557    /// # Returns
558    ///
559    /// The commitment transaction ID if successful.
560    pub async fn settle_delegate<R>(
561        &self,
562        rng: &mut R,
563        delegate: Delegate,
564        own_cosigner_kp: Keypair,
565    ) -> Result<Txid, Error>
566    where
567        R: Rng + CryptoRng,
568    {
569        // Verify the cosigner key matches
570        if own_cosigner_kp.public_key() != delegate.delegate_cosigner_pk {
571            return Err(Error::ad_hoc(
572                "provided cosigner keypair does not match delegate_cosigner_pk",
573            ));
574        }
575
576        // Register the pre-signed intent
577        let intent_id = timeout_op(
578            self.inner.timeout,
579            self.network_client()
580                .register_intent(delegate.intent.clone()),
581        )
582        .await
583        .context("failed to register delegated intent")??;
584
585        tracing::debug!(intent_id, "Registered delegated intent");
586
587        let network_client = self.network_client();
588        let server_info = &self.server_info;
589
590        #[derive(Debug, PartialEq, Eq)]
591        enum Step {
592            Start,
593            BatchStarted,
594            BatchSigningStarted,
595            Finalized,
596        }
597
598        impl Step {
599            fn next(&self) -> Step {
600                match self {
601                    Step::Start => Step::BatchStarted,
602                    Step::BatchStarted => Step::BatchSigningStarted,
603                    Step::BatchSigningStarted => Step::Finalized,
604                    Step::Finalized => Step::Finalized,
605                }
606            }
607        }
608
609        let mut step = Step::Start;
610
611        let own_cosigner_kps = [own_cosigner_kp];
612        let own_cosigner_pks = own_cosigner_kps
613            .iter()
614            .map(|k| k.public_key())
615            .collect::<Vec<_>>();
616
617        let mut batch_id: Option<String> = None;
618
619        let vtxo_input_outpoints = delegate
620            .forfeit_psbts
621            .iter()
622            .map(|psbt| psbt.unsigned_tx.input[0].previous_output)
623            .collect::<Vec<_>>();
624
625        let topics = vtxo_input_outpoints
626            .iter()
627            .map(ToString::to_string)
628            .chain(
629                own_cosigner_pks
630                    .iter()
631                    .map(|pk| pk.serialize().to_lower_hex_string()),
632            )
633            .collect();
634
635        let mut stream = network_client.get_event_stream(topics).await?;
636
637        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
638
639        let mut unsigned_commitment_tx = None;
640
641        let mut vtxo_graph_chunks = Some(Vec::new());
642        let mut vtxo_graph: Option<TxGraph> = None;
643
644        let mut connectors_graph_chunks = Some(Vec::new());
645        let mut batch_expiry = None;
646
647        let mut agg_nonce_pks = HashMap::new();
648
649        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
650
651        loop {
652            match stream.next().await {
653                Some(Ok(event)) => match event {
654                    StreamEvent::BatchStarted(e) => {
655                        if step != Step::Start {
656                            continue;
657                        }
658
659                        let hash = sha256::Hash::hash(intent_id.as_bytes());
660                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
661
662                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
663                            timeout_op(
664                                self.inner.timeout,
665                                self.network_client()
666                                    .confirm_registration(intent_id.clone()),
667                            )
668                            .await
669                            .context("failed to confirm intent registration")??;
670
671                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
672
673                            batch_id = Some(e.id);
674
675                            step = Step::BatchStarted;
676
677                            batch_expiry = Some(e.batch_expiry);
678                        } else {
679                            tracing::debug!(
680                                batch_id = e.id,
681                                intent_id,
682                                "Intent ID not found for batch"
683                            );
684                        }
685                    }
686                    StreamEvent::TreeTx(e) => {
687                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
688                            continue;
689                        }
690
691                        match e.batch_tree_event_type {
692                            BatchTreeEventType::Vtxo => {
693                                match &mut vtxo_graph_chunks {
694                                    Some(vtxo_graph_chunks) => {
695                                        tracing::debug!("Got new VTXO graph chunk");
696
697                                        vtxo_graph_chunks.push(e.tx_graph_chunk)
698                                    }
699                                    None => {
700                                        return Err(Error::ark_server(
701                                            "received unexpected VTXO graph chunk",
702                                        ));
703                                    }
704                                };
705                            }
706                            BatchTreeEventType::Connector => {
707                                match connectors_graph_chunks {
708                                    Some(ref mut connectors_graph_chunks) => {
709                                        tracing::debug!("Got new connectors graph chunk");
710
711                                        connectors_graph_chunks.push(e.tx_graph_chunk)
712                                    }
713                                    None => {
714                                        return Err(Error::ark_server(
715                                            "received unexpected connectors graph chunk",
716                                        ));
717                                    }
718                                };
719                            }
720                        }
721                    }
722                    StreamEvent::TreeSignature(e) => {
723                        if step != Step::BatchSigningStarted {
724                            continue;
725                        }
726
727                        match e.batch_tree_event_type {
728                            BatchTreeEventType::Vtxo => {
729                                match vtxo_graph {
730                                    Some(ref mut vtxo_graph) => {
731                                        vtxo_graph.apply(|graph| {
732                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
733                                                Ok(true)
734                                            } else {
735                                                graph.set_signature(e.signature);
736
737                                                Ok(false)
738                                            }
739                                        })?;
740                                    }
741                                    None => {
742                                        return Err(Error::ark_server(
743                                            "received batch tree signature without TX graph",
744                                        ));
745                                    }
746                                };
747                            }
748                            BatchTreeEventType::Connector => {
749                                return Err(Error::ark_server(
750                                    "received batch tree signature for connectors tree",
751                                ));
752                            }
753                        }
754                    }
755                    StreamEvent::TreeSigningStarted(e) => {
756                        if step != Step::BatchStarted {
757                            continue;
758                        }
759
760                        let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
761                            "received tree signing started event without VTXO graph chunks",
762                        ))?;
763                        vtxo_graph = Some(
764                            TxGraph::new(chunks)
765                                .map_err(Error::from)
766                                .context("failed to build VTXO graph before generating nonces")?,
767                        );
768
769                        tracing::info!(batch_id = e.id, "Batch signing started");
770
771                        for own_cosigner_pk in own_cosigner_pks.iter() {
772                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
773                                return Err(Error::ark_server(format!(
774                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
775                                )));
776                            }
777                        }
778
779                        let mut our_nonce_tree_map = HashMap::new();
780                        for own_cosigner_kp in own_cosigner_kps {
781                            let own_cosigner_pk = own_cosigner_kp.public_key();
782                            let nonce_tree = generate_nonce_tree(
783                                rng,
784                                vtxo_graph.as_ref().expect("VTXO graph"),
785                                own_cosigner_pk,
786                                &e.unsigned_commitment_tx,
787                            )
788                            .map_err(Error::from)
789                            .context("failed to generate VTXO nonce tree")?;
790
791                            tracing::info!(
792                                cosigner_pk = %own_cosigner_pk,
793                                "Submitting nonce tree for cosigner PK"
794                            );
795
796                            network_client
797                                .submit_tree_nonces(
798                                    &e.id,
799                                    own_cosigner_pk,
800                                    nonce_tree.to_nonce_pks(),
801                                )
802                                .await
803                                .map_err(Error::ark_server)
804                                .context("failed to submit VTXO nonce tree")?;
805
806                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
807                        }
808
809                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
810                        our_nonce_trees = Some(our_nonce_tree_map);
811
812                        step = step.next();
813                    }
814                    StreamEvent::TreeNonces(e) => {
815                        if step != Step::BatchSigningStarted {
816                            continue;
817                        }
818
819                        let tree_tx_nonce_pks = e.nonces;
820
821                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
822                            own_cosigner_pks
823                                .iter()
824                                .any(|p| &&p.x_only_public_key().0 == pk)
825                        }) {
826                            Some((pk, _)) => *pk,
827                            None => {
828                                tracing::debug!(
829                                    batch_id = e.id,
830                                    txid = %e.txid,
831                                    "Received irrelevant TreeNonces event"
832                                );
833
834                                continue;
835                            }
836                        };
837
838                        tracing::debug!(
839                            batch_id = e.id,
840                            txid = %e.txid,
841                            %cosigner_pk,
842                            "Received TreeNonces event"
843                        );
844
845                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
846
847                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
848
849                        if vtxo_graph.is_none() {
850                            let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
851                                "received tree nonces event without VTXO graph chunks",
852                            ))?;
853                            vtxo_graph = Some(
854                                TxGraph::new(chunks)
855                                    .map_err(Error::from)
856                                    .context("failed to build VTXO graph before tree signing")?,
857                            );
858                        }
859                        let vtxo_graph_ref = vtxo_graph.as_ref().expect("just populated");
860
861                        if agg_nonce_pks.len() == vtxo_graph_ref.nb_of_nodes() {
862                            let cosigner_kp = own_cosigner_kps
863                                .iter()
864                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
865                                .ok_or_else(|| {
866                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
867                                })?;
868
869                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
870                                Error::ark_server("missing nonce trees during batch protocol"),
871                            )?;
872
873                            let our_nonce_tree =
874                                our_nonce_trees
875                                    .get_mut(cosigner_kp)
876                                    .ok_or(Error::ark_server(
877                                        "missing nonce tree during batch protocol",
878                                    ))?;
879
880                            let unsigned_commitment_tx = unsigned_commitment_tx
881                                .as_ref()
882                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
883
884                            let batch_expiry = batch_expiry
885                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
886
887                            let mut partial_sig_tree = PartialSigTree::default();
888                            for (txid, _) in vtxo_graph_ref.as_map() {
889                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
890                                    Error::ad_hoc(format!(
891                                        "missing aggregated nonce PK for TX {txid}"
892                                    ))
893                                })?;
894
895                                let sigs = sign_batch_tree_tx(
896                                    txid,
897                                    batch_expiry,
898                                    ark_forfeit_pk,
899                                    cosigner_kp,
900                                    *agg_nonce_pk,
901                                    vtxo_graph_ref,
902                                    unsigned_commitment_tx,
903                                    our_nonce_tree,
904                                )
905                                .map_err(Error::from)
906                                .context("failed to sign VTXO tree")?;
907
908                                partial_sig_tree.0.extend(sigs.0);
909                            }
910
911                            network_client
912                                .submit_tree_signatures(
913                                    &e.id,
914                                    cosigner_kp.public_key(),
915                                    partial_sig_tree,
916                                )
917                                .await
918                                .map_err(Error::ark_server)
919                                .context("failed to submit VTXO tree signatures")?;
920                        }
921                    }
922                    StreamEvent::TreeNoncesAggregated(e) => {
923                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
924                    }
925                    StreamEvent::BatchFinalization(e) => {
926                        if step != Step::BatchSigningStarted {
927                            continue;
928                        }
929
930                        tracing::debug!(
931                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
932                            "Batch finalization started (delegate)"
933                        );
934
935                        let chunks = connectors_graph_chunks.take().ok_or(Error::ark_server(
936                            "received batch finalization event without connectors",
937                        ))?;
938
939                        if chunks.is_empty() {
940                            tracing::debug!(batch_id = e.id, "No delegated forfeit transactions");
941                        } else {
942                            let connectors_graph = TxGraph::new(chunks)
943                                .map_err(Error::from)
944                                .context(
945                                "failed to build connectors graph before completing forfeit TXs",
946                            )?;
947
948                            tracing::debug!(
949                                batch_id = e.id,
950                                "Completing delegated forfeit transactions"
951                            );
952
953                            let signed_forfeit_psbts = complete_delegate_forfeit_txs(
954                                &delegate.forfeit_psbts,
955                                &connectors_graph.leaves(),
956                            )?;
957
958                            network_client
959                                .submit_signed_forfeit_txs(signed_forfeit_psbts, None)
960                                .await?;
961                        }
962
963                        step = step.next();
964                    }
965                    StreamEvent::BatchFinalized(e) => {
966                        if step != Step::Finalized {
967                            continue;
968                        }
969
970                        let commitment_txid = e.commitment_txid;
971
972                        tracing::info!(batch_id = e.id, %commitment_txid, "Delegated batch finalized");
973
974                        return Ok(commitment_txid);
975                    }
976                    StreamEvent::BatchFailed(ref e) => {
977                        if Some(&e.id) == batch_id.as_ref() {
978                            return Err(Error::ark_server(format!(
979                                "batch failed {}: {}",
980                                e.id, e.reason
981                            )));
982                        }
983
984                        tracing::debug!("Unrelated batch failed: {e:?}");
985                    }
986                    StreamEvent::Heartbeat => {}
987                },
988                Some(Err(e)) => {
989                    tracing::error!("Got error from event stream");
990
991                    return Err(Error::ark_server(e));
992                }
993                None => {
994                    return Err(Error::ark_server("dropped batch event stream"));
995                }
996            }
997        }
998    }
999
1000    /// Get all the [`batch::OnChainInput`]s and [`batch::VtxoInput`]s that can be used to join an
1001    /// upcoming batch.
1002    pub(crate) async fn fetch_commitment_transaction_inputs(
1003        &self,
1004    ) -> Result<(Vec<batch::OnChainInput>, Vec<intent::Input>, Amount), Error> {
1005        // Get all known boarding outputs.
1006        let boarding_outputs = self.inner.wallet.get_boarding_outputs()?;
1007
1008        let mut boarding_inputs: Vec<batch::OnChainInput> = Vec::new();
1009        let mut total_amount = Amount::ZERO;
1010
1011        // To track unique outpoints and prevent duplicates
1012        let mut seen_outpoints = std::collections::HashSet::new();
1013
1014        let now = Timestamp::now();
1015
1016        // Find outpoints for each boarding output.
1017        for boarding_output in boarding_outputs {
1018            let outpoints = timeout_op(
1019                self.inner.timeout,
1020                self.blockchain().find_outpoints(boarding_output.address()),
1021            )
1022            .await
1023            .context("failed to find outpoints")??;
1024
1025            for o in outpoints.iter() {
1026                if let ExplorerUtxo {
1027                    outpoint,
1028                    amount,
1029                    confirmation_blocktime: Some(confirmation_blocktime),
1030                    is_spent: false,
1031                } = o
1032                {
1033                    // Check for duplicate outpoints
1034                    if seen_outpoints.contains(outpoint) {
1035                        continue;
1036                    }
1037
1038                    // Only include confirmed boarding outputs with an _inactive_ exit path.
1039                    if !boarding_output.can_be_claimed_unilaterally_by_owner(
1040                        now.as_duration().try_into().map_err(Error::ad_hoc)?,
1041                        std::time::Duration::from_secs(*confirmation_blocktime),
1042                    ) {
1043                        // Mark this outpoint as seen
1044                        seen_outpoints.insert(*outpoint);
1045
1046                        boarding_inputs.push(batch::OnChainInput::new(
1047                            boarding_output.clone(),
1048                            *amount,
1049                            *outpoint,
1050                        ));
1051                        total_amount += *amount;
1052                    }
1053                }
1054            }
1055        }
1056
1057        let (vtxo_list, script_pubkey_to_vtxo_map) = self.list_vtxos().await?;
1058
1059        total_amount += vtxo_list
1060            .all_unspent()
1061            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount);
1062
1063        let vtxo_inputs = vtxo_list
1064            .all_unspent()
1065            .map(|virtual_tx_outpoint| {
1066                let vtxo = script_pubkey_to_vtxo_map
1067                    .get(&virtual_tx_outpoint.script)
1068                    .ok_or_else(|| {
1069                        ark_core::Error::ad_hoc(format!(
1070                            "missing VTXO for script pubkey: {}",
1071                            virtual_tx_outpoint.script
1072                        ))
1073                    })?;
1074                let spend_info = vtxo.forfeit_spend_info()?;
1075
1076                Ok(intent::Input::new(
1077                    virtual_tx_outpoint.outpoint,
1078                    vtxo.exit_delay(),
1079                    None,
1080                    TxOut {
1081                        value: virtual_tx_outpoint.amount,
1082                        script_pubkey: vtxo.script_pubkey(),
1083                    },
1084                    vtxo.tapscripts(),
1085                    spend_info,
1086                    false,
1087                    virtual_tx_outpoint.is_swept,
1088                ))
1089            })
1090            .collect::<Result<Vec<_>, ark_core::Error>>()?;
1091
1092        Ok((boarding_inputs, vtxo_inputs, total_amount))
1093    }
1094
1095    /// Prepare an intent for batch registration or fee estimation.
1096    ///
1097    /// This creates a signed intent PSBT along with all the data needed to participate
1098    /// in the batch protocol.
1099    pub(crate) fn prepare_intent<R>(
1100        &self,
1101        rng: &mut R,
1102        onchain_inputs: Vec<batch::OnChainInput>,
1103        vtxo_inputs: Vec<intent::Input>,
1104        output_type: BatchOutputType,
1105        intent_kind: PrepareIntentKind,
1106    ) -> Result<PreparedIntent, Error>
1107    where
1108        R: Rng + CryptoRng,
1109    {
1110        if onchain_inputs.is_empty() && vtxo_inputs.is_empty() {
1111            return Err(Error::ad_hoc("cannot prepare intent without inputs"));
1112        }
1113
1114        // Generate an (ephemeral) cosigner keypair.
1115        let cosigner_keypair = Keypair::new(self.secp(), rng);
1116
1117        let vtxo_input_outpoints = vtxo_inputs.iter().map(|i| i.outpoint()).collect::<Vec<_>>();
1118
1119        let inputs = {
1120            let boarding_inputs = onchain_inputs.clone().into_iter().map(|o| {
1121                intent::Input::new(
1122                    o.outpoint(),
1123                    o.boarding_output().exit_delay(),
1124                    None,
1125                    TxOut {
1126                        value: o.amount(),
1127                        script_pubkey: o.boarding_output().script_pubkey(),
1128                    },
1129                    o.boarding_output().tapscripts(),
1130                    o.boarding_output().forfeit_spend_info(),
1131                    true,
1132                    false,
1133                )
1134            });
1135
1136            boarding_inputs
1137                .chain(vtxo_inputs.clone())
1138                .collect::<Vec<_>>()
1139        };
1140
1141        let dust = self.server_info.dust;
1142
1143        let mut outputs = vec![];
1144
1145        match output_type {
1146            BatchOutputType::Board {
1147                to_address,
1148                to_amount,
1149            } => {
1150                if to_amount < self.server_info.dust {
1151                    return Err(Error::ad_hoc(format!(
1152                        "cannot settle into sub-dust VTXO: {to_amount} < {dust}"
1153                    )));
1154                }
1155
1156                outputs.push(intent::Output::Offchain(TxOut {
1157                    value: to_amount,
1158                    script_pubkey: to_address.to_p2tr_script_pubkey(),
1159                }));
1160            }
1161            BatchOutputType::OffBoard {
1162                to_address,
1163                to_amount,
1164                change_amount,
1165                ..
1166            } if change_amount == Amount::ZERO => {
1167                outputs.push(intent::Output::Onchain(TxOut {
1168                    value: to_amount,
1169                    script_pubkey: to_address.script_pubkey(),
1170                }));
1171            }
1172            BatchOutputType::OffBoard {
1173                to_address,
1174                to_amount,
1175                change_address,
1176                change_amount,
1177            } => {
1178                if change_amount < dust {
1179                    return Err(Error::ad_hoc(format!(
1180                        "cannot settle with sub-dust change VTXO: {change_amount} < {dust}"
1181                    )));
1182                }
1183
1184                outputs.push(intent::Output::Onchain(TxOut {
1185                    value: to_amount,
1186                    script_pubkey: to_address.script_pubkey(),
1187                }));
1188
1189                outputs.push(intent::Output::Offchain(TxOut {
1190                    value: change_amount,
1191                    script_pubkey: change_address.to_p2tr_script_pubkey(),
1192                }));
1193            }
1194        }
1195
1196        let cosigner_pk = cosigner_keypair.public_key();
1197
1198        let secp = Secp256k1::new();
1199
1200        let sign_for_vtxo_fn =
1201            |input: &mut psbt::Input,
1202             msg: secp256k1::Message|
1203             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
1204                match &input.witness_script {
1205                    None => Err(ark_core::Error::ad_hoc(
1206                        "Missing witness script in psbt::Input when signing intent",
1207                    )),
1208                    Some(script) => {
1209                        let pks = extract_checksig_pubkeys(script);
1210                        let mut res = vec![];
1211                        for pk in pks {
1212                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
1213                                let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1214                                res.push((sig, keypair.public_key().into()))
1215                            }
1216                        }
1217                        Ok(res)
1218                    }
1219                }
1220            };
1221
1222        let sign_for_onchain_fn =
1223            |input: &mut psbt::Input,
1224             msg: secp256k1::Message|
1225             -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
1226                let onchain_input = onchain_inputs
1227                    .iter()
1228                    .find(|o| {
1229                        Some(o.boarding_output().script_pubkey())
1230                            == input.witness_utxo.clone().map(|w| w.script_pubkey)
1231                    })
1232                    .ok_or_else(|| {
1233                        ark_core::Error::ad_hoc(
1234                            "could not find signing key for onchain input: {input:?}",
1235                        )
1236                    })?;
1237
1238                let owner_pk = onchain_input.boarding_output().owner_pk();
1239                let sig = self
1240                    .inner
1241                    .wallet
1242                    .sign_for_pk(&owner_pk, &msg)
1243                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))?;
1244
1245                Ok((sig, owner_pk))
1246            };
1247
1248        let now = std::time::SystemTime::now()
1249            .duration_since(std::time::UNIX_EPOCH)
1250            .map_err(|e| Error::ad_hoc(e.to_string()))
1251            .context("failed to compute now timestamp")?;
1252        let now = now.as_secs();
1253        let expire_at = now + (2 * 60);
1254
1255        let mut onchain_output_indexes = Vec::new();
1256        for (i, output) in outputs.iter().enumerate() {
1257            if matches!(output, intent::Output::Onchain(_)) {
1258                onchain_output_indexes.push(i);
1259            }
1260        }
1261
1262        let message = match intent_kind {
1263            PrepareIntentKind::EstimateFee => intent::IntentMessage::EstimateIntentFee {
1264                onchain_output_indexes,
1265                valid_at: now,
1266                expire_at,
1267                own_cosigner_pks: vec![cosigner_pk],
1268            },
1269            PrepareIntentKind::Register => intent::IntentMessage::Register {
1270                onchain_output_indexes,
1271                valid_at: now,
1272                expire_at,
1273                own_cosigner_pks: vec![cosigner_pk],
1274            },
1275        };
1276
1277        let intent = intent::make_intent(
1278            sign_for_vtxo_fn,
1279            sign_for_onchain_fn,
1280            inputs,
1281            outputs.clone(),
1282            message,
1283        )?;
1284
1285        Ok(PreparedIntent {
1286            intent,
1287            cosigner_keypair,
1288            vtxo_input_outpoints,
1289            outputs,
1290            onchain_inputs,
1291            vtxo_inputs,
1292        })
1293    }
1294
1295    pub(crate) async fn join_next_batch<R>(
1296        &self,
1297        rng: &mut R,
1298        onchain_inputs: Vec<batch::OnChainInput>,
1299        vtxo_inputs: Vec<intent::Input>,
1300        output_type: BatchOutputType,
1301    ) -> Result<Txid, Error>
1302    where
1303        R: Rng + CryptoRng,
1304    {
1305        let prepared = self.prepare_intent(
1306            rng,
1307            onchain_inputs,
1308            vtxo_inputs,
1309            output_type,
1310            PrepareIntentKind::Register,
1311        )?;
1312
1313        let PreparedIntent {
1314            intent,
1315            cosigner_keypair,
1316            vtxo_input_outpoints,
1317            outputs,
1318            onchain_inputs,
1319            vtxo_inputs,
1320        } = prepared;
1321
1322        let onchain_input_outpoints = onchain_inputs
1323            .iter()
1324            .map(|i| i.outpoint())
1325            .collect::<Vec<_>>();
1326
1327        let server_info = &self.server_info;
1328
1329        let own_cosigner_kps = [cosigner_keypair];
1330        let own_cosigner_pks = own_cosigner_kps
1331            .iter()
1332            .map(|k| k.public_key())
1333            .collect::<Vec<_>>();
1334
1335        let secp = Secp256k1::new();
1336
1337        let mut step = Step::Start;
1338
1339        let intent_id = timeout_op(
1340            self.inner.timeout,
1341            self.network_client().register_intent(intent),
1342        )
1343        .await
1344        .context("failed to register intent")??;
1345
1346        tracing::debug!(
1347            intent_id,
1348            ?onchain_input_outpoints,
1349            ?vtxo_input_outpoints,
1350            ?outputs,
1351            "Registered intent for batch"
1352        );
1353
1354        let network_client = self.network_client();
1355
1356        let mut batch_id: Option<String> = None;
1357
1358        let topics = vtxo_input_outpoints
1359            .iter()
1360            .map(ToString::to_string)
1361            .chain(
1362                own_cosigner_pks
1363                    .iter()
1364                    .map(|pk| pk.serialize().to_lower_hex_string()),
1365            )
1366            .collect();
1367
1368        let mut stream = network_client.get_event_stream(topics).await?;
1369
1370        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
1371
1372        let mut unsigned_commitment_tx = None;
1373
1374        let mut vtxo_graph_chunks = Some(Vec::new());
1375        let mut vtxo_graph: Option<TxGraph> = None;
1376
1377        let mut connectors_graph_chunks = Some(Vec::new());
1378        let mut batch_expiry = None;
1379
1380        let mut agg_nonce_pks = HashMap::new();
1381
1382        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
1383        loop {
1384            match stream.next().await {
1385                Some(Ok(event)) => match event {
1386                    StreamEvent::BatchStarted(e) => {
1387                        if step != Step::Start {
1388                            continue;
1389                        }
1390
1391                        let hash = sha256::Hash::hash(intent_id.as_bytes());
1392                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
1393
1394                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
1395                            timeout_op(
1396                                self.inner.timeout,
1397                                self.network_client()
1398                                    .confirm_registration(intent_id.clone()),
1399                            )
1400                            .await
1401                            .context("failed to confirm intent registration")??;
1402
1403                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
1404
1405                            batch_id = Some(e.id);
1406
1407                            // Depending on whether we are generating new VTXOs or not, we continue
1408                            // with a different step in the state machine.
1409                            step = match outputs
1410                                .iter()
1411                                .any(|o| matches!(o, intent::Output::Offchain(_)))
1412                            {
1413                                true => Step::BatchStarted,
1414                                false => Step::BatchSigningStarted,
1415                            };
1416
1417                            batch_expiry = Some(e.batch_expiry);
1418                        } else {
1419                            tracing::debug!(
1420                                batch_id = e.id,
1421                                intent_id,
1422                                "Intent ID not found for batch"
1423                            );
1424                        }
1425                    }
1426                    StreamEvent::TreeTx(e) => {
1427                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
1428                            continue;
1429                        }
1430
1431                        match e.batch_tree_event_type {
1432                            BatchTreeEventType::Vtxo => {
1433                                match &mut vtxo_graph_chunks {
1434                                    Some(vtxo_graph_chunks) => {
1435                                        tracing::debug!("Got new VTXO graph chunk");
1436
1437                                        vtxo_graph_chunks.push(e.tx_graph_chunk)
1438                                    }
1439                                    None => {
1440                                        return Err(Error::ark_server(
1441                                            "received unexpected VTXO graph chunk",
1442                                        ));
1443                                    }
1444                                };
1445                            }
1446                            BatchTreeEventType::Connector => {
1447                                match connectors_graph_chunks {
1448                                    Some(ref mut connectors_graph_chunks) => {
1449                                        tracing::debug!("Got new connectors graph chunk");
1450
1451                                        connectors_graph_chunks.push(e.tx_graph_chunk)
1452                                    }
1453                                    None => {
1454                                        return Err(Error::ark_server(
1455                                            "received unexpected connectors graph chunk",
1456                                        ));
1457                                    }
1458                                };
1459                            }
1460                        }
1461                    }
1462                    StreamEvent::TreeSignature(e) => {
1463                        if step != Step::BatchSigningStarted {
1464                            continue;
1465                        }
1466
1467                        match e.batch_tree_event_type {
1468                            BatchTreeEventType::Vtxo => {
1469                                match vtxo_graph {
1470                                    Some(ref mut vtxo_graph) => {
1471                                        vtxo_graph.apply(|graph| {
1472                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
1473                                                Ok(true)
1474                                            } else {
1475                                                graph.set_signature(e.signature);
1476
1477                                                Ok(false)
1478                                            }
1479                                        })?;
1480                                    }
1481                                    None => {
1482                                        return Err(Error::ark_server(
1483                                            "received batch tree signature without TX graph",
1484                                        ));
1485                                    }
1486                                };
1487                            }
1488                            BatchTreeEventType::Connector => {
1489                                return Err(Error::ark_server(
1490                                    "received batch tree signature for connectors tree",
1491                                ));
1492                            }
1493                        }
1494                    }
1495                    StreamEvent::TreeSigningStarted(e) => {
1496                        if step != Step::BatchStarted {
1497                            continue;
1498                        }
1499
1500                        let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
1501                            "received tree signing started event without VTXO graph chunks",
1502                        ))?;
1503                        vtxo_graph = Some(
1504                            TxGraph::new(chunks)
1505                                .map_err(Error::from)
1506                                .context("failed to build VTXO graph before generating nonces")?,
1507                        );
1508
1509                        tracing::info!(batch_id = e.id, "Batch signing started");
1510
1511                        for own_cosigner_pk in own_cosigner_pks.iter() {
1512                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
1513                                return Err(Error::ark_server(format!(
1514                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
1515                                )));
1516                            }
1517                        }
1518
1519                        // We generate and submit a nonce tree for every cosigner key we provide.
1520                        let mut our_nonce_tree_map = HashMap::new();
1521                        for own_cosigner_kp in own_cosigner_kps {
1522                            let own_cosigner_pk = own_cosigner_kp.public_key();
1523                            let nonce_tree = generate_nonce_tree(
1524                                rng,
1525                                vtxo_graph.as_ref().expect("VTXO graph"),
1526                                own_cosigner_pk,
1527                                &e.unsigned_commitment_tx,
1528                            )
1529                            .map_err(Error::from)
1530                            .context("failed to generate VTXO nonce tree")?;
1531
1532                            tracing::info!(
1533                                cosigner_pk = %own_cosigner_pk,
1534                                "Submitting nonce tree for cosigner PK"
1535                            );
1536
1537                            network_client
1538                                .submit_tree_nonces(
1539                                    &e.id,
1540                                    own_cosigner_pk,
1541                                    nonce_tree.to_nonce_pks(),
1542                                )
1543                                .await
1544                                .map_err(Error::ark_server)
1545                                .context("failed to submit VTXO nonce tree")?;
1546
1547                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
1548                        }
1549
1550                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
1551                        our_nonce_trees = Some(our_nonce_tree_map);
1552
1553                        step = step.next();
1554                    }
1555                    StreamEvent::TreeNonces(e) => {
1556                        if step != Step::BatchSigningStarted {
1557                            continue;
1558                        }
1559
1560                        let tree_tx_nonce_pks = e.nonces;
1561
1562                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
1563                            own_cosigner_pks
1564                                .iter()
1565                                .any(|p| &&p.x_only_public_key().0 == pk)
1566                        }) {
1567                            Some((pk, _)) => *pk,
1568                            None => {
1569                                tracing::debug!(
1570                                    batch_id = e.id,
1571                                    txid = %e.txid,
1572                                    "Received irrelevant TreeNonces event"
1573                                );
1574
1575                                continue;
1576                            }
1577                        };
1578
1579                        tracing::debug!(
1580                            batch_id = e.id,
1581                            txid = %e.txid,
1582                            %cosigner_pk,
1583                            "Received TreeNonces event"
1584                        );
1585
1586                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
1587
1588                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
1589
1590                        if vtxo_graph.is_none() {
1591                            let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
1592                                "received tree nonces event without VTXO graph chunks",
1593                            ))?;
1594                            vtxo_graph = Some(
1595                                TxGraph::new(chunks)
1596                                    .map_err(Error::from)
1597                                    .context("failed to build VTXO graph before tree signing")?,
1598                            );
1599                        }
1600                        let vtxo_graph_ref = vtxo_graph.as_ref().expect("just populated");
1601
1602                        // Once we collect an aggregated nonce per transaction in our VTXO graph, we
1603                        // can go ahead with signing and submitting.
1604                        if agg_nonce_pks.len() == vtxo_graph_ref.nb_of_nodes() {
1605                            let cosigner_kp = own_cosigner_kps
1606                                .iter()
1607                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
1608                                .ok_or_else(|| {
1609                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
1610                                })?;
1611
1612                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
1613                                Error::ark_server("missing nonce trees during batch protocol"),
1614                            )?;
1615
1616                            let our_nonce_tree =
1617                                our_nonce_trees
1618                                    .get_mut(cosigner_kp)
1619                                    .ok_or(Error::ark_server(
1620                                        "missing nonce tree during batch protocol",
1621                                    ))?;
1622
1623                            let unsigned_commitment_tx = unsigned_commitment_tx
1624                                .as_ref()
1625                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
1626
1627                            let batch_expiry = batch_expiry
1628                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
1629
1630                            let mut partial_sig_tree = PartialSigTree::default();
1631                            for (txid, _) in vtxo_graph_ref.as_map() {
1632                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
1633                                    Error::ad_hoc(format!(
1634                                        "missing aggregated nonce PK for TX {txid}"
1635                                    ))
1636                                })?;
1637
1638                                let sigs = sign_batch_tree_tx(
1639                                    txid,
1640                                    batch_expiry,
1641                                    ark_forfeit_pk,
1642                                    cosigner_kp,
1643                                    *agg_nonce_pk,
1644                                    vtxo_graph_ref,
1645                                    unsigned_commitment_tx,
1646                                    our_nonce_tree,
1647                                )
1648                                .map_err(Error::from)
1649                                .context("failed to sign VTXO tree")?;
1650
1651                                partial_sig_tree.0.extend(sigs.0);
1652                            }
1653
1654                            network_client
1655                                .submit_tree_signatures(
1656                                    &e.id,
1657                                    cosigner_kp.public_key(),
1658                                    partial_sig_tree,
1659                                )
1660                                .await
1661                                .map_err(Error::ark_server)
1662                                .context("failed to submit VTXO tree signatures")?;
1663                        }
1664                    }
1665                    StreamEvent::TreeNoncesAggregated(e) => {
1666                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
1667                    }
1668                    StreamEvent::BatchFinalization(e) => {
1669                        if step != Step::BatchSigningStarted {
1670                            continue;
1671                        }
1672
1673                        tracing::debug!(
1674                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
1675                            "Batch finalization started"
1676                        );
1677
1678                        let signed_forfeit_psbts = if !vtxo_inputs.is_empty() {
1679                            let chunks =
1680                                connectors_graph_chunks.take().ok_or(Error::ark_server(
1681                                    "received batch finalization event without connectors",
1682                                ))?;
1683
1684                            if chunks.is_empty() {
1685                                tracing::debug!(batch_id = e.id, "No forfeit transactions");
1686
1687                                Vec::new()
1688                            } else {
1689                                let connectors_graph = TxGraph::new(chunks)
1690                                    .map_err(Error::from)
1691                                    .context(
1692                                    "failed to build connectors graph before signing forfeit TXs",
1693                                )?;
1694
1695                                tracing::debug!(batch_id = e.id, "Batch finalization started");
1696
1697                                create_and_sign_forfeit_txs(
1698                                    |input: &mut psbt::Input, msg: secp256k1::Message| match &input
1699                                    .witness_script
1700                                {
1701                                    None => Err(ark_core::Error::ad_hoc(
1702                                        "Missing witness script in psbt::Input when signing forfeit",
1703                                    )),
1704                                    Some(script) => {
1705                                        let pks = extract_checksig_pubkeys(script);
1706                                        let mut res = vec![];
1707                                        for pk in pks {
1708                                            if let Ok(keypair) =
1709                                            self.keypair_by_pk(&pk) {
1710                                                let sig =
1711                                                    secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1712                                                res.push((sig, keypair.public_key().into()))
1713                                            }
1714                                        }
1715                                        Ok(res)
1716                                    }
1717                                    },
1718                                    vtxo_inputs.as_slice(),
1719                                    &connectors_graph.leaves(),
1720                                    &server_info.forfeit_address,
1721                                    server_info.dust,
1722                                )
1723                                .map_err(Error::from)?
1724                            }
1725                        } else {
1726                            Vec::new()
1727                        };
1728
1729                        let commitment_psbt = if onchain_inputs.is_empty() {
1730                            None
1731                        } else {
1732                            let mut commitment_psbt = e.commitment_tx;
1733
1734                            let sign_for_pk_fn = |pk: &XOnlyPublicKey,
1735                                                  msg: &secp256k1::Message|
1736                             -> Result<
1737                                schnorr::Signature,
1738                                ark_core::Error,
1739                            > {
1740                                self.inner
1741                                    .wallet
1742                                    .sign_for_pk(pk, msg)
1743                                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))
1744                            };
1745
1746                            sign_commitment_psbt(
1747                                sign_for_pk_fn,
1748                                &mut commitment_psbt,
1749                                &onchain_inputs,
1750                            )
1751                            .map_err(Error::from)?;
1752
1753                            Some(commitment_psbt)
1754                        };
1755
1756                        if !signed_forfeit_psbts.is_empty() || commitment_psbt.is_some() {
1757                            network_client
1758                                .submit_signed_forfeit_txs(signed_forfeit_psbts, commitment_psbt)
1759                                .await?;
1760                        }
1761
1762                        step = step.next();
1763                    }
1764                    StreamEvent::BatchFinalized(e) => {
1765                        if step != Step::Finalized {
1766                            continue;
1767                        }
1768
1769                        let commitment_txid = e.commitment_txid;
1770
1771                        tracing::info!(batch_id = e.id, %commitment_txid, "Batch finalized");
1772
1773                        return Ok(commitment_txid);
1774                    }
1775                    StreamEvent::BatchFailed(ref e) => {
1776                        if Some(&e.id) == batch_id.as_ref() {
1777                            return Err(Error::ark_server(format!(
1778                                "batch failed {}: {}",
1779                                e.id, e.reason
1780                            )));
1781                        }
1782
1783                        tracing::debug!("Unrelated batch failed: {e:?}");
1784                    }
1785                    StreamEvent::Heartbeat => {}
1786                },
1787                Some(Err(e)) => {
1788                    tracing::error!("Got error from event stream");
1789
1790                    return Err(Error::ark_server(e));
1791                }
1792                None => {
1793                    return Err(Error::ark_server("dropped batch event stream"));
1794                }
1795            }
1796        }
1797
/// Progress marker for the batch event loop of the enclosing function.
///
/// Each event handler compares the current step with the step it requires and skips the
/// event when they differ, so events arriving out of order are ignored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Step {
    /// First step in the sequence (presumably the loop's initial value; the initialisation
    /// is outside this excerpt).
    Start,
    /// Step required for a `TreeSigningStarted` event to be processed.
    BatchStarted,
    /// Step required for `TreeNonces` and `BatchFinalization` events to be processed.
    BatchSigningStarted,
    /// Step required for a `BatchFinalized` event to be processed; this is the last step.
    Finalized,
}

impl Step {
    /// Returns the step that follows `self`.
    ///
    /// `Finalized` is terminal and maps to itself.
    #[must_use]
    fn next(&self) -> Self {
        match self {
            Self::Start => Self::BatchStarted,
            Self::BatchStarted => Self::BatchSigningStarted,
            // Terminal state: there is nothing after `Finalized`, so both arms end there.
            Self::BatchSigningStarted | Self::Finalized => Self::Finalized,
        }
    }
}
1816    }
1817}
1818
/// Selects the purpose an intent is being prepared for.
///
/// NOTE(review): the variant descriptions below are inferred from the variant names only;
/// confirm them against the code that matches on this enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum PrepareIntentKind {
    /// Presumably: the intent will be registered with the server.
    Register,
    /// Presumably: the intent is only used to request a fee estimate.
    EstimateFee,
}
1824
1825#[derive(Debug, Clone)]
1826pub(crate) enum BatchOutputType {
1827    Board {
1828        to_address: ArkAddress,
1829        to_amount: Amount,
1830    },
1831    OffBoard {
1832        to_address: Address,
1833        to_amount: Amount,
1834        change_address: ArkAddress,
1835        change_amount: Amount,
1836    },
1837}
1838
1839/// Prepared intent data ready for batch registration.
1840pub(crate) struct PreparedIntent {
1841    /// The signed intent.
1842    pub intent: intent::Intent,
1843    /// The ephemeral cosigner keypair.
1844    pub cosigner_keypair: Keypair,
1845    /// VTXO input outpoints (used for event stream topics).
1846    pub vtxo_input_outpoints: Vec<OutPoint>,
1847    /// Intent outputs (used to determine batch protocol steps).
1848    pub outputs: Vec<intent::Output>,
1849    /// The original onchain inputs (needed for commitment signing).
1850    pub onchain_inputs: Vec<batch::OnChainInput>,
1851    /// The original VTXO inputs (needed for forfeit signing).
1852    pub vtxo_inputs: Vec<intent::Input>,
1853}