Skip to main content

ark_client/
batch.rs

1use crate::error::ErrorContext as _;
2use crate::swap_storage::SwapStorage;
3use crate::utils::sleep;
4use crate::utils::timeout_op;
5use crate::wallet::BoardingWallet;
6use crate::wallet::OnchainWallet;
7use crate::Blockchain;
8use crate::Client;
9use crate::Error;
10use ark_core::batch;
11use ark_core::batch::aggregate_nonces;
12use ark_core::batch::complete_delegate_forfeit_txs;
13use ark_core::batch::create_and_sign_forfeit_txs;
14use ark_core::batch::create_asset_preservation_packet;
15use ark_core::batch::generate_nonce_tree;
16use ark_core::batch::sign_batch_tree_tx;
17use ark_core::batch::sign_commitment_psbt;
18use ark_core::batch::Delegate;
19use ark_core::batch::NonceKps;
20use ark_core::intent;
21use ark_core::script::extract_checksig_pubkeys;
22use ark_core::server::BatchTreeEventType;
23use ark_core::server::PartialSigTree;
24use ark_core::server::StreamEvent;
25use ark_core::ArkAddress;
26use ark_core::ArkNote;
27use ark_core::ExplorerUtxo;
28use ark_core::TxGraph;
29use backon::ExponentialBuilder;
30use backon::Retryable;
31use bitcoin::hashes::sha256;
32use bitcoin::hashes::Hash;
33use bitcoin::hex::DisplayHex;
34use bitcoin::key::Keypair;
35use bitcoin::key::Secp256k1;
36use bitcoin::psbt;
37use bitcoin::secp256k1;
38use bitcoin::secp256k1::schnorr;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Address;
41use bitcoin::Amount;
42use bitcoin::OutPoint;
43use bitcoin::Psbt;
44use bitcoin::TxOut;
45use bitcoin::Txid;
46use bitcoin::XOnlyPublicKey;
47use futures::StreamExt;
48use jiff::Timestamp;
49use rand::CryptoRng;
50use rand::Rng;
51use std::collections::HashMap;
52
53impl<B, W, S, K> Client<B, W, S, K>
54where
55    B: Blockchain,
56    W: BoardingWallet + OnchainWallet,
57    S: SwapStorage + 'static,
58    K: crate::KeyProvider,
59{
60    /// Settle _all_ prior VTXOs and boarding outputs into the next batch, generating new confirmed
61    /// VTXOs.
62    pub async fn settle<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
63    where
64        R: Rng + CryptoRng + Clone,
65    {
66        // Get off-chain address and send all funds to this address, no change output 🦄
67        let (to_address, _) = self.get_offchain_address()?;
68
69        let (boarding_inputs, vtxo_inputs, total_amount) =
70            self.fetch_commitment_transaction_inputs().await?;
71
72        tracing::debug!(
73            offchain_adress = %to_address.encode(),
74            ?boarding_inputs,
75            ?vtxo_inputs,
76            "Attempting to settle outputs"
77        );
78
79        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
80            tracing::debug!("No inputs to board with");
81            return Ok(None);
82        }
83
84        let join_next_batch = || async {
85            self.join_next_batch(
86                &mut rng.clone(),
87                boarding_inputs.clone(),
88                vtxo_inputs.clone(),
89                BatchOutputType::Board {
90                    to_address,
91                    to_amount: total_amount,
92                },
93            )
94            .await
95        };
96
97        // Joining a batch can fail depending on the timing, so we try a few times.
98        let commitment_txid = join_next_batch
99            .retry(ExponentialBuilder::default().with_max_times(0))
100            .sleep(sleep)
101            // TODO: Use `when` to only retry certain errors.
102            .notify(|err: &Error, dur: std::time::Duration| {
103                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
104            })
105            .await
106            .context("Failed to join batch")?;
107
108        tracing::info!(%commitment_txid, "Settlement success");
109
110        Ok(Some(commitment_txid))
111    }
112
113    /// Settle _all_ prior VTXOs, boarding outputs, and the provided ArkNotes into the next batch.
114    ///
115    /// ArkNotes are bearer tokens that can be redeemed by revealing their preimage.
116    /// This method combines them with regular VTXOs and boarding outputs into a single
117    /// settlement transaction.
118    pub async fn settle_with_notes<R>(
119        &self,
120        rng: &mut R,
121        notes: Vec<ArkNote>,
122    ) -> Result<Option<Txid>, Error>
123    where
124        R: Rng + CryptoRng + Clone,
125    {
126        let (to_address, _) = self.get_offchain_address()?;
127
128        let (boarding_inputs, vtxo_inputs, mut total_amount) =
129            self.fetch_commitment_transaction_inputs().await?;
130
131        // Convert arknotes to intent inputs and add their value to total
132        let note_inputs: Vec<intent::Input> = notes
133            .iter()
134            .map(|note| {
135                total_amount += note.value();
136                note.to_intent_input()
137            })
138            .collect::<Result<Vec<_>, _>>()?;
139
140        // Combine VTXO inputs with note inputs
141        let all_vtxo_inputs: Vec<intent::Input> =
142            vtxo_inputs.into_iter().chain(note_inputs).collect();
143
144        tracing::debug!(
145            offchain_address = %to_address.encode(),
146            ?boarding_inputs,
147            num_vtxo_inputs = all_vtxo_inputs.len(),
148            num_notes = notes.len(),
149            %total_amount,
150            "Attempting to settle outputs with notes"
151        );
152
153        if boarding_inputs.is_empty() && all_vtxo_inputs.is_empty() {
154            tracing::debug!("No inputs to settle");
155            return Ok(None);
156        }
157
158        let join_next_batch = || async {
159            self.join_next_batch(
160                &mut rng.clone(),
161                boarding_inputs.clone(),
162                all_vtxo_inputs.clone(),
163                BatchOutputType::Board {
164                    to_address,
165                    to_amount: total_amount,
166                },
167            )
168            .await
169        };
170
171        let commitment_txid = join_next_batch
172            .retry(ExponentialBuilder::default().with_max_times(0))
173            .sleep(sleep)
174            .notify(|err: &Error, dur: std::time::Duration| {
175                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
176            })
177            .await
178            .context("Failed to join batch")?;
179
180        tracing::info!(%commitment_txid, num_notes = notes.len(), "Settlement with notes success");
181
182        Ok(Some(commitment_txid))
183    }
184
185    /// Settle specific VTXOs and boarding outputs by outpoint into the next batch, generating new
186    /// confirmed VTXOs.
187    ///
188    /// Unlike [`Self::settle`], this method allows the caller to specify exactly which VTXOs and
189    /// boarding outputs to settle by providing their outpoints.
190    pub async fn settle_vtxos<R>(
191        &self,
192        rng: &mut R,
193        vtxo_outpoints: &[OutPoint],
194        boarding_outpoints: &[OutPoint],
195    ) -> Result<Option<Txid>, Error>
196    where
197        R: Rng + CryptoRng + Clone,
198    {
199        // Get off-chain address and send all funds to this address, no change output.
200        let (to_address, _) = self.get_offchain_address()?;
201
202        let (all_boarding_inputs, all_vtxo_inputs, _) =
203            self.fetch_commitment_transaction_inputs().await?;
204
205        // Filter boarding inputs to only those specified.
206        let boarding_inputs: Vec<_> = all_boarding_inputs
207            .into_iter()
208            .filter(|input| boarding_outpoints.contains(&input.outpoint()))
209            .collect();
210
211        // Filter VTXO inputs to only those specified.
212        let vtxo_inputs: Vec<_> = all_vtxo_inputs
213            .into_iter()
214            .filter(|input| vtxo_outpoints.contains(&input.outpoint()))
215            .collect();
216
217        // Recalculate total amount from filtered inputs.
218        let total_amount = boarding_inputs
219            .iter()
220            .map(|i| i.amount())
221            .chain(vtxo_inputs.iter().map(|i| i.amount()))
222            .fold(Amount::ZERO, |acc, a| acc + a);
223
224        tracing::debug!(
225            offchain_address = %to_address.encode(),
226            ?boarding_inputs,
227            ?vtxo_inputs,
228            %total_amount,
229            "Attempting to settle specific outputs"
230        );
231
232        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
233            tracing::debug!("No matching inputs to settle");
234            return Ok(None);
235        }
236
237        let join_next_batch = || async {
238            self.join_next_batch(
239                &mut rng.clone(),
240                boarding_inputs.clone(),
241                vtxo_inputs.clone(),
242                BatchOutputType::Board {
243                    to_address,
244                    to_amount: total_amount,
245                },
246            )
247            .await
248        };
249
250        // Joining a batch can fail depending on the timing, so we try a few times.
251        let commitment_txid = join_next_batch
252            .retry(ExponentialBuilder::default().with_max_times(0))
253            .sleep(sleep)
254            .notify(|err: &Error, dur: std::time::Duration| {
255                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
256            })
257            .await
258            .context("Failed to join batch")?;
259
260        tracing::info!(%commitment_txid, "Settlement of specific VTXOs success");
261
262        Ok(Some(commitment_txid))
263    }
264
265    /// Settle _some_ prior VTXOs and boarding outputs into the next batch, generating UTXOs as
266    /// outputs to a new commitment transaction.
267    pub async fn collaborative_redeem<R>(
268        &self,
269        rng: &mut R,
270        to_address: Address,
271        to_amount: Amount,
272    ) -> Result<Txid, Error>
273    where
274        R: Rng + CryptoRng + Clone,
275    {
276        let (change_address, _) = self.get_offchain_address()?;
277
278        let (boarding_inputs, vtxo_inputs, total_amount) =
279            self.fetch_commitment_transaction_inputs().await?;
280
281        let onchain_fee = self
282            .fee_estimator
283            .eval_onchain_output(ark_fees::Output {
284                amount: to_amount.to_sat(),
285                script: to_address.script_pubkey().to_string(),
286            })
287            .map_err(Error::ad_hoc)?;
288        let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
289
290        // Fee comes out of change, not the send amount.
291        let change_amount = total_amount
292            .checked_sub(to_amount)
293            .and_then(|a| a.checked_sub(onchain_fee))
294            .ok_or_else(|| {
295                Error::coin_select(format!(
296                    "insufficient balance: {total_amount} < {to_amount} (send) + {onchain_fee} (fee)"
297                ))
298            })?;
299
300        tracing::info!(
301            %to_address,
302            send_amount = %to_amount,
303            fee = %onchain_fee,
304            change_address = %change_address.encode(),
305            %change_amount,
306            ?boarding_inputs,
307            "Attempting to collaboratively redeem outputs"
308        );
309
310        let join_next_batch = || async {
311            self.join_next_batch(
312                &mut rng.clone(),
313                boarding_inputs.clone(),
314                vtxo_inputs.clone(),
315                BatchOutputType::OffBoard {
316                    to_address: to_address.clone(),
317                    to_amount,
318                    change_address,
319                    change_amount,
320                },
321            )
322            .await
323        };
324
325        // Joining a batch can fail depending on the timing, so we try a few times.
326        let commitment_txid = join_next_batch
327            .retry(ExponentialBuilder::default().with_max_times(3))
328            .sleep(sleep)
329            // TODO: Use `when` to only retry certain errors.
330            .notify(|err: &Error, dur: std::time::Duration| {
331                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
332            })
333            .await
334            .context("Failed to join batch")?;
335
336        tracing::info!(%commitment_txid, "Collaborative redeem success");
337
338        Ok(commitment_txid)
339    }
340
341    /// Settle a selection of VTXOs into the next batch, generating UTXOs as
342    /// outputs to a new commitment transaction.
343    pub async fn collaborative_redeem_vtxo_selection<R>(
344        &self,
345        rng: &mut R,
346        input_vtxos: impl Iterator<Item = OutPoint> + Clone,
347        to_address: Address,
348        to_amount: Amount,
349    ) -> Result<Txid, Error>
350    where
351        R: Rng + CryptoRng + Clone,
352    {
353        let (change_address, _) = self.get_offchain_address()?;
354
355        let (vtxo_list, script_pubkey_to_vtxo_map) =
356            self.list_vtxos().await.context("failed to get VTXO list")?;
357
358        let vtxo_inputs = vtxo_list
359            .all_unspent()
360            .filter(|v| input_vtxos.clone().any(|outpoint| outpoint == v.outpoint))
361            .map(|v| {
362                let vtxo = script_pubkey_to_vtxo_map.get(&v.script).ok_or_else(|| {
363                    ark_core::Error::ad_hoc(format!("missing VTXO for script pubkey: {}", v.script))
364                })?;
365                let spend_info = vtxo.forfeit_spend_info()?;
366
367                Ok(intent::Input::new(
368                    v.outpoint,
369                    vtxo.exit_delay(),
370                    // NOTE: This only works with default VTXOs (single-sig).
371                    None,
372                    TxOut {
373                        value: v.amount,
374                        script_pubkey: vtxo.script_pubkey(),
375                    },
376                    vtxo.tapscripts(),
377                    spend_info,
378                    false,
379                    v.is_swept,
380                    v.assets.clone(),
381                ))
382            })
383            .collect::<Result<Vec<_>, Error>>()?;
384
385        if vtxo_inputs.is_empty() {
386            return Err(Error::ad_hoc("no matching VTXO outpoints found"));
387        }
388
389        // Check that total amount is sufficient
390        let total_input_amount = vtxo_inputs
391            .iter()
392            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount());
393
394        let onchain_fee = self
395            .fee_estimator
396            .eval_onchain_output(ark_fees::Output {
397                amount: to_amount.to_sat(),
398                script: to_address.script_pubkey().to_string(),
399            })
400            .map_err(Error::ad_hoc)?;
401        let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
402
403        // Fee comes out of change, not the send amount.
404        let change_amount = total_input_amount
405            .checked_sub(to_amount)
406            .and_then(|a| a.checked_sub(onchain_fee))
407            .ok_or_else(|| {
408                Error::coin_select(format!(
409                    "insufficient VTXO amount: {total_input_amount} < {to_amount} (send) + {onchain_fee} (fee)"
410                ))
411            })?;
412
413        tracing::info!(
414            %to_address,
415            send_amount = %to_amount,
416            fee = %onchain_fee,
417            change_address = %change_address.encode(),
418            %change_amount,
419            "Attempting to collaboratively redeem outputs"
420        );
421
422        let join_next_batch = || async {
423            self.join_next_batch(
424                &mut rng.clone(),
425                Vec::new(),
426                vtxo_inputs.clone(),
427                BatchOutputType::OffBoard {
428                    to_address: to_address.clone(),
429                    to_amount,
430                    change_address,
431                    change_amount,
432                },
433            )
434            .await
435        };
436
437        // Joining a batch can fail depending on the timing, so we try a few times.
438        let commitment_txid = join_next_batch
439            .retry(ExponentialBuilder::default().with_max_times(3))
440            .sleep(sleep)
441            // TODO: Use `when` to only retry certain errors.
442            .notify(|err: &Error, dur: std::time::Duration| {
443                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
444            })
445            .await
446            .context("Failed to join batch")?;
447
448        tracing::info!(%commitment_txid, "Collaborative redeem success");
449
450        Ok(commitment_txid)
451    }
452
453    /// Generate a delegate for settling VTXOs on behalf of the owner.
454    ///
455    /// The owner pre-signs the intent and forfeit transactions, allowing another party to complete
456    /// the settlement at a later time using the provided `delegate_cosigner_pk`.
457    ///
458    /// # Arguments
459    ///
460    /// * `delegate_cosigner_pk` - The cosigner public key that the delegate will use
461    /// * `select_recoverable_vtxos` - Whether to include recoverable VTXOs
462    ///
463    /// # Returns
464    ///
465    /// A [`Delegate`] struct containing all the pre-signed data needed for settlement.
466    pub async fn generate_delegate(
467        &self,
468        delegate_cosigner_pk: PublicKey,
469    ) -> Result<Delegate, Error> {
470        // Get off-chain address and send all funds to this address.
471        let (to_address, _) = self.get_offchain_address()?;
472
473        // Simply collect all VTXOs that can be settled.
474        let (_, vtxo_inputs, _) = self.fetch_commitment_transaction_inputs().await?;
475
476        let total_amount = vtxo_inputs
477            .iter()
478            .fold(Amount::ZERO, |acc, v| acc + v.amount());
479
480        if vtxo_inputs.is_empty() {
481            return Err(Error::ad_hoc("no inputs to settle via delegate"));
482        }
483
484        let server_info = &self.server_info;
485
486        let mut outputs = vec![intent::Output::Offchain(TxOut {
487            value: total_amount,
488            script_pubkey: to_address.to_p2tr_script_pubkey(),
489        })];
490
491        if let Some(packet) = create_asset_preservation_packet(&vtxo_inputs, &outputs)? {
492            outputs.push(intent::Output::AssetPacket(packet.to_txout()));
493        }
494
495        let delegate = batch::prepare_delegate_psbts(
496            vtxo_inputs,
497            outputs,
498            delegate_cosigner_pk,
499            &server_info.forfeit_address,
500            server_info.dust,
501        )?;
502
503        Ok(delegate)
504    }
505
506    /// Sign a set of delegate PSBTs, including the intent PSBT and the forfeit PSBTs.
507    pub fn sign_delegate_psbts(
508        &self,
509        intent_psbt: &mut Psbt,
510        forfeit_psbts: &mut [Psbt],
511    ) -> Result<(), Error> {
512        let sign_fn =
513            |input: &mut psbt::Input,
514             msg: secp256k1::Message|
515             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
516                match &input.witness_script {
517                    None => Err(ark_core::Error::ad_hoc(
518                        "Missing witness script for psbt::Input",
519                    )),
520                    Some(script) => {
521                        let mut res = vec![];
522                        let pks = extract_checksig_pubkeys(script);
523                        for pk in pks {
524                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
525                                let sig = Secp256k1::new().sign_schnorr_no_aux_rand(&msg, &keypair);
526                                let pk = keypair.x_only_public_key().0;
527                                res.push((sig, pk));
528                            }
529                        }
530                        Ok(res)
531                    }
532                }
533            };
534
535        batch::sign_delegate_psbts(sign_fn, intent_psbt, forfeit_psbts)?;
536
537        Ok(())
538    }
539
540    /// Settle a delegate by completing the batch protocol using pre-signed data.
541    ///
542    /// This method allows Bob to settle Alice's VTXOs using the pre-signed intent and forfeit
543    /// transactions from the [`Delegate`] struct.
544    ///
545    /// # Arguments
546    ///
547    /// * `rng` - Random number generator for nonce generation
548    /// * `delegate` - The delegate struct containing pre-signed data
549    /// * `own_cosigner_kp` - Bob's cosigner keypair (must match the delegate_cosigner_pk)
550    ///
551    /// # Returns
552    ///
553    /// The commitment transaction ID if successful.
554    pub async fn settle_delegate<R>(
555        &self,
556        rng: &mut R,
557        delegate: Delegate,
558        own_cosigner_kp: Keypair,
559    ) -> Result<Txid, Error>
560    where
561        R: Rng + CryptoRng,
562    {
563        // Verify the cosigner key matches
564        if own_cosigner_kp.public_key() != delegate.delegate_cosigner_pk {
565            return Err(Error::ad_hoc(
566                "provided cosigner keypair does not match delegate_cosigner_pk",
567            ));
568        }
569
570        // Register the pre-signed intent
571        let intent_id = timeout_op(
572            self.inner.timeout,
573            self.network_client()
574                .register_intent(delegate.intent.clone()),
575        )
576        .await
577        .context("failed to register delegated intent")??;
578
579        tracing::debug!(intent_id, "Registered delegated intent");
580
581        let network_client = self.network_client();
582        let server_info = &self.server_info;
583
584        #[derive(Debug, PartialEq, Eq)]
585        enum Step {
586            Start,
587            BatchStarted,
588            BatchSigningStarted,
589            Finalized,
590        }
591
592        impl Step {
593            fn next(&self) -> Step {
594                match self {
595                    Step::Start => Step::BatchStarted,
596                    Step::BatchStarted => Step::BatchSigningStarted,
597                    Step::BatchSigningStarted => Step::Finalized,
598                    Step::Finalized => Step::Finalized,
599                }
600            }
601        }
602
603        let mut step = Step::Start;
604
605        let own_cosigner_kps = [own_cosigner_kp];
606        let own_cosigner_pks = own_cosigner_kps
607            .iter()
608            .map(|k| k.public_key())
609            .collect::<Vec<_>>();
610
611        let mut batch_id: Option<String> = None;
612
613        let vtxo_input_outpoints = delegate
614            .forfeit_psbts
615            .iter()
616            .map(|psbt| psbt.unsigned_tx.input[0].previous_output)
617            .collect::<Vec<_>>();
618
619        let topics = vtxo_input_outpoints
620            .iter()
621            .map(ToString::to_string)
622            .chain(
623                own_cosigner_pks
624                    .iter()
625                    .map(|pk| pk.serialize().to_lower_hex_string()),
626            )
627            .collect();
628
629        let mut stream = network_client.get_event_stream(topics).await?;
630
631        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
632
633        let mut unsigned_commitment_tx = None;
634
635        let mut vtxo_graph_chunks = Some(Vec::new());
636        let mut vtxo_graph: Option<TxGraph> = None;
637
638        let mut connectors_graph_chunks = Some(Vec::new());
639        let mut batch_expiry = None;
640
641        let mut agg_nonce_pks = HashMap::new();
642
643        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
644
645        loop {
646            match stream.next().await {
647                Some(Ok(event)) => match event {
648                    StreamEvent::BatchStarted(e) => {
649                        if step != Step::Start {
650                            continue;
651                        }
652
653                        let hash = sha256::Hash::hash(intent_id.as_bytes());
654                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
655
656                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
657                            timeout_op(
658                                self.inner.timeout,
659                                self.network_client()
660                                    .confirm_registration(intent_id.clone()),
661                            )
662                            .await
663                            .context("failed to confirm intent registration")??;
664
665                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
666
667                            batch_id = Some(e.id);
668
669                            step = Step::BatchStarted;
670
671                            batch_expiry = Some(e.batch_expiry);
672                        } else {
673                            tracing::debug!(
674                                batch_id = e.id,
675                                intent_id,
676                                "Intent ID not found for batch"
677                            );
678                        }
679                    }
680                    StreamEvent::TreeTx(e) => {
681                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
682                            continue;
683                        }
684
685                        match e.batch_tree_event_type {
686                            BatchTreeEventType::Vtxo => {
687                                match &mut vtxo_graph_chunks {
688                                    Some(vtxo_graph_chunks) => {
689                                        tracing::debug!("Got new VTXO graph chunk");
690
691                                        vtxo_graph_chunks.push(e.tx_graph_chunk)
692                                    }
693                                    None => {
694                                        return Err(Error::ark_server(
695                                            "received unexpected VTXO graph chunk",
696                                        ));
697                                    }
698                                };
699                            }
700                            BatchTreeEventType::Connector => {
701                                match connectors_graph_chunks {
702                                    Some(ref mut connectors_graph_chunks) => {
703                                        tracing::debug!("Got new connectors graph chunk");
704
705                                        connectors_graph_chunks.push(e.tx_graph_chunk)
706                                    }
707                                    None => {
708                                        return Err(Error::ark_server(
709                                            "received unexpected connectors graph chunk",
710                                        ));
711                                    }
712                                };
713                            }
714                        }
715                    }
716                    StreamEvent::TreeSignature(e) => {
717                        if step != Step::BatchSigningStarted {
718                            continue;
719                        }
720
721                        match e.batch_tree_event_type {
722                            BatchTreeEventType::Vtxo => {
723                                match vtxo_graph {
724                                    Some(ref mut vtxo_graph) => {
725                                        vtxo_graph.apply(|graph| {
726                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
727                                                Ok(true)
728                                            } else {
729                                                graph.set_signature(e.signature);
730
731                                                Ok(false)
732                                            }
733                                        })?;
734                                    }
735                                    None => {
736                                        return Err(Error::ark_server(
737                                            "received batch tree signature without TX graph",
738                                        ));
739                                    }
740                                };
741                            }
742                            BatchTreeEventType::Connector => {
743                                return Err(Error::ark_server(
744                                    "received batch tree signature for connectors tree",
745                                ));
746                            }
747                        }
748                    }
749                    StreamEvent::TreeSigningStarted(e) => {
750                        if step != Step::BatchStarted {
751                            continue;
752                        }
753
754                        let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
755                            "received tree signing started event without VTXO graph chunks",
756                        ))?;
757                        vtxo_graph = Some(
758                            TxGraph::new(chunks)
759                                .map_err(Error::from)
760                                .context("failed to build VTXO graph before generating nonces")?,
761                        );
762
763                        tracing::info!(batch_id = e.id, "Batch signing started");
764
765                        for own_cosigner_pk in own_cosigner_pks.iter() {
766                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
767                                return Err(Error::ark_server(format!(
768                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
769                                )));
770                            }
771                        }
772
773                        let mut our_nonce_tree_map = HashMap::new();
774                        for own_cosigner_kp in own_cosigner_kps {
775                            let own_cosigner_pk = own_cosigner_kp.public_key();
776                            let nonce_tree = generate_nonce_tree(
777                                rng,
778                                vtxo_graph.as_ref().expect("VTXO graph"),
779                                own_cosigner_pk,
780                                &e.unsigned_commitment_tx,
781                            )
782                            .map_err(Error::from)
783                            .context("failed to generate VTXO nonce tree")?;
784
785                            tracing::info!(
786                                cosigner_pk = %own_cosigner_pk,
787                                "Submitting nonce tree for cosigner PK"
788                            );
789
790                            network_client
791                                .submit_tree_nonces(
792                                    &e.id,
793                                    own_cosigner_pk,
794                                    nonce_tree.to_nonce_pks(),
795                                )
796                                .await
797                                .map_err(Error::ark_server)
798                                .context("failed to submit VTXO nonce tree")?;
799
800                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
801                        }
802
803                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
804                        our_nonce_trees = Some(our_nonce_tree_map);
805
806                        step = step.next();
807                    }
808                    StreamEvent::TreeNonces(e) => {
809                        if step != Step::BatchSigningStarted {
810                            continue;
811                        }
812
813                        let tree_tx_nonce_pks = e.nonces;
814
815                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
816                            own_cosigner_pks
817                                .iter()
818                                .any(|p| &&p.x_only_public_key().0 == pk)
819                        }) {
820                            Some((pk, _)) => *pk,
821                            None => {
822                                tracing::debug!(
823                                    batch_id = e.id,
824                                    txid = %e.txid,
825                                    "Received irrelevant TreeNonces event"
826                                );
827
828                                continue;
829                            }
830                        };
831
832                        tracing::debug!(
833                            batch_id = e.id,
834                            txid = %e.txid,
835                            %cosigner_pk,
836                            "Received TreeNonces event"
837                        );
838
839                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
840
841                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
842
843                        if vtxo_graph.is_none() {
844                            let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
845                                "received tree nonces event without VTXO graph chunks",
846                            ))?;
847                            vtxo_graph = Some(
848                                TxGraph::new(chunks)
849                                    .map_err(Error::from)
850                                    .context("failed to build VTXO graph before tree signing")?,
851                            );
852                        }
853                        let vtxo_graph_ref = vtxo_graph.as_ref().expect("just populated");
854
855                        if agg_nonce_pks.len() == vtxo_graph_ref.nb_of_nodes() {
856                            let cosigner_kp = own_cosigner_kps
857                                .iter()
858                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
859                                .ok_or_else(|| {
860                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
861                                })?;
862
863                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
864                                Error::ark_server("missing nonce trees during batch protocol"),
865                            )?;
866
867                            let our_nonce_tree =
868                                our_nonce_trees
869                                    .get_mut(cosigner_kp)
870                                    .ok_or(Error::ark_server(
871                                        "missing nonce tree during batch protocol",
872                                    ))?;
873
874                            let unsigned_commitment_tx = unsigned_commitment_tx
875                                .as_ref()
876                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
877
878                            let batch_expiry = batch_expiry
879                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
880
881                            let mut partial_sig_tree = PartialSigTree::default();
882                            for (txid, _) in vtxo_graph_ref.as_map() {
883                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
884                                    Error::ad_hoc(format!(
885                                        "missing aggregated nonce PK for TX {txid}"
886                                    ))
887                                })?;
888
889                                let sigs = sign_batch_tree_tx(
890                                    txid,
891                                    batch_expiry,
892                                    ark_forfeit_pk,
893                                    cosigner_kp,
894                                    *agg_nonce_pk,
895                                    vtxo_graph_ref,
896                                    unsigned_commitment_tx,
897                                    our_nonce_tree,
898                                )
899                                .map_err(Error::from)
900                                .context("failed to sign VTXO tree")?;
901
902                                partial_sig_tree.0.extend(sigs.0);
903                            }
904
905                            network_client
906                                .submit_tree_signatures(
907                                    &e.id,
908                                    cosigner_kp.public_key(),
909                                    partial_sig_tree,
910                                )
911                                .await
912                                .map_err(Error::ark_server)
913                                .context("failed to submit VTXO tree signatures")?;
914                        }
915                    }
916                    StreamEvent::TreeNoncesAggregated(e) => {
917                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
918                    }
919                    StreamEvent::BatchFinalization(e) => {
920                        if step != Step::BatchSigningStarted {
921                            continue;
922                        }
923
924                        tracing::debug!(
925                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
926                            "Batch finalization started (delegate)"
927                        );
928
929                        let chunks = connectors_graph_chunks.take().ok_or(Error::ark_server(
930                            "received batch finalization event without connectors",
931                        ))?;
932
933                        if chunks.is_empty() {
934                            tracing::debug!(batch_id = e.id, "No delegated forfeit transactions");
935                        } else {
936                            let connectors_graph = TxGraph::new(chunks)
937                                .map_err(Error::from)
938                                .context(
939                                "failed to build connectors graph before completing forfeit TXs",
940                            )?;
941
942                            tracing::debug!(
943                                batch_id = e.id,
944                                "Completing delegated forfeit transactions"
945                            );
946
947                            let signed_forfeit_psbts = complete_delegate_forfeit_txs(
948                                &delegate.forfeit_psbts,
949                                &connectors_graph.leaves(),
950                            )?;
951
952                            network_client
953                                .submit_signed_forfeit_txs(signed_forfeit_psbts, None)
954                                .await?;
955                        }
956
957                        step = step.next();
958                    }
959                    StreamEvent::BatchFinalized(e) => {
960                        if step != Step::Finalized {
961                            continue;
962                        }
963
964                        let commitment_txid = e.commitment_txid;
965
966                        tracing::info!(batch_id = e.id, %commitment_txid, "Delegated batch finalized");
967
968                        return Ok(commitment_txid);
969                    }
970                    StreamEvent::BatchFailed(ref e) => {
971                        if Some(&e.id) == batch_id.as_ref() {
972                            return Err(Error::ark_server(format!(
973                                "batch failed {}: {}",
974                                e.id, e.reason
975                            )));
976                        }
977
978                        tracing::debug!("Unrelated batch failed: {e:?}");
979                    }
980                    StreamEvent::Heartbeat => {}
981                    StreamEvent::StreamStarted(_) => {}
982                },
983                Some(Err(e)) => {
984                    tracing::error!("Got error from event stream");
985
986                    return Err(Error::ark_server(e));
987                }
988                None => {
989                    return Err(Error::ark_server("dropped batch event stream"));
990                }
991            }
992        }
993    }
994
995    /// Get all the [`batch::OnChainInput`]s and [`batch::VtxoInput`]s that can be used to join an
996    /// upcoming batch.
997    pub(crate) async fn fetch_commitment_transaction_inputs(
998        &self,
999    ) -> Result<(Vec<batch::OnChainInput>, Vec<intent::Input>, Amount), Error> {
1000        // Get all known boarding outputs.
1001        let boarding_outputs = self.inner.wallet.get_boarding_outputs()?;
1002
1003        let mut boarding_inputs: Vec<batch::OnChainInput> = Vec::new();
1004        let mut total_amount = Amount::ZERO;
1005
1006        // To track unique outpoints and prevent duplicates
1007        let mut seen_outpoints = std::collections::HashSet::new();
1008
1009        let now = Timestamp::now();
1010
1011        // Find outpoints for each boarding output.
1012        for boarding_output in boarding_outputs {
1013            let outpoints = timeout_op(
1014                self.inner.timeout,
1015                self.blockchain().find_outpoints(boarding_output.address()),
1016            )
1017            .await
1018            .context("failed to find outpoints")??;
1019
1020            for o in outpoints.iter() {
1021                if let ExplorerUtxo {
1022                    outpoint,
1023                    amount,
1024                    confirmation_blocktime: Some(confirmation_blocktime),
1025                    confirmations,
1026                    is_spent: false,
1027                } = o
1028                {
1029                    // Check for duplicate outpoints
1030                    if seen_outpoints.contains(outpoint) {
1031                        continue;
1032                    }
1033
1034                    // Only include confirmed boarding outputs with an _inactive_ exit path.
1035                    if !boarding_output.can_be_claimed_unilaterally_by_owner(
1036                        now.as_duration().try_into().map_err(Error::ad_hoc)?,
1037                        std::time::Duration::from_secs(*confirmation_blocktime),
1038                        *confirmations,
1039                    ) {
1040                        // Mark this outpoint as seen
1041                        seen_outpoints.insert(*outpoint);
1042
1043                        boarding_inputs.push(batch::OnChainInput::new(
1044                            boarding_output.clone(),
1045                            *amount,
1046                            *outpoint,
1047                        ));
1048                        total_amount += *amount;
1049                    }
1050                }
1051            }
1052        }
1053
1054        let (vtxo_list, script_pubkey_to_vtxo_map) = self.list_vtxos().await?;
1055
1056        total_amount += vtxo_list
1057            .all_unspent()
1058            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount);
1059
1060        let vtxo_inputs = vtxo_list
1061            .all_unspent()
1062            .map(|virtual_tx_outpoint| {
1063                let vtxo = script_pubkey_to_vtxo_map
1064                    .get(&virtual_tx_outpoint.script)
1065                    .ok_or_else(|| {
1066                        ark_core::Error::ad_hoc(format!(
1067                            "missing VTXO for script pubkey: {}",
1068                            virtual_tx_outpoint.script
1069                        ))
1070                    })?;
1071                let spend_info = vtxo.forfeit_spend_info()?;
1072
1073                Ok(intent::Input::new(
1074                    virtual_tx_outpoint.outpoint,
1075                    vtxo.exit_delay(),
1076                    None,
1077                    TxOut {
1078                        value: virtual_tx_outpoint.amount,
1079                        script_pubkey: vtxo.script_pubkey(),
1080                    },
1081                    vtxo.tapscripts(),
1082                    spend_info,
1083                    false,
1084                    virtual_tx_outpoint.is_swept,
1085                    virtual_tx_outpoint.assets.clone(),
1086                ))
1087            })
1088            .collect::<Result<Vec<_>, ark_core::Error>>()?;
1089
1090        Ok((boarding_inputs, vtxo_inputs, total_amount))
1091    }
1092
1093    /// Prepare an intent for batch registration or fee estimation.
1094    ///
1095    /// This creates a signed intent PSBT along with all the data needed to participate
1096    /// in the batch protocol.
1097    pub(crate) fn prepare_intent<R>(
1098        &self,
1099        rng: &mut R,
1100        onchain_inputs: Vec<batch::OnChainInput>,
1101        vtxo_inputs: Vec<intent::Input>,
1102        output_type: BatchOutputType,
1103        intent_kind: PrepareIntentKind,
1104    ) -> Result<PreparedIntent, Error>
1105    where
1106        R: Rng + CryptoRng,
1107    {
1108        if onchain_inputs.is_empty() && vtxo_inputs.is_empty() {
1109            return Err(Error::ad_hoc("cannot prepare intent without inputs"));
1110        }
1111
1112        // Generate an (ephemeral) cosigner keypair.
1113        let cosigner_keypair = Keypair::new(self.secp(), rng);
1114
1115        let vtxo_input_outpoints = vtxo_inputs.iter().map(|i| i.outpoint()).collect::<Vec<_>>();
1116
1117        let inputs = {
1118            let boarding_inputs = onchain_inputs.clone().into_iter().map(|o| {
1119                intent::Input::new(
1120                    o.outpoint(),
1121                    o.boarding_output().exit_delay(),
1122                    None,
1123                    TxOut {
1124                        value: o.amount(),
1125                        script_pubkey: o.boarding_output().script_pubkey(),
1126                    },
1127                    o.boarding_output().tapscripts(),
1128                    o.boarding_output().forfeit_spend_info(),
1129                    true,
1130                    false,
1131                    Vec::new(),
1132                )
1133            });
1134
1135            boarding_inputs
1136                .chain(vtxo_inputs.clone())
1137                .collect::<Vec<_>>()
1138        };
1139
1140        let dust = self.server_info.dust;
1141
1142        let mut outputs = vec![];
1143
1144        match output_type {
1145            BatchOutputType::Board {
1146                to_address,
1147                to_amount,
1148            } => {
1149                if to_amount < self.server_info.dust {
1150                    return Err(Error::ad_hoc(format!(
1151                        "cannot settle into sub-dust VTXO: {to_amount} < {dust}"
1152                    )));
1153                }
1154
1155                outputs.push(intent::Output::Offchain(TxOut {
1156                    value: to_amount,
1157                    script_pubkey: to_address.to_p2tr_script_pubkey(),
1158                }));
1159            }
1160            BatchOutputType::OffBoard {
1161                to_address,
1162                to_amount,
1163                change_amount,
1164                ..
1165            } if change_amount == Amount::ZERO => {
1166                outputs.push(intent::Output::Onchain(TxOut {
1167                    value: to_amount,
1168                    script_pubkey: to_address.script_pubkey(),
1169                }));
1170            }
1171            BatchOutputType::OffBoard {
1172                to_address,
1173                to_amount,
1174                change_address,
1175                change_amount,
1176            } => {
1177                if change_amount < dust {
1178                    return Err(Error::ad_hoc(format!(
1179                        "cannot settle with sub-dust change VTXO: {change_amount} < {dust}"
1180                    )));
1181                }
1182
1183                outputs.push(intent::Output::Onchain(TxOut {
1184                    value: to_amount,
1185                    script_pubkey: to_address.script_pubkey(),
1186                }));
1187
1188                outputs.push(intent::Output::Offchain(TxOut {
1189                    value: change_amount,
1190                    script_pubkey: change_address.to_p2tr_script_pubkey(),
1191                }));
1192            }
1193        }
1194
1195        let cosigner_pk = cosigner_keypair.public_key();
1196
1197        let secp = Secp256k1::new();
1198
1199        let sign_for_vtxo_fn =
1200            |input: &mut psbt::Input,
1201             msg: secp256k1::Message|
1202             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
1203                match &input.witness_script {
1204                    None => Err(ark_core::Error::ad_hoc(
1205                        "Missing witness script in psbt::Input when signing intent",
1206                    )),
1207                    Some(script) => {
1208                        let pks = extract_checksig_pubkeys(script);
1209                        let mut res = vec![];
1210                        for pk in pks {
1211                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
1212                                let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1213                                res.push((sig, keypair.public_key().into()))
1214                            }
1215                        }
1216                        Ok(res)
1217                    }
1218                }
1219            };
1220
1221        let sign_for_onchain_fn =
1222            |input: &mut psbt::Input,
1223             msg: secp256k1::Message|
1224             -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
1225                let onchain_input = onchain_inputs
1226                    .iter()
1227                    .find(|o| {
1228                        Some(o.boarding_output().script_pubkey())
1229                            == input.witness_utxo.clone().map(|w| w.script_pubkey)
1230                    })
1231                    .ok_or_else(|| {
1232                        ark_core::Error::ad_hoc(
1233                            "could not find signing key for onchain input: {input:?}",
1234                        )
1235                    })?;
1236
1237                let owner_pk = onchain_input.boarding_output().owner_pk();
1238                let sig = self
1239                    .inner
1240                    .wallet
1241                    .sign_for_pk(&owner_pk, &msg)
1242                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))?;
1243
1244                Ok((sig, owner_pk))
1245            };
1246
1247        let now = std::time::SystemTime::now()
1248            .duration_since(std::time::UNIX_EPOCH)
1249            .map_err(|e| Error::ad_hoc(e.to_string()))
1250            .context("failed to compute now timestamp")?;
1251        let now = now.as_secs();
1252        let expire_at = now + (2 * 60);
1253
1254        if let Some(packet) = create_asset_preservation_packet(&inputs, &outputs)? {
1255            outputs.push(intent::Output::AssetPacket(packet.to_txout()));
1256        }
1257
1258        let mut onchain_output_indexes = Vec::new();
1259        for (i, output) in outputs.iter().enumerate() {
1260            if matches!(output, intent::Output::Onchain(_)) {
1261                onchain_output_indexes.push(i);
1262            }
1263        }
1264
1265        let message = match intent_kind {
1266            PrepareIntentKind::EstimateFee => intent::IntentMessage::EstimateIntentFee {
1267                onchain_output_indexes,
1268                valid_at: now,
1269                expire_at,
1270                own_cosigner_pks: vec![cosigner_pk],
1271            },
1272            PrepareIntentKind::Register => intent::IntentMessage::Register {
1273                onchain_output_indexes,
1274                valid_at: now,
1275                expire_at,
1276                own_cosigner_pks: vec![cosigner_pk],
1277            },
1278        };
1279
1280        let intent = intent::make_intent(
1281            sign_for_vtxo_fn,
1282            sign_for_onchain_fn,
1283            inputs,
1284            outputs.clone(),
1285            message,
1286        )?;
1287
1288        Ok(PreparedIntent {
1289            intent,
1290            cosigner_keypair,
1291            vtxo_input_outpoints,
1292            outputs,
1293            onchain_inputs,
1294            vtxo_inputs,
1295        })
1296    }
1297
1298    pub(crate) async fn join_next_batch<R>(
1299        &self,
1300        rng: &mut R,
1301        onchain_inputs: Vec<batch::OnChainInput>,
1302        vtxo_inputs: Vec<intent::Input>,
1303        output_type: BatchOutputType,
1304    ) -> Result<Txid, Error>
1305    where
1306        R: Rng + CryptoRng,
1307    {
1308        let prepared = self.prepare_intent(
1309            rng,
1310            onchain_inputs,
1311            vtxo_inputs,
1312            output_type,
1313            PrepareIntentKind::Register,
1314        )?;
1315
1316        let PreparedIntent {
1317            intent,
1318            cosigner_keypair,
1319            vtxo_input_outpoints,
1320            outputs,
1321            onchain_inputs,
1322            vtxo_inputs,
1323        } = prepared;
1324
1325        let onchain_input_outpoints = onchain_inputs
1326            .iter()
1327            .map(|i| i.outpoint())
1328            .collect::<Vec<_>>();
1329
1330        let server_info = &self.server_info;
1331
1332        let own_cosigner_kps = [cosigner_keypair];
1333        let own_cosigner_pks = own_cosigner_kps
1334            .iter()
1335            .map(|k| k.public_key())
1336            .collect::<Vec<_>>();
1337
1338        let secp = Secp256k1::new();
1339
1340        let mut step = Step::Start;
1341
1342        let intent_id = timeout_op(
1343            self.inner.timeout,
1344            self.network_client().register_intent(intent),
1345        )
1346        .await
1347        .context("failed to register intent")??;
1348
1349        tracing::debug!(
1350            intent_id,
1351            ?onchain_input_outpoints,
1352            ?vtxo_input_outpoints,
1353            ?outputs,
1354            "Registered intent for batch"
1355        );
1356
1357        let network_client = self.network_client();
1358
1359        let mut batch_id: Option<String> = None;
1360
1361        let topics = vtxo_input_outpoints
1362            .iter()
1363            .map(ToString::to_string)
1364            .chain(
1365                own_cosigner_pks
1366                    .iter()
1367                    .map(|pk| pk.serialize().to_lower_hex_string()),
1368            )
1369            .collect();
1370
1371        let mut stream = network_client.get_event_stream(topics).await?;
1372
1373        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
1374
1375        let mut unsigned_commitment_tx = None;
1376
1377        let mut vtxo_graph_chunks = Some(Vec::new());
1378        let mut vtxo_graph: Option<TxGraph> = None;
1379
1380        let mut connectors_graph_chunks = Some(Vec::new());
1381        let mut batch_expiry = None;
1382
1383        let mut agg_nonce_pks = HashMap::new();
1384
1385        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
1386        loop {
1387            match stream.next().await {
1388                Some(Ok(event)) => match event {
1389                    StreamEvent::BatchStarted(e) => {
1390                        if step != Step::Start {
1391                            continue;
1392                        }
1393
1394                        let hash = sha256::Hash::hash(intent_id.as_bytes());
1395                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
1396
1397                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
1398                            timeout_op(
1399                                self.inner.timeout,
1400                                self.network_client()
1401                                    .confirm_registration(intent_id.clone()),
1402                            )
1403                            .await
1404                            .context("failed to confirm intent registration")??;
1405
1406                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
1407
1408                            batch_id = Some(e.id);
1409
1410                            // Depending on whether we are generating new VTXOs or not, we continue
1411                            // with a different step in the state machine.
1412                            step = match outputs
1413                                .iter()
1414                                .any(|o| matches!(o, intent::Output::Offchain(_)))
1415                            {
1416                                true => Step::BatchStarted,
1417                                false => Step::BatchSigningStarted,
1418                            };
1419
1420                            batch_expiry = Some(e.batch_expiry);
1421                        } else {
1422                            tracing::debug!(
1423                                batch_id = e.id,
1424                                intent_id,
1425                                "Intent ID not found for batch"
1426                            );
1427                        }
1428                    }
1429                    StreamEvent::TreeTx(e) => {
1430                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
1431                            continue;
1432                        }
1433
1434                        match e.batch_tree_event_type {
1435                            BatchTreeEventType::Vtxo => {
1436                                match &mut vtxo_graph_chunks {
1437                                    Some(vtxo_graph_chunks) => {
1438                                        tracing::debug!("Got new VTXO graph chunk");
1439
1440                                        vtxo_graph_chunks.push(e.tx_graph_chunk)
1441                                    }
1442                                    None => {
1443                                        return Err(Error::ark_server(
1444                                            "received unexpected VTXO graph chunk",
1445                                        ));
1446                                    }
1447                                };
1448                            }
1449                            BatchTreeEventType::Connector => {
1450                                match connectors_graph_chunks {
1451                                    Some(ref mut connectors_graph_chunks) => {
1452                                        tracing::debug!("Got new connectors graph chunk");
1453
1454                                        connectors_graph_chunks.push(e.tx_graph_chunk)
1455                                    }
1456                                    None => {
1457                                        return Err(Error::ark_server(
1458                                            "received unexpected connectors graph chunk",
1459                                        ));
1460                                    }
1461                                };
1462                            }
1463                        }
1464                    }
1465                    StreamEvent::TreeSignature(e) => {
1466                        if step != Step::BatchSigningStarted {
1467                            continue;
1468                        }
1469
1470                        match e.batch_tree_event_type {
1471                            BatchTreeEventType::Vtxo => {
1472                                match vtxo_graph {
1473                                    Some(ref mut vtxo_graph) => {
1474                                        vtxo_graph.apply(|graph| {
1475                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
1476                                                Ok(true)
1477                                            } else {
1478                                                graph.set_signature(e.signature);
1479
1480                                                Ok(false)
1481                                            }
1482                                        })?;
1483                                    }
1484                                    None => {
1485                                        return Err(Error::ark_server(
1486                                            "received batch tree signature without TX graph",
1487                                        ));
1488                                    }
1489                                };
1490                            }
1491                            BatchTreeEventType::Connector => {
1492                                return Err(Error::ark_server(
1493                                    "received batch tree signature for connectors tree",
1494                                ));
1495                            }
1496                        }
1497                    }
1498                    StreamEvent::TreeSigningStarted(e) => {
1499                        if step != Step::BatchStarted {
1500                            continue;
1501                        }
1502
1503                        let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
1504                            "received tree signing started event without VTXO graph chunks",
1505                        ))?;
1506                        vtxo_graph = Some(
1507                            TxGraph::new(chunks)
1508                                .map_err(Error::from)
1509                                .context("failed to build VTXO graph before generating nonces")?,
1510                        );
1511
1512                        tracing::info!(batch_id = e.id, "Batch signing started");
1513
1514                        for own_cosigner_pk in own_cosigner_pks.iter() {
1515                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
1516                                return Err(Error::ark_server(format!(
1517                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
1518                                )));
1519                            }
1520                        }
1521
1522                        // We generate and submit a nonce tree for every cosigner key we provide.
1523                        let mut our_nonce_tree_map = HashMap::new();
1524                        for own_cosigner_kp in own_cosigner_kps {
1525                            let own_cosigner_pk = own_cosigner_kp.public_key();
1526                            let nonce_tree = generate_nonce_tree(
1527                                rng,
1528                                vtxo_graph.as_ref().expect("VTXO graph"),
1529                                own_cosigner_pk,
1530                                &e.unsigned_commitment_tx,
1531                            )
1532                            .map_err(Error::from)
1533                            .context("failed to generate VTXO nonce tree")?;
1534
1535                            tracing::info!(
1536                                cosigner_pk = %own_cosigner_pk,
1537                                "Submitting nonce tree for cosigner PK"
1538                            );
1539
1540                            network_client
1541                                .submit_tree_nonces(
1542                                    &e.id,
1543                                    own_cosigner_pk,
1544                                    nonce_tree.to_nonce_pks(),
1545                                )
1546                                .await
1547                                .map_err(Error::ark_server)
1548                                .context("failed to submit VTXO nonce tree")?;
1549
1550                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
1551                        }
1552
1553                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
1554                        our_nonce_trees = Some(our_nonce_tree_map);
1555
1556                        step = step.next();
1557                    }
1558                    StreamEvent::TreeNonces(e) => {
1559                        if step != Step::BatchSigningStarted {
1560                            continue;
1561                        }
1562
1563                        let tree_tx_nonce_pks = e.nonces;
1564
1565                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
1566                            own_cosigner_pks
1567                                .iter()
1568                                .any(|p| &&p.x_only_public_key().0 == pk)
1569                        }) {
1570                            Some((pk, _)) => *pk,
1571                            None => {
1572                                tracing::debug!(
1573                                    batch_id = e.id,
1574                                    txid = %e.txid,
1575                                    "Received irrelevant TreeNonces event"
1576                                );
1577
1578                                continue;
1579                            }
1580                        };
1581
1582                        tracing::debug!(
1583                            batch_id = e.id,
1584                            txid = %e.txid,
1585                            %cosigner_pk,
1586                            "Received TreeNonces event"
1587                        );
1588
1589                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
1590
1591                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
1592
1593                        if vtxo_graph.is_none() {
1594                            let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
1595                                "received tree nonces event without VTXO graph chunks",
1596                            ))?;
1597                            vtxo_graph = Some(
1598                                TxGraph::new(chunks)
1599                                    .map_err(Error::from)
1600                                    .context("failed to build VTXO graph before tree signing")?,
1601                            );
1602                        }
1603                        let vtxo_graph_ref = vtxo_graph.as_ref().expect("just populated");
1604
1605                        // Once we collect an aggregated nonce per transaction in our VTXO graph, we
1606                        // can go ahead with signing and submitting.
1607                        if agg_nonce_pks.len() == vtxo_graph_ref.nb_of_nodes() {
1608                            let cosigner_kp = own_cosigner_kps
1609                                .iter()
1610                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
1611                                .ok_or_else(|| {
1612                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
1613                                })?;
1614
1615                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
1616                                Error::ark_server("missing nonce trees during batch protocol"),
1617                            )?;
1618
1619                            let our_nonce_tree =
1620                                our_nonce_trees
1621                                    .get_mut(cosigner_kp)
1622                                    .ok_or(Error::ark_server(
1623                                        "missing nonce tree during batch protocol",
1624                                    ))?;
1625
1626                            let unsigned_commitment_tx = unsigned_commitment_tx
1627                                .as_ref()
1628                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
1629
1630                            let batch_expiry = batch_expiry
1631                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
1632
1633                            let mut partial_sig_tree = PartialSigTree::default();
1634                            for (txid, _) in vtxo_graph_ref.as_map() {
1635                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
1636                                    Error::ad_hoc(format!(
1637                                        "missing aggregated nonce PK for TX {txid}"
1638                                    ))
1639                                })?;
1640
1641                                let sigs = sign_batch_tree_tx(
1642                                    txid,
1643                                    batch_expiry,
1644                                    ark_forfeit_pk,
1645                                    cosigner_kp,
1646                                    *agg_nonce_pk,
1647                                    vtxo_graph_ref,
1648                                    unsigned_commitment_tx,
1649                                    our_nonce_tree,
1650                                )
1651                                .map_err(Error::from)
1652                                .context("failed to sign VTXO tree")?;
1653
1654                                partial_sig_tree.0.extend(sigs.0);
1655                            }
1656
1657                            network_client
1658                                .submit_tree_signatures(
1659                                    &e.id,
1660                                    cosigner_kp.public_key(),
1661                                    partial_sig_tree,
1662                                )
1663                                .await
1664                                .map_err(Error::ark_server)
1665                                .context("failed to submit VTXO tree signatures")?;
1666                        }
1667                    }
1668                    StreamEvent::TreeNoncesAggregated(e) => {
1669                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
1670                    }
1671                    StreamEvent::BatchFinalization(e) => {
1672                        if step != Step::BatchSigningStarted {
1673                            continue;
1674                        }
1675
1676                        tracing::debug!(
1677                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
1678                            "Batch finalization started"
1679                        );
1680
1681                        let signed_forfeit_psbts = if !vtxo_inputs.is_empty() {
1682                            let chunks =
1683                                connectors_graph_chunks.take().ok_or(Error::ark_server(
1684                                    "received batch finalization event without connectors",
1685                                ))?;
1686
1687                            if chunks.is_empty() {
1688                                tracing::debug!(batch_id = e.id, "No forfeit transactions");
1689
1690                                Vec::new()
1691                            } else {
1692                                let connectors_graph = TxGraph::new(chunks)
1693                                    .map_err(Error::from)
1694                                    .context(
1695                                    "failed to build connectors graph before signing forfeit TXs",
1696                                )?;
1697
1698                                tracing::debug!(batch_id = e.id, "Batch finalization started");
1699
1700                                create_and_sign_forfeit_txs(
1701                                    |input: &mut psbt::Input, msg: secp256k1::Message| match &input
1702                                    .witness_script
1703                                {
1704                                    None => Err(ark_core::Error::ad_hoc(
1705                                        "Missing witness script in psbt::Input when signing forfeit",
1706                                    )),
1707                                    Some(script) => {
1708                                        let pks = extract_checksig_pubkeys(script);
1709                                        let mut res = vec![];
1710                                        for pk in pks {
1711                                            if let Ok(keypair) =
1712                                            self.keypair_by_pk(&pk) {
1713                                                let sig =
1714                                                    secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1715                                                res.push((sig, keypair.public_key().into()))
1716                                            }
1717                                        }
1718                                        Ok(res)
1719                                    }
1720                                    },
1721                                    vtxo_inputs.as_slice(),
1722                                    &connectors_graph.leaves(),
1723                                    &server_info.forfeit_address,
1724                                    server_info.dust,
1725                                )
1726                                .map_err(Error::from)?
1727                            }
1728                        } else {
1729                            Vec::new()
1730                        };
1731
1732                        let commitment_psbt = if onchain_inputs.is_empty() {
1733                            None
1734                        } else {
1735                            let mut commitment_psbt = e.commitment_tx;
1736
1737                            let sign_for_pk_fn = |pk: &XOnlyPublicKey,
1738                                                  msg: &secp256k1::Message|
1739                             -> Result<
1740                                schnorr::Signature,
1741                                ark_core::Error,
1742                            > {
1743                                self.inner
1744                                    .wallet
1745                                    .sign_for_pk(pk, msg)
1746                                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))
1747                            };
1748
1749                            sign_commitment_psbt(
1750                                sign_for_pk_fn,
1751                                &mut commitment_psbt,
1752                                &onchain_inputs,
1753                            )
1754                            .map_err(Error::from)?;
1755
1756                            Some(commitment_psbt)
1757                        };
1758
1759                        if !signed_forfeit_psbts.is_empty() || commitment_psbt.is_some() {
1760                            network_client
1761                                .submit_signed_forfeit_txs(signed_forfeit_psbts, commitment_psbt)
1762                                .await?;
1763                        }
1764
1765                        step = step.next();
1766                    }
1767                    StreamEvent::BatchFinalized(e) => {
1768                        if step != Step::Finalized {
1769                            continue;
1770                        }
1771
1772                        let commitment_txid = e.commitment_txid;
1773
1774                        tracing::info!(batch_id = e.id, %commitment_txid, "Batch finalized");
1775
1776                        return Ok(commitment_txid);
1777                    }
1778                    StreamEvent::BatchFailed(ref e) => {
1779                        if Some(&e.id) == batch_id.as_ref() {
1780                            return Err(Error::ark_server(format!(
1781                                "batch failed {}: {}",
1782                                e.id, e.reason
1783                            )));
1784                        }
1785
1786                        tracing::debug!("Unrelated batch failed: {e:?}");
1787                    }
1788                    StreamEvent::Heartbeat => {}
1789                    StreamEvent::StreamStarted(_) => {}
1790                },
1791                Some(Err(e)) => {
1792                    tracing::error!("Got error from event stream");
1793
1794                    return Err(Error::ark_server(e));
1795                }
1796                None => {
1797                    return Err(Error::ark_server("dropped batch event stream"));
1798                }
1799            }
1800        }
1801
1802        #[derive(Debug, PartialEq, Eq)]
1803        enum Step {
1804            Start,
1805            BatchStarted,
1806            BatchSigningStarted,
1807            Finalized,
1808        }
1809
1810        impl Step {
1811            fn next(&self) -> Step {
1812                match self {
1813                    Step::Start => Step::BatchStarted,
1814                    Step::BatchStarted => Step::BatchSigningStarted,
1815                    Step::BatchSigningStarted => Step::Finalized,
1816                    Step::Finalized => Step::Finalized, // we can't go further
1817                }
1818            }
1819        }
1820    }
1821}
1822
/// Selects the purpose for which an intent is being prepared.
///
/// NOTE(review): the code consuming this enum is outside this excerpt; the variant
/// descriptions below are inferred from their names and should be confirmed there.
#[derive(Debug, Clone)]
pub(crate) enum PrepareIntentKind {
    /// Presumably: the intent is prepared in order to be registered.
    Register,
    /// Presumably: the intent is prepared only to obtain a fee estimate.
    EstimateFee,
}
1828
1829#[derive(Debug, Clone)]
1830pub(crate) enum BatchOutputType {
1831    Board {
1832        to_address: ArkAddress,
1833        to_amount: Amount,
1834    },
1835    OffBoard {
1836        to_address: Address,
1837        to_amount: Amount,
1838        change_address: ArkAddress,
1839        change_amount: Amount,
1840    },
1841}
1842
1843/// Prepared intent data ready for batch registration.
1844pub(crate) struct PreparedIntent {
1845    /// The signed intent.
1846    pub intent: intent::Intent,
1847    /// The ephemeral cosigner keypair.
1848    pub cosigner_keypair: Keypair,
1849    /// VTXO input outpoints (used for event stream topics).
1850    pub vtxo_input_outpoints: Vec<OutPoint>,
1851    /// Intent outputs (used to determine batch protocol steps).
1852    pub outputs: Vec<intent::Output>,
1853    /// The original onchain inputs (needed for commitment signing).
1854    pub onchain_inputs: Vec<batch::OnChainInput>,
1855    /// The original VTXO inputs (needed for forfeit signing).
1856    pub vtxo_inputs: Vec<intent::Input>,
1857}