Skip to main content

ark_client/
batch.rs

1use crate::error::ErrorContext as _;
2use crate::swap_storage::SwapStorage;
3use crate::utils::sleep;
4use crate::utils::timeout_op;
5use crate::wallet::BoardingWallet;
6use crate::wallet::OnchainWallet;
7use crate::Blockchain;
8use crate::Client;
9use crate::Error;
10use ark_core::batch;
11use ark_core::batch::aggregate_nonces;
12use ark_core::batch::complete_delegate_forfeit_txs;
13use ark_core::batch::create_and_sign_forfeit_txs;
14use ark_core::batch::create_asset_preservation_packet;
15use ark_core::batch::generate_nonce_tree;
16use ark_core::batch::sign_batch_tree_tx;
17use ark_core::batch::sign_commitment_psbt;
18use ark_core::batch::Delegate;
19use ark_core::batch::NonceKps;
20use ark_core::intent;
21use ark_core::script::extract_checksig_pubkeys;
22use ark_core::server::BatchTreeEventType;
23use ark_core::server::PartialSigTree;
24use ark_core::server::StreamEvent;
25use ark_core::ArkAddress;
26use ark_core::ArkNote;
27use ark_core::ExplorerUtxo;
28use ark_core::TxGraph;
29use backon::ExponentialBuilder;
30use backon::Retryable;
31use bitcoin::hashes::sha256;
32use bitcoin::hashes::Hash;
33use bitcoin::hex::DisplayHex;
34use bitcoin::key::Keypair;
35use bitcoin::key::Secp256k1;
36use bitcoin::psbt;
37use bitcoin::secp256k1;
38use bitcoin::secp256k1::schnorr;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Address;
41use bitcoin::Amount;
42use bitcoin::OutPoint;
43use bitcoin::Psbt;
44use bitcoin::TxOut;
45use bitcoin::Txid;
46use bitcoin::XOnlyPublicKey;
47use futures::StreamExt;
48use jiff::Timestamp;
49use rand::CryptoRng;
50use rand::Rng;
51use std::collections::HashMap;
52
53impl<B, W, S, K> Client<B, W, S, K>
54where
55    B: Blockchain,
56    W: BoardingWallet + OnchainWallet,
57    S: SwapStorage + 'static,
58    K: crate::KeyProvider,
59{
60    /// Settle _all_ prior VTXOs and boarding outputs into the next batch, generating new confirmed
61    /// VTXOs.
62    pub async fn settle<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
63    where
64        R: Rng + CryptoRng + Clone,
65    {
66        // Get off-chain address and send all funds to this address, no change output 🦄
67        let (to_address, _) = self.get_offchain_address()?;
68
69        let (boarding_inputs, vtxo_inputs, total_amount) =
70            self.fetch_commitment_transaction_inputs().await?;
71
72        tracing::debug!(
73            offchain_adress = %to_address.encode(),
74            ?boarding_inputs,
75            ?vtxo_inputs,
76            "Attempting to settle outputs"
77        );
78
79        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
80            tracing::debug!("No inputs to board with");
81            return Ok(None);
82        }
83
84        let join_next_batch = || async {
85            self.join_next_batch(
86                &mut rng.clone(),
87                boarding_inputs.clone(),
88                vtxo_inputs.clone(),
89                BatchOutputType::Board {
90                    to_address,
91                    to_amount: total_amount,
92                },
93            )
94            .await
95        };
96
97        // Joining a batch can fail depending on the timing, so we try a few times.
98        let commitment_txid = join_next_batch
99            .retry(ExponentialBuilder::default().with_max_times(0))
100            .sleep(sleep)
101            // TODO: Use `when` to only retry certain errors.
102            .notify(|err: &Error, dur: std::time::Duration| {
103                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
104            })
105            .await
106            .context("Failed to join batch")?;
107
108        tracing::info!(%commitment_txid, "Settlement success");
109
110        Ok(Some(commitment_txid))
111    }
112
113    /// Settle _all_ prior VTXOs, boarding outputs, and the provided ArkNotes into the next batch.
114    ///
115    /// ArkNotes are bearer tokens that can be redeemed by revealing their preimage.
116    /// This method combines them with regular VTXOs and boarding outputs into a single
117    /// settlement transaction.
118    pub async fn settle_with_notes<R>(
119        &self,
120        rng: &mut R,
121        notes: Vec<ArkNote>,
122    ) -> Result<Option<Txid>, Error>
123    where
124        R: Rng + CryptoRng + Clone,
125    {
126        let (to_address, _) = self.get_offchain_address()?;
127
128        let (boarding_inputs, vtxo_inputs, mut total_amount) =
129            self.fetch_commitment_transaction_inputs().await?;
130
131        // Convert arknotes to intent inputs and add their value to total
132        let note_inputs: Vec<intent::Input> = notes
133            .iter()
134            .map(|note| {
135                total_amount += note.value();
136                note.to_intent_input()
137            })
138            .collect::<Result<Vec<_>, _>>()?;
139
140        // Combine VTXO inputs with note inputs
141        let all_vtxo_inputs: Vec<intent::Input> =
142            vtxo_inputs.into_iter().chain(note_inputs).collect();
143
144        tracing::debug!(
145            offchain_address = %to_address.encode(),
146            ?boarding_inputs,
147            num_vtxo_inputs = all_vtxo_inputs.len(),
148            num_notes = notes.len(),
149            %total_amount,
150            "Attempting to settle outputs with notes"
151        );
152
153        if boarding_inputs.is_empty() && all_vtxo_inputs.is_empty() {
154            tracing::debug!("No inputs to settle");
155            return Ok(None);
156        }
157
158        let join_next_batch = || async {
159            self.join_next_batch(
160                &mut rng.clone(),
161                boarding_inputs.clone(),
162                all_vtxo_inputs.clone(),
163                BatchOutputType::Board {
164                    to_address,
165                    to_amount: total_amount,
166                },
167            )
168            .await
169        };
170
171        let commitment_txid = join_next_batch
172            .retry(ExponentialBuilder::default().with_max_times(0))
173            .sleep(sleep)
174            .notify(|err: &Error, dur: std::time::Duration| {
175                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
176            })
177            .await
178            .context("Failed to join batch")?;
179
180        tracing::info!(%commitment_txid, num_notes = notes.len(), "Settlement with notes success");
181
182        Ok(Some(commitment_txid))
183    }
184
185    /// Settle specific VTXOs and boarding outputs by outpoint into the next batch, generating new
186    /// confirmed VTXOs.
187    ///
188    /// Unlike [`Self::settle`], this method allows the caller to specify exactly which VTXOs and
189    /// boarding outputs to settle by providing their outpoints.
190    pub async fn settle_vtxos<R>(
191        &self,
192        rng: &mut R,
193        vtxo_outpoints: &[OutPoint],
194        boarding_outpoints: &[OutPoint],
195    ) -> Result<Option<Txid>, Error>
196    where
197        R: Rng + CryptoRng + Clone,
198    {
199        // Get off-chain address and send all funds to this address, no change output.
200        let (to_address, _) = self.get_offchain_address()?;
201
202        let (all_boarding_inputs, all_vtxo_inputs, _) =
203            self.fetch_commitment_transaction_inputs().await?;
204
205        // Filter boarding inputs to only those specified.
206        let boarding_inputs: Vec<_> = all_boarding_inputs
207            .into_iter()
208            .filter(|input| boarding_outpoints.contains(&input.outpoint()))
209            .collect();
210
211        // Filter VTXO inputs to only those specified.
212        let vtxo_inputs: Vec<_> = all_vtxo_inputs
213            .into_iter()
214            .filter(|input| vtxo_outpoints.contains(&input.outpoint()))
215            .collect();
216
217        // Recalculate total amount from filtered inputs.
218        let total_amount = boarding_inputs
219            .iter()
220            .map(|i| i.amount())
221            .chain(vtxo_inputs.iter().map(|i| i.amount()))
222            .fold(Amount::ZERO, |acc, a| acc + a);
223
224        tracing::debug!(
225            offchain_address = %to_address.encode(),
226            ?boarding_inputs,
227            ?vtxo_inputs,
228            %total_amount,
229            "Attempting to settle specific outputs"
230        );
231
232        if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
233            tracing::debug!("No matching inputs to settle");
234            return Ok(None);
235        }
236
237        let join_next_batch = || async {
238            self.join_next_batch(
239                &mut rng.clone(),
240                boarding_inputs.clone(),
241                vtxo_inputs.clone(),
242                BatchOutputType::Board {
243                    to_address,
244                    to_amount: total_amount,
245                },
246            )
247            .await
248        };
249
250        // Joining a batch can fail depending on the timing, so we try a few times.
251        let commitment_txid = join_next_batch
252            .retry(ExponentialBuilder::default().with_max_times(0))
253            .sleep(sleep)
254            .notify(|err: &Error, dur: std::time::Duration| {
255                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
256            })
257            .await
258            .context("Failed to join batch")?;
259
260        tracing::info!(%commitment_txid, "Settlement of specific VTXOs success");
261
262        Ok(Some(commitment_txid))
263    }
264
265    /// Settle _some_ prior VTXOs and boarding outputs into the next batch, generating UTXOs as
266    /// outputs to a new commitment transaction.
267    pub async fn collaborative_redeem<R>(
268        &self,
269        rng: &mut R,
270        to_address: Address,
271        to_amount: Amount,
272    ) -> Result<Txid, Error>
273    where
274        R: Rng + CryptoRng + Clone,
275    {
276        let (change_address, _) = self.get_offchain_address()?;
277
278        let (boarding_inputs, vtxo_inputs, total_amount) =
279            self.fetch_commitment_transaction_inputs().await?;
280
281        let onchain_fee = self
282            .fee_estimator
283            .eval_onchain_output(ark_fees::Output {
284                amount: to_amount.to_sat(),
285                script: to_address.script_pubkey().to_string(),
286            })
287            .map_err(Error::ad_hoc)?;
288        let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
289
290        // Fee comes out of change, not the send amount.
291        let change_amount = total_amount
292            .checked_sub(to_amount)
293            .and_then(|a| a.checked_sub(onchain_fee))
294            .ok_or_else(|| {
295                Error::coin_select(format!(
296                    "insufficient balance: {total_amount} < {to_amount} (send) + {onchain_fee} (fee)"
297                ))
298            })?;
299
300        tracing::info!(
301            %to_address,
302            send_amount = %to_amount,
303            fee = %onchain_fee,
304            change_address = %change_address.encode(),
305            %change_amount,
306            ?boarding_inputs,
307            "Attempting to collaboratively redeem outputs"
308        );
309
310        let join_next_batch = || async {
311            self.join_next_batch(
312                &mut rng.clone(),
313                boarding_inputs.clone(),
314                vtxo_inputs.clone(),
315                BatchOutputType::OffBoard {
316                    to_address: to_address.clone(),
317                    to_amount,
318                    change_address,
319                    change_amount,
320                },
321            )
322            .await
323        };
324
325        // Joining a batch can fail depending on the timing, so we try a few times.
326        let commitment_txid = join_next_batch
327            .retry(ExponentialBuilder::default().with_max_times(3))
328            .sleep(sleep)
329            // TODO: Use `when` to only retry certain errors.
330            .notify(|err: &Error, dur: std::time::Duration| {
331                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
332            })
333            .await
334            .context("Failed to join batch")?;
335
336        tracing::info!(%commitment_txid, "Collaborative redeem success");
337
338        Ok(commitment_txid)
339    }
340
341    /// Settle a selection of VTXOs into the next batch, generating UTXOs as
342    /// outputs to a new commitment transaction.
343    pub async fn collaborative_redeem_vtxo_selection<R>(
344        &self,
345        rng: &mut R,
346        input_vtxos: impl Iterator<Item = OutPoint> + Clone,
347        to_address: Address,
348        to_amount: Amount,
349    ) -> Result<Txid, Error>
350    where
351        R: Rng + CryptoRng + Clone,
352    {
353        let (change_address, _) = self.get_offchain_address()?;
354
355        let (vtxo_list, script_pubkey_to_vtxo_map) =
356            self.list_vtxos().await.context("failed to get VTXO list")?;
357
358        let vtxo_inputs = vtxo_list
359            .all_unspent()
360            .filter(|v| input_vtxos.clone().any(|outpoint| outpoint == v.outpoint))
361            .map(|v| {
362                let vtxo = script_pubkey_to_vtxo_map.get(&v.script).ok_or_else(|| {
363                    ark_core::Error::ad_hoc(format!("missing VTXO for script pubkey: {}", v.script))
364                })?;
365                let spend_info = vtxo.forfeit_spend_info()?;
366
367                Ok(intent::Input::new(
368                    v.outpoint,
369                    vtxo.exit_delay(),
370                    // NOTE: This only works with default VTXOs (single-sig).
371                    None,
372                    TxOut {
373                        value: v.amount,
374                        script_pubkey: vtxo.script_pubkey(),
375                    },
376                    vtxo.tapscripts(),
377                    spend_info,
378                    false,
379                    v.is_swept,
380                    v.assets.clone(),
381                ))
382            })
383            .collect::<Result<Vec<_>, Error>>()?;
384
385        if vtxo_inputs.is_empty() {
386            return Err(Error::ad_hoc("no matching VTXO outpoints found"));
387        }
388
389        // Check that total amount is sufficient
390        let total_input_amount = vtxo_inputs
391            .iter()
392            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount());
393
394        let onchain_fee = self
395            .fee_estimator
396            .eval_onchain_output(ark_fees::Output {
397                amount: to_amount.to_sat(),
398                script: to_address.script_pubkey().to_string(),
399            })
400            .map_err(Error::ad_hoc)?;
401        let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
402
403        // Fee comes out of change, not the send amount.
404        let change_amount = total_input_amount
405            .checked_sub(to_amount)
406            .and_then(|a| a.checked_sub(onchain_fee))
407            .ok_or_else(|| {
408                Error::coin_select(format!(
409                    "insufficient VTXO amount: {total_input_amount} < {to_amount} (send) + {onchain_fee} (fee)"
410                ))
411            })?;
412
413        tracing::info!(
414            %to_address,
415            send_amount = %to_amount,
416            fee = %onchain_fee,
417            change_address = %change_address.encode(),
418            %change_amount,
419            "Attempting to collaboratively redeem outputs"
420        );
421
422        let join_next_batch = || async {
423            self.join_next_batch(
424                &mut rng.clone(),
425                Vec::new(),
426                vtxo_inputs.clone(),
427                BatchOutputType::OffBoard {
428                    to_address: to_address.clone(),
429                    to_amount,
430                    change_address,
431                    change_amount,
432                },
433            )
434            .await
435        };
436
437        // Joining a batch can fail depending on the timing, so we try a few times.
438        let commitment_txid = join_next_batch
439            .retry(ExponentialBuilder::default().with_max_times(3))
440            .sleep(sleep)
441            // TODO: Use `when` to only retry certain errors.
442            .notify(|err: &Error, dur: std::time::Duration| {
443                tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
444            })
445            .await
446            .context("Failed to join batch")?;
447
448        tracing::info!(%commitment_txid, "Collaborative redeem success");
449
450        Ok(commitment_txid)
451    }
452
453    /// Generate a delegate for settling VTXOs on behalf of the owner.
454    ///
455    /// The owner pre-signs the intent and forfeit transactions, allowing another party to complete
456    /// the settlement at a later time using the provided `delegate_cosigner_pk`.
457    ///
458    /// # Arguments
459    ///
460    /// * `delegate_cosigner_pk` - The cosigner public key that the delegate will use
461    /// * `select_recoverable_vtxos` - Whether to include recoverable VTXOs
462    ///
463    /// # Returns
464    ///
465    /// A [`Delegate`] struct containing all the pre-signed data needed for settlement.
466    pub async fn generate_delegate(
467        &self,
468        delegate_cosigner_pk: PublicKey,
469    ) -> Result<Delegate, Error> {
470        // Get off-chain address and send all funds to this address.
471        let (to_address, _) = self.get_offchain_address()?;
472
473        // Simply collect all VTXOs that can be settled.
474        let (_, vtxo_inputs, _) = self.fetch_commitment_transaction_inputs().await?;
475
476        let total_amount = vtxo_inputs
477            .iter()
478            .fold(Amount::ZERO, |acc, v| acc + v.amount());
479
480        if vtxo_inputs.is_empty() {
481            return Err(Error::ad_hoc("no inputs to settle via delegate"));
482        }
483
484        let server_info = &self.server_info;
485
486        let mut outputs = vec![intent::Output::Offchain(TxOut {
487            value: total_amount,
488            script_pubkey: to_address.to_p2tr_script_pubkey(),
489        })];
490
491        if let Some(packet) = create_asset_preservation_packet(&vtxo_inputs, &outputs)? {
492            outputs.push(intent::Output::AssetPacket(packet.to_txout()));
493        }
494
495        let delegate = batch::prepare_delegate_psbts(
496            vtxo_inputs,
497            outputs,
498            delegate_cosigner_pk,
499            &server_info.forfeit_address,
500            server_info.dust,
501        )?;
502
503        Ok(delegate)
504    }
505
506    /// Sign a set of delegate PSBTs, including the intent PSBT and the forfeit PSBTs.
507    pub fn sign_delegate_psbts(
508        &self,
509        intent_psbt: &mut Psbt,
510        forfeit_psbts: &mut [Psbt],
511    ) -> Result<(), Error> {
512        let sign_fn =
513            |input: &mut psbt::Input,
514             msg: secp256k1::Message|
515             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
516                match &input.witness_script {
517                    None => Err(ark_core::Error::ad_hoc(
518                        "Missing witness script for psbt::Input",
519                    )),
520                    Some(script) => {
521                        let mut res = vec![];
522                        let pks = extract_checksig_pubkeys(script);
523                        for pk in pks {
524                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
525                                let sig = Secp256k1::new().sign_schnorr_no_aux_rand(&msg, &keypair);
526                                let pk = keypair.x_only_public_key().0;
527                                res.push((sig, pk));
528                            }
529                        }
530                        Ok(res)
531                    }
532                }
533            };
534
535        batch::sign_delegate_psbts(sign_fn, intent_psbt, forfeit_psbts)?;
536
537        Ok(())
538    }
539
540    /// Settle a delegate by completing the batch protocol using pre-signed data.
541    ///
542    /// This method allows Bob to settle Alice's VTXOs using the pre-signed intent and forfeit
543    /// transactions from the [`Delegate`] struct.
544    ///
545    /// # Arguments
546    ///
547    /// * `rng` - Random number generator for nonce generation
548    /// * `delegate` - The delegate struct containing pre-signed data
549    /// * `own_cosigner_kp` - Bob's cosigner keypair (must match the delegate_cosigner_pk)
550    ///
551    /// # Returns
552    ///
553    /// The commitment transaction ID if successful.
554    pub async fn settle_delegate<R>(
555        &self,
556        rng: &mut R,
557        delegate: Delegate,
558        own_cosigner_kp: Keypair,
559    ) -> Result<Txid, Error>
560    where
561        R: Rng + CryptoRng,
562    {
563        // Verify the cosigner key matches
564        if own_cosigner_kp.public_key() != delegate.delegate_cosigner_pk {
565            return Err(Error::ad_hoc(
566                "provided cosigner keypair does not match delegate_cosigner_pk",
567            ));
568        }
569
570        // Register the pre-signed intent
571        let intent_id = timeout_op(
572            self.inner.timeout,
573            self.network_client()
574                .register_intent(delegate.intent.clone()),
575        )
576        .await
577        .context("failed to register delegated intent")??;
578
579        tracing::debug!(intent_id, "Registered delegated intent");
580
581        let network_client = self.network_client();
582        let server_info = &self.server_info;
583
584        #[derive(Debug, PartialEq, Eq)]
585        enum Step {
586            Start,
587            BatchStarted,
588            BatchSigningStarted,
589            Finalized,
590        }
591
592        impl Step {
593            fn next(&self) -> Step {
594                match self {
595                    Step::Start => Step::BatchStarted,
596                    Step::BatchStarted => Step::BatchSigningStarted,
597                    Step::BatchSigningStarted => Step::Finalized,
598                    Step::Finalized => Step::Finalized,
599                }
600            }
601        }
602
603        let mut step = Step::Start;
604
605        let own_cosigner_kps = [own_cosigner_kp];
606        let own_cosigner_pks = own_cosigner_kps
607            .iter()
608            .map(|k| k.public_key())
609            .collect::<Vec<_>>();
610
611        let mut batch_id: Option<String> = None;
612
613        let vtxo_input_outpoints = delegate
614            .forfeit_psbts
615            .iter()
616            .map(|psbt| psbt.unsigned_tx.input[0].previous_output)
617            .collect::<Vec<_>>();
618
619        let topics = vtxo_input_outpoints
620            .iter()
621            .map(ToString::to_string)
622            .chain(
623                own_cosigner_pks
624                    .iter()
625                    .map(|pk| pk.serialize().to_lower_hex_string()),
626            )
627            .collect();
628
629        let mut stream = network_client.get_event_stream(topics).await?;
630
631        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
632
633        let mut unsigned_commitment_tx = None;
634
635        let mut vtxo_batch_tree_graph_chunks = Some(Vec::new());
636        let mut vtxo_batch_tree_graph: Option<TxGraph> = None;
637
638        let mut connectors_graph_chunks = Some(Vec::new());
639        let mut batch_expiry = None;
640
641        let mut agg_nonce_pks = HashMap::new();
642
643        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
644
645        loop {
646            match timeout_op(self.inner.timeout, stream.next())
647                .await
648                .context("timed out waiting for batch event")?
649            {
650                Some(Ok(event)) => match event {
651                    StreamEvent::BatchStarted(e) => {
652                        if step != Step::Start {
653                            continue;
654                        }
655
656                        let hash = sha256::Hash::hash(intent_id.as_bytes());
657                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
658
659                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
660                            timeout_op(
661                                self.inner.timeout,
662                                self.network_client()
663                                    .confirm_registration(intent_id.clone()),
664                            )
665                            .await
666                            .context("failed to confirm intent registration")??;
667
668                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
669
670                            batch_id = Some(e.id);
671
672                            step = Step::BatchStarted;
673
674                            batch_expiry = Some(e.batch_expiry);
675                        } else {
676                            tracing::debug!(
677                                batch_id = e.id,
678                                intent_id,
679                                "Intent ID not found for batch"
680                            );
681                        }
682                    }
683                    StreamEvent::TreeTx(e) => {
684                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
685                            continue;
686                        }
687
688                        match e.batch_tree_event_type {
689                            BatchTreeEventType::Vtxo => {
690                                match &mut vtxo_batch_tree_graph_chunks {
691                                    Some(vtxo_batch_tree_graph_chunks) => {
692                                        tracing::debug!("Got new VTXO batch-tree graph chunk");
693
694                                        vtxo_batch_tree_graph_chunks.push(e.tx_graph_chunk)
695                                    }
696                                    None => {
697                                        return Err(Error::ark_server(
698                                            "received unexpected VTXO batch-tree graph chunk",
699                                        ));
700                                    }
701                                };
702                            }
703                            BatchTreeEventType::Connector => {
704                                match connectors_graph_chunks {
705                                    Some(ref mut connectors_graph_chunks) => {
706                                        tracing::debug!("Got new connectors graph chunk");
707
708                                        connectors_graph_chunks.push(e.tx_graph_chunk)
709                                    }
710                                    None => {
711                                        return Err(Error::ark_server(
712                                            "received unexpected connectors graph chunk",
713                                        ));
714                                    }
715                                };
716                            }
717                        }
718                    }
719                    StreamEvent::TreeSignature(e) => {
720                        if step != Step::BatchSigningStarted {
721                            continue;
722                        }
723
724                        match e.batch_tree_event_type {
725                            BatchTreeEventType::Vtxo => {
726                                match vtxo_batch_tree_graph {
727                                    Some(ref mut vtxo_batch_tree_graph) => {
728                                        vtxo_batch_tree_graph.apply(|graph| {
729                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
730                                                Ok(true)
731                                            } else {
732                                                graph.set_signature(e.signature);
733
734                                                Ok(false)
735                                            }
736                                        })?;
737                                    }
738                                    None => {
739                                        return Err(Error::ark_server(
740                                            "received batch-tree signature without transaction graph",
741                                        ));
742                                    }
743                                };
744                            }
745                            BatchTreeEventType::Connector => {
746                                return Err(Error::ark_server(
747                                    "received batch-tree signature for connector tree",
748                                ));
749                            }
750                        }
751                    }
752                    StreamEvent::TreeSigningStarted(e) => {
753                        if step != Step::BatchStarted {
754                            continue;
755                        }
756
757                        let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
758                            "received batch-tree signing started event without VTXO batch-tree graph chunks",
759                        ))?;
760                        vtxo_batch_tree_graph =
761                            Some(TxGraph::new(chunks).map_err(Error::from).context(
762                                "failed to build VTXO batch-tree graph before generating nonces",
763                            )?);
764
765                        tracing::info!(batch_id = e.id, "Batch signing started");
766
767                        for own_cosigner_pk in own_cosigner_pks.iter() {
768                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
769                                return Err(Error::ark_server(format!(
770                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
771                                )));
772                            }
773                        }
774
775                        let mut our_nonce_tree_map = HashMap::new();
776                        for own_cosigner_kp in own_cosigner_kps {
777                            let own_cosigner_pk = own_cosigner_kp.public_key();
778                            let nonce_tree = generate_nonce_tree(
779                                rng,
780                                vtxo_batch_tree_graph
781                                    .as_ref()
782                                    .expect("VTXO batch-tree graph"),
783                                own_cosigner_pk,
784                                &e.unsigned_commitment_tx,
785                            )
786                            .map_err(Error::from)
787                            .context("failed to generate VTXO nonce tree")?;
788
789                            tracing::info!(
790                                cosigner_pk = %own_cosigner_pk,
791                                "Submitting nonce tree for cosigner PK"
792                            );
793
794                            network_client
795                                .submit_tree_nonces(
796                                    &e.id,
797                                    own_cosigner_pk,
798                                    nonce_tree.to_nonce_pks(),
799                                )
800                                .await
801                                .map_err(Error::ark_server)
802                                .context("failed to submit VTXO nonce tree")?;
803
804                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
805                        }
806
807                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
808                        our_nonce_trees = Some(our_nonce_tree_map);
809
810                        step = step.next();
811                    }
812                    StreamEvent::TreeNonces(e) => {
813                        if step != Step::BatchSigningStarted {
814                            continue;
815                        }
816
817                        let tree_tx_nonce_pks = e.nonces;
818
819                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
820                            own_cosigner_pks
821                                .iter()
822                                .any(|p| &&p.x_only_public_key().0 == pk)
823                        }) {
824                            Some((pk, _)) => *pk,
825                            None => {
826                                tracing::debug!(
827                                    batch_id = e.id,
828                                    txid = %e.txid,
829                                    "Received irrelevant TreeNonces event"
830                                );
831
832                                continue;
833                            }
834                        };
835
836                        tracing::debug!(
837                            batch_id = e.id,
838                            txid = %e.txid,
839                            %cosigner_pk,
840                            "Received TreeNonces event"
841                        );
842
843                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
844
845                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
846
847                        if vtxo_batch_tree_graph.is_none() {
848                            let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
849                                "received batch-tree nonces event without VTXO batch-tree graph chunks",
850                            ))?;
851                            vtxo_batch_tree_graph = Some(
852                                TxGraph::new(chunks)
853                                    .map_err(Error::from)
854                                    .context("failed to build VTXO batch-tree graph before batch-tree signing")?,
855                            );
856                        }
857                        let vtxo_batch_tree_graph_ref =
858                            vtxo_batch_tree_graph.as_ref().expect("just populated");
859
860                        if agg_nonce_pks.len() == vtxo_batch_tree_graph_ref.nb_of_nodes() {
861                            let cosigner_kp = own_cosigner_kps
862                                .iter()
863                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
864                                .ok_or_else(|| {
865                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
866                                })?;
867
868                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
869                                Error::ark_server("missing nonce trees during batch protocol"),
870                            )?;
871
872                            let our_nonce_tree =
873                                our_nonce_trees
874                                    .get_mut(cosigner_kp)
875                                    .ok_or(Error::ark_server(
876                                        "missing nonce tree during batch protocol",
877                                    ))?;
878
879                            let unsigned_commitment_tx = unsigned_commitment_tx
880                                .as_ref()
881                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
882
883                            let batch_expiry = batch_expiry
884                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
885
886                            let mut partial_sig_tree = PartialSigTree::default();
887                            for (txid, _) in vtxo_batch_tree_graph_ref.as_map() {
888                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
889                                    Error::ad_hoc(format!(
890                                        "missing aggregated nonce PK for TX {txid}"
891                                    ))
892                                })?;
893
894                                let sigs = sign_batch_tree_tx(
895                                    txid,
896                                    batch_expiry,
897                                    ark_forfeit_pk,
898                                    cosigner_kp,
899                                    *agg_nonce_pk,
900                                    vtxo_batch_tree_graph_ref,
901                                    unsigned_commitment_tx,
902                                    our_nonce_tree,
903                                )
904                                .map_err(Error::from)
905                                .context("failed to sign VTXO batch-tree transactions")?;
906
907                                partial_sig_tree.0.extend(sigs.0);
908                            }
909
910                            network_client
911                                .submit_tree_signatures(
912                                    &e.id,
913                                    cosigner_kp.public_key(),
914                                    partial_sig_tree,
915                                )
916                                .await
917                                .map_err(Error::ark_server)
918                                .context("failed to submit VTXO batch-tree signatures")?;
919                        }
920                    }
921                    StreamEvent::TreeNoncesAggregated(e) => {
922                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
923                    }
924                    StreamEvent::BatchFinalization(e) => {
925                        if step != Step::BatchSigningStarted {
926                            continue;
927                        }
928
929                        tracing::debug!(
930                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
931                            "Batch finalization started (delegate)"
932                        );
933
934                        let chunks = connectors_graph_chunks.take().ok_or(Error::ark_server(
935                            "received batch finalization event without connectors",
936                        ))?;
937
938                        if chunks.is_empty() {
939                            tracing::debug!(batch_id = e.id, "No delegated forfeit transactions");
940                        } else {
941                            let connectors_graph = TxGraph::new(chunks)
942                                .map_err(Error::from)
943                                .context(
944                                "failed to build connectors graph before completing forfeit TXs",
945                            )?;
946
947                            tracing::debug!(
948                                batch_id = e.id,
949                                "Completing delegated forfeit transactions"
950                            );
951
952                            let signed_forfeit_psbts = complete_delegate_forfeit_txs(
953                                &delegate.forfeit_psbts,
954                                &connectors_graph.leaves(),
955                            )?;
956
957                            network_client
958                                .submit_signed_forfeit_txs(signed_forfeit_psbts, None)
959                                .await?;
960                        }
961
962                        step = step.next();
963                    }
964                    StreamEvent::BatchFinalized(e) => {
965                        if step != Step::Finalized {
966                            continue;
967                        }
968
969                        let commitment_txid = e.commitment_txid;
970
971                        tracing::info!(batch_id = e.id, %commitment_txid, "Delegated batch finalized");
972
973                        return Ok(commitment_txid);
974                    }
975                    StreamEvent::BatchFailed(ref e) => {
976                        if Some(&e.id) == batch_id.as_ref() {
977                            return Err(Error::ark_server(format!(
978                                "batch failed {}: {}",
979                                e.id, e.reason
980                            )));
981                        }
982
983                        tracing::debug!("Unrelated batch failed: {e:?}");
984                    }
985                    StreamEvent::Heartbeat => {}
986                    StreamEvent::StreamStarted(_) => {}
987                },
988                Some(Err(e)) => {
989                    tracing::error!("Got error from event stream");
990
991                    return Err(Error::ark_server(e));
992                }
993                None => {
994                    return Err(Error::ark_server("dropped batch event stream"));
995                }
996            }
997        }
998    }
999
1000    /// Get all the [`batch::OnChainInput`]s and [`batch::VtxoInput`]s that can be used to join an
1001    /// upcoming batch.
1002    pub(crate) async fn fetch_commitment_transaction_inputs(
1003        &self,
1004    ) -> Result<(Vec<batch::OnChainInput>, Vec<intent::Input>, Amount), Error> {
1005        // Get all known boarding outputs.
1006        let boarding_outputs = self.inner.wallet.get_boarding_outputs()?;
1007
1008        let mut boarding_inputs: Vec<batch::OnChainInput> = Vec::new();
1009        let mut total_amount = Amount::ZERO;
1010
1011        // To track unique outpoints and prevent duplicates
1012        let mut seen_outpoints = std::collections::HashSet::new();
1013
1014        let now = Timestamp::now();
1015
1016        // Find outpoints for each boarding output.
1017        for boarding_output in boarding_outputs {
1018            let outpoints = timeout_op(
1019                self.inner.timeout,
1020                self.blockchain().find_outpoints(boarding_output.address()),
1021            )
1022            .await
1023            .context("failed to find outpoints")??;
1024
1025            for o in outpoints.iter() {
1026                if let ExplorerUtxo {
1027                    outpoint,
1028                    amount,
1029                    confirmation_blocktime: Some(confirmation_blocktime),
1030                    confirmations,
1031                    is_spent: false,
1032                } = o
1033                {
1034                    // Check for duplicate outpoints
1035                    if seen_outpoints.contains(outpoint) {
1036                        continue;
1037                    }
1038
1039                    // Only include confirmed boarding outputs with an _inactive_ exit path.
1040                    if !boarding_output.can_be_claimed_unilaterally_by_owner(
1041                        now.as_duration().try_into().map_err(Error::ad_hoc)?,
1042                        std::time::Duration::from_secs(*confirmation_blocktime),
1043                        *confirmations,
1044                    ) {
1045                        // Mark this outpoint as seen
1046                        seen_outpoints.insert(*outpoint);
1047
1048                        boarding_inputs.push(batch::OnChainInput::new(
1049                            boarding_output.clone(),
1050                            *amount,
1051                            *outpoint,
1052                        ));
1053                        total_amount += *amount;
1054                    }
1055                }
1056            }
1057        }
1058
1059        let (vtxo_list, script_pubkey_to_vtxo_map) = self.list_vtxos().await?;
1060
1061        total_amount += vtxo_list
1062            .all_unspent()
1063            .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount);
1064
1065        let vtxo_inputs = vtxo_list
1066            .all_unspent()
1067            .map(|virtual_tx_outpoint| {
1068                let vtxo = script_pubkey_to_vtxo_map
1069                    .get(&virtual_tx_outpoint.script)
1070                    .ok_or_else(|| {
1071                        ark_core::Error::ad_hoc(format!(
1072                            "missing VTXO for script pubkey: {}",
1073                            virtual_tx_outpoint.script
1074                        ))
1075                    })?;
1076                let spend_info = vtxo.forfeit_spend_info()?;
1077
1078                Ok(intent::Input::new(
1079                    virtual_tx_outpoint.outpoint,
1080                    vtxo.exit_delay(),
1081                    None,
1082                    TxOut {
1083                        value: virtual_tx_outpoint.amount,
1084                        script_pubkey: vtxo.script_pubkey(),
1085                    },
1086                    vtxo.tapscripts(),
1087                    spend_info,
1088                    false,
1089                    virtual_tx_outpoint.is_swept,
1090                    virtual_tx_outpoint.assets.clone(),
1091                ))
1092            })
1093            .collect::<Result<Vec<_>, ark_core::Error>>()?;
1094
1095        Ok((boarding_inputs, vtxo_inputs, total_amount))
1096    }
1097
1098    /// Prepare an intent for batch registration or fee estimation.
1099    ///
1100    /// This creates a signed intent PSBT along with all the data needed to participate
1101    /// in the batch protocol.
1102    pub(crate) fn prepare_intent<R>(
1103        &self,
1104        rng: &mut R,
1105        onchain_inputs: Vec<batch::OnChainInput>,
1106        vtxo_inputs: Vec<intent::Input>,
1107        output_type: BatchOutputType,
1108        intent_kind: PrepareIntentKind,
1109    ) -> Result<PreparedIntent, Error>
1110    where
1111        R: Rng + CryptoRng,
1112    {
1113        if onchain_inputs.is_empty() && vtxo_inputs.is_empty() {
1114            return Err(Error::ad_hoc("cannot prepare intent without inputs"));
1115        }
1116
1117        // Generate an (ephemeral) cosigner keypair.
1118        let cosigner_keypair = Keypair::new(self.secp(), rng);
1119
1120        let vtxo_input_outpoints = vtxo_inputs.iter().map(|i| i.outpoint()).collect::<Vec<_>>();
1121
1122        let inputs = {
1123            let boarding_inputs = onchain_inputs.clone().into_iter().map(|o| {
1124                intent::Input::new(
1125                    o.outpoint(),
1126                    o.boarding_output().exit_delay(),
1127                    None,
1128                    TxOut {
1129                        value: o.amount(),
1130                        script_pubkey: o.boarding_output().script_pubkey(),
1131                    },
1132                    o.boarding_output().tapscripts(),
1133                    o.boarding_output().forfeit_spend_info(),
1134                    true,
1135                    false,
1136                    Vec::new(),
1137                )
1138            });
1139
1140            boarding_inputs
1141                .chain(vtxo_inputs.clone())
1142                .collect::<Vec<_>>()
1143        };
1144
1145        let dust = self.server_info.dust;
1146
1147        let mut outputs = vec![];
1148
1149        match output_type {
1150            BatchOutputType::Board {
1151                to_address,
1152                to_amount,
1153            } => {
1154                if to_amount < self.server_info.dust {
1155                    return Err(Error::ad_hoc(format!(
1156                        "cannot settle into sub-dust VTXO: {to_amount} < {dust}"
1157                    )));
1158                }
1159
1160                outputs.push(intent::Output::Offchain(TxOut {
1161                    value: to_amount,
1162                    script_pubkey: to_address.to_p2tr_script_pubkey(),
1163                }));
1164            }
1165            BatchOutputType::OffBoard {
1166                to_address,
1167                to_amount,
1168                change_amount,
1169                ..
1170            } if change_amount == Amount::ZERO => {
1171                outputs.push(intent::Output::Onchain(TxOut {
1172                    value: to_amount,
1173                    script_pubkey: to_address.script_pubkey(),
1174                }));
1175            }
1176            BatchOutputType::OffBoard {
1177                to_address,
1178                to_amount,
1179                change_address,
1180                change_amount,
1181            } => {
1182                if change_amount < dust {
1183                    return Err(Error::ad_hoc(format!(
1184                        "cannot settle with sub-dust change VTXO: {change_amount} < {dust}"
1185                    )));
1186                }
1187
1188                outputs.push(intent::Output::Onchain(TxOut {
1189                    value: to_amount,
1190                    script_pubkey: to_address.script_pubkey(),
1191                }));
1192
1193                outputs.push(intent::Output::Offchain(TxOut {
1194                    value: change_amount,
1195                    script_pubkey: change_address.to_p2tr_script_pubkey(),
1196                }));
1197            }
1198        }
1199
1200        let cosigner_pk = cosigner_keypair.public_key();
1201
1202        let secp = Secp256k1::new();
1203
1204        let sign_for_vtxo_fn =
1205            |input: &mut psbt::Input,
1206             msg: secp256k1::Message|
1207             -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
1208                match &input.witness_script {
1209                    None => Err(ark_core::Error::ad_hoc(
1210                        "Missing witness script in psbt::Input when signing intent",
1211                    )),
1212                    Some(script) => {
1213                        let pks = extract_checksig_pubkeys(script);
1214                        let mut res = vec![];
1215                        for pk in pks {
1216                            if let Ok(keypair) = self.keypair_by_pk(&pk) {
1217                                let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1218                                res.push((sig, keypair.public_key().into()))
1219                            }
1220                        }
1221                        Ok(res)
1222                    }
1223                }
1224            };
1225
1226        let sign_for_onchain_fn =
1227            |input: &mut psbt::Input,
1228             msg: secp256k1::Message|
1229             -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
1230                let onchain_input = onchain_inputs
1231                    .iter()
1232                    .find(|o| {
1233                        Some(o.boarding_output().script_pubkey())
1234                            == input.witness_utxo.clone().map(|w| w.script_pubkey)
1235                    })
1236                    .ok_or_else(|| {
1237                        ark_core::Error::ad_hoc(
1238                            "could not find signing key for onchain input: {input:?}",
1239                        )
1240                    })?;
1241
1242                let owner_pk = onchain_input.boarding_output().owner_pk();
1243                let sig = self
1244                    .inner
1245                    .wallet
1246                    .sign_for_pk(&owner_pk, &msg)
1247                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))?;
1248
1249                Ok((sig, owner_pk))
1250            };
1251
1252        let now = std::time::SystemTime::now()
1253            .duration_since(std::time::UNIX_EPOCH)
1254            .map_err(|e| Error::ad_hoc(e.to_string()))
1255            .context("failed to compute now timestamp")?;
1256        let now = now.as_secs();
1257        let expire_at = now + (2 * 60);
1258
1259        if let Some(packet) = create_asset_preservation_packet(&inputs, &outputs)? {
1260            outputs.push(intent::Output::AssetPacket(packet.to_txout()));
1261        }
1262
1263        let mut onchain_output_indexes = Vec::new();
1264        for (i, output) in outputs.iter().enumerate() {
1265            if matches!(output, intent::Output::Onchain(_)) {
1266                onchain_output_indexes.push(i);
1267            }
1268        }
1269
1270        let message = match intent_kind {
1271            PrepareIntentKind::EstimateFee => intent::IntentMessage::EstimateIntentFee {
1272                onchain_output_indexes,
1273                valid_at: now,
1274                expire_at,
1275                own_cosigner_pks: vec![cosigner_pk],
1276            },
1277            PrepareIntentKind::Register => intent::IntentMessage::Register {
1278                onchain_output_indexes,
1279                valid_at: now,
1280                expire_at,
1281                own_cosigner_pks: vec![cosigner_pk],
1282            },
1283        };
1284
1285        let intent = intent::make_intent(
1286            sign_for_vtxo_fn,
1287            sign_for_onchain_fn,
1288            inputs,
1289            outputs.clone(),
1290            message,
1291        )?;
1292
1293        Ok(PreparedIntent {
1294            intent,
1295            cosigner_keypair,
1296            vtxo_input_outpoints,
1297            outputs,
1298            onchain_inputs,
1299            vtxo_inputs,
1300        })
1301    }
1302
1303    pub(crate) async fn join_next_batch<R>(
1304        &self,
1305        rng: &mut R,
1306        onchain_inputs: Vec<batch::OnChainInput>,
1307        vtxo_inputs: Vec<intent::Input>,
1308        output_type: BatchOutputType,
1309    ) -> Result<Txid, Error>
1310    where
1311        R: Rng + CryptoRng,
1312    {
1313        let prepared = self.prepare_intent(
1314            rng,
1315            onchain_inputs,
1316            vtxo_inputs,
1317            output_type,
1318            PrepareIntentKind::Register,
1319        )?;
1320
1321        let PreparedIntent {
1322            intent,
1323            cosigner_keypair,
1324            vtxo_input_outpoints,
1325            outputs,
1326            onchain_inputs,
1327            vtxo_inputs,
1328        } = prepared;
1329
1330        let onchain_input_outpoints = onchain_inputs
1331            .iter()
1332            .map(|i| i.outpoint())
1333            .collect::<Vec<_>>();
1334
1335        let server_info = &self.server_info;
1336
1337        let own_cosigner_kps = [cosigner_keypair];
1338        let own_cosigner_pks = own_cosigner_kps
1339            .iter()
1340            .map(|k| k.public_key())
1341            .collect::<Vec<_>>();
1342
1343        let secp = Secp256k1::new();
1344
1345        let mut step = Step::Start;
1346
1347        let intent_id = timeout_op(
1348            self.inner.timeout,
1349            self.network_client().register_intent(intent),
1350        )
1351        .await
1352        .context("failed to register intent")??;
1353
1354        tracing::debug!(
1355            intent_id,
1356            ?onchain_input_outpoints,
1357            ?vtxo_input_outpoints,
1358            ?outputs,
1359            "Registered intent for batch"
1360        );
1361
1362        let network_client = self.network_client();
1363
1364        let mut batch_id: Option<String> = None;
1365
1366        let topics = vtxo_input_outpoints
1367            .iter()
1368            .map(ToString::to_string)
1369            .chain(
1370                own_cosigner_pks
1371                    .iter()
1372                    .map(|pk| pk.serialize().to_lower_hex_string()),
1373            )
1374            .collect();
1375
1376        let mut stream = network_client.get_event_stream(topics).await?;
1377
1378        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
1379
1380        let mut unsigned_commitment_tx = None;
1381
1382        let mut vtxo_batch_tree_graph_chunks = Some(Vec::new());
1383        let mut vtxo_batch_tree_graph: Option<TxGraph> = None;
1384
1385        let mut connectors_graph_chunks = Some(Vec::new());
1386        let mut batch_expiry = None;
1387
1388        let mut agg_nonce_pks = HashMap::new();
1389
1390        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
1391        loop {
1392            match timeout_op(self.inner.timeout, stream.next())
1393                .await
1394                .context("timed out waiting for batch event")?
1395            {
1396                Some(Ok(event)) => match event {
1397                    StreamEvent::BatchStarted(e) => {
1398                        if step != Step::Start {
1399                            continue;
1400                        }
1401
1402                        let hash = sha256::Hash::hash(intent_id.as_bytes());
1403                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
1404
1405                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
1406                            timeout_op(
1407                                self.inner.timeout,
1408                                self.network_client()
1409                                    .confirm_registration(intent_id.clone()),
1410                            )
1411                            .await
1412                            .context("failed to confirm intent registration")??;
1413
1414                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
1415
1416                            batch_id = Some(e.id);
1417
1418                            // Depending on whether we are generating new VTXOs or not, we continue
1419                            // with a different step in the state machine.
1420                            step = match outputs
1421                                .iter()
1422                                .any(|o| matches!(o, intent::Output::Offchain(_)))
1423                            {
1424                                true => Step::BatchStarted,
1425                                false => Step::BatchSigningStarted,
1426                            };
1427
1428                            batch_expiry = Some(e.batch_expiry);
1429                        } else {
1430                            tracing::debug!(
1431                                batch_id = e.id,
1432                                intent_id,
1433                                "Intent ID not found for batch"
1434                            );
1435                        }
1436                    }
1437                    StreamEvent::TreeTx(e) => {
1438                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
1439                            continue;
1440                        }
1441
1442                        match e.batch_tree_event_type {
1443                            BatchTreeEventType::Vtxo => {
1444                                match &mut vtxo_batch_tree_graph_chunks {
1445                                    Some(vtxo_batch_tree_graph_chunks) => {
1446                                        tracing::debug!("Got new VTXO batch-tree graph chunk");
1447
1448                                        vtxo_batch_tree_graph_chunks.push(e.tx_graph_chunk)
1449                                    }
1450                                    None => {
1451                                        return Err(Error::ark_server(
1452                                            "received unexpected VTXO batch-tree graph chunk",
1453                                        ));
1454                                    }
1455                                };
1456                            }
1457                            BatchTreeEventType::Connector => {
1458                                match connectors_graph_chunks {
1459                                    Some(ref mut connectors_graph_chunks) => {
1460                                        tracing::debug!("Got new connectors graph chunk");
1461
1462                                        connectors_graph_chunks.push(e.tx_graph_chunk)
1463                                    }
1464                                    None => {
1465                                        return Err(Error::ark_server(
1466                                            "received unexpected connectors graph chunk",
1467                                        ));
1468                                    }
1469                                };
1470                            }
1471                        }
1472                    }
1473                    StreamEvent::TreeSignature(e) => {
1474                        if step != Step::BatchSigningStarted {
1475                            continue;
1476                        }
1477
1478                        match e.batch_tree_event_type {
1479                            BatchTreeEventType::Vtxo => {
1480                                match vtxo_batch_tree_graph {
1481                                    Some(ref mut vtxo_batch_tree_graph) => {
1482                                        vtxo_batch_tree_graph.apply(|graph| {
1483                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
1484                                                Ok(true)
1485                                            } else {
1486                                                graph.set_signature(e.signature);
1487
1488                                                Ok(false)
1489                                            }
1490                                        })?;
1491                                    }
1492                                    None => {
1493                                        return Err(Error::ark_server(
1494                                            "received batch-tree signature without transaction graph",
1495                                        ));
1496                                    }
1497                                };
1498                            }
1499                            BatchTreeEventType::Connector => {
1500                                return Err(Error::ark_server(
1501                                    "received batch-tree signature for connector tree",
1502                                ));
1503                            }
1504                        }
1505                    }
1506                    StreamEvent::TreeSigningStarted(e) => {
1507                        if step != Step::BatchStarted {
1508                            continue;
1509                        }
1510
1511                        let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
1512                            "received batch-tree signing started event without VTXO batch-tree graph chunks",
1513                        ))?;
1514                        vtxo_batch_tree_graph =
1515                            Some(TxGraph::new(chunks).map_err(Error::from).context(
1516                                "failed to build VTXO batch-tree graph before generating nonces",
1517                            )?);
1518
1519                        tracing::info!(batch_id = e.id, "Batch signing started");
1520
1521                        for own_cosigner_pk in own_cosigner_pks.iter() {
1522                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
1523                                return Err(Error::ark_server(format!(
1524                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
1525                                )));
1526                            }
1527                        }
1528
1529                        // We generate and submit a nonce tree for every cosigner key we provide.
1530                        let mut our_nonce_tree_map = HashMap::new();
1531                        for own_cosigner_kp in own_cosigner_kps {
1532                            let own_cosigner_pk = own_cosigner_kp.public_key();
1533                            let nonce_tree = generate_nonce_tree(
1534                                rng,
1535                                vtxo_batch_tree_graph
1536                                    .as_ref()
1537                                    .expect("VTXO batch-tree graph"),
1538                                own_cosigner_pk,
1539                                &e.unsigned_commitment_tx,
1540                            )
1541                            .map_err(Error::from)
1542                            .context("failed to generate VTXO nonce tree")?;
1543
1544                            tracing::info!(
1545                                cosigner_pk = %own_cosigner_pk,
1546                                "Submitting nonce tree for cosigner PK"
1547                            );
1548
1549                            network_client
1550                                .submit_tree_nonces(
1551                                    &e.id,
1552                                    own_cosigner_pk,
1553                                    nonce_tree.to_nonce_pks(),
1554                                )
1555                                .await
1556                                .map_err(Error::ark_server)
1557                                .context("failed to submit VTXO nonce tree")?;
1558
1559                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
1560                        }
1561
1562                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
1563                        our_nonce_trees = Some(our_nonce_tree_map);
1564
1565                        step = step.next();
1566                    }
1567                    StreamEvent::TreeNonces(e) => {
1568                        if step != Step::BatchSigningStarted {
1569                            continue;
1570                        }
1571
1572                        let tree_tx_nonce_pks = e.nonces;
1573
1574                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
1575                            own_cosigner_pks
1576                                .iter()
1577                                .any(|p| &&p.x_only_public_key().0 == pk)
1578                        }) {
1579                            Some((pk, _)) => *pk,
1580                            None => {
1581                                tracing::debug!(
1582                                    batch_id = e.id,
1583                                    txid = %e.txid,
1584                                    "Received irrelevant TreeNonces event"
1585                                );
1586
1587                                continue;
1588                            }
1589                        };
1590
1591                        tracing::debug!(
1592                            batch_id = e.id,
1593                            txid = %e.txid,
1594                            %cosigner_pk,
1595                            "Received TreeNonces event"
1596                        );
1597
1598                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
1599
1600                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);
1601
1602                        if vtxo_batch_tree_graph.is_none() {
1603                            let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
1604                                "received batch-tree nonces event without VTXO batch-tree graph chunks",
1605                            ))?;
1606                            vtxo_batch_tree_graph = Some(
1607                                TxGraph::new(chunks)
1608                                    .map_err(Error::from)
1609                                    .context("failed to build VTXO batch-tree graph before batch-tree signing")?,
1610                            );
1611                        }
1612                        let vtxo_batch_tree_graph_ref =
1613                            vtxo_batch_tree_graph.as_ref().expect("just populated");
1614
1615                        // Once we collect an aggregated nonce per transaction in our VTXO
1616                        // batch-tree graph, we can sign and submit our partial signatures.
1617                        if agg_nonce_pks.len() == vtxo_batch_tree_graph_ref.nb_of_nodes() {
1618                            let cosigner_kp = own_cosigner_kps
1619                                .iter()
1620                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
1621                                .ok_or_else(|| {
1622                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
1623                                })?;
1624
1625                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
1626                                Error::ark_server("missing nonce trees during batch protocol"),
1627                            )?;
1628
1629                            let our_nonce_tree =
1630                                our_nonce_trees
1631                                    .get_mut(cosigner_kp)
1632                                    .ok_or(Error::ark_server(
1633                                        "missing nonce tree during batch protocol",
1634                                    ))?;
1635
1636                            let unsigned_commitment_tx = unsigned_commitment_tx
1637                                .as_ref()
1638                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
1639
1640                            let batch_expiry = batch_expiry
1641                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
1642
1643                            let mut partial_sig_tree = PartialSigTree::default();
1644                            for (txid, _) in vtxo_batch_tree_graph_ref.as_map() {
1645                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
1646                                    Error::ad_hoc(format!(
1647                                        "missing aggregated nonce PK for TX {txid}"
1648                                    ))
1649                                })?;
1650
1651                                let sigs = sign_batch_tree_tx(
1652                                    txid,
1653                                    batch_expiry,
1654                                    ark_forfeit_pk,
1655                                    cosigner_kp,
1656                                    *agg_nonce_pk,
1657                                    vtxo_batch_tree_graph_ref,
1658                                    unsigned_commitment_tx,
1659                                    our_nonce_tree,
1660                                )
1661                                .map_err(Error::from)
1662                                .context("failed to sign VTXO batch-tree transactions")?;
1663
1664                                partial_sig_tree.0.extend(sigs.0);
1665                            }
1666
1667                            network_client
1668                                .submit_tree_signatures(
1669                                    &e.id,
1670                                    cosigner_kp.public_key(),
1671                                    partial_sig_tree,
1672                                )
1673                                .await
1674                                .map_err(Error::ark_server)
1675                                .context("failed to submit VTXO batch-tree signatures")?;
1676                        }
1677                    }
1678                    StreamEvent::TreeNoncesAggregated(e) => {
1679                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
1680                    }
1681                    StreamEvent::BatchFinalization(e) => {
1682                        if step != Step::BatchSigningStarted {
1683                            continue;
1684                        }
1685
1686                        tracing::debug!(
1687                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
1688                            "Batch finalization started"
1689                        );
1690
1691                        let signed_forfeit_psbts = if !vtxo_inputs.is_empty() {
1692                            let chunks =
1693                                connectors_graph_chunks.take().ok_or(Error::ark_server(
1694                                    "received batch finalization event without connectors",
1695                                ))?;
1696
1697                            if chunks.is_empty() {
1698                                tracing::debug!(batch_id = e.id, "No forfeit transactions");
1699
1700                                Vec::new()
1701                            } else {
1702                                let connectors_graph = TxGraph::new(chunks)
1703                                    .map_err(Error::from)
1704                                    .context(
1705                                    "failed to build connectors graph before signing forfeit TXs",
1706                                )?;
1707
1708                                tracing::debug!(batch_id = e.id, "Batch finalization started");
1709
1710                                create_and_sign_forfeit_txs(
1711                                    |input: &mut psbt::Input, msg: secp256k1::Message| match &input
1712                                    .witness_script
1713                                {
1714                                    None => Err(ark_core::Error::ad_hoc(
1715                                        "Missing witness script in psbt::Input when signing forfeit",
1716                                    )),
1717                                    Some(script) => {
1718                                        let pks = extract_checksig_pubkeys(script);
1719                                        let mut res = vec![];
1720                                        for pk in pks {
1721                                            if let Ok(keypair) =
1722                                            self.keypair_by_pk(&pk) {
1723                                                let sig =
1724                                                    secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1725                                                res.push((sig, keypair.public_key().into()))
1726                                            }
1727                                        }
1728                                        Ok(res)
1729                                    }
1730                                    },
1731                                    vtxo_inputs.as_slice(),
1732                                    &connectors_graph.leaves(),
1733                                    &server_info.forfeit_address,
1734                                    server_info.dust,
1735                                )
1736                                .map_err(Error::from)?
1737                            }
1738                        } else {
1739                            Vec::new()
1740                        };
1741
1742                        let commitment_psbt = if onchain_inputs.is_empty() {
1743                            None
1744                        } else {
1745                            let mut commitment_psbt = e.commitment_tx;
1746
1747                            let sign_for_pk_fn = |pk: &XOnlyPublicKey,
1748                                                  msg: &secp256k1::Message|
1749                             -> Result<
1750                                schnorr::Signature,
1751                                ark_core::Error,
1752                            > {
1753                                self.inner
1754                                    .wallet
1755                                    .sign_for_pk(pk, msg)
1756                                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))
1757                            };
1758
1759                            sign_commitment_psbt(
1760                                sign_for_pk_fn,
1761                                &mut commitment_psbt,
1762                                &onchain_inputs,
1763                            )
1764                            .map_err(Error::from)?;
1765
1766                            Some(commitment_psbt)
1767                        };
1768
1769                        if !signed_forfeit_psbts.is_empty() || commitment_psbt.is_some() {
1770                            network_client
1771                                .submit_signed_forfeit_txs(signed_forfeit_psbts, commitment_psbt)
1772                                .await?;
1773                        }
1774
1775                        step = step.next();
1776                    }
1777                    StreamEvent::BatchFinalized(e) => {
1778                        if step != Step::Finalized {
1779                            continue;
1780                        }
1781
1782                        let commitment_txid = e.commitment_txid;
1783
1784                        tracing::info!(batch_id = e.id, %commitment_txid, "Batch finalized");
1785
1786                        return Ok(commitment_txid);
1787                    }
1788                    StreamEvent::BatchFailed(ref e) => {
1789                        if Some(&e.id) == batch_id.as_ref() {
1790                            return Err(Error::ark_server(format!(
1791                                "batch failed {}: {}",
1792                                e.id, e.reason
1793                            )));
1794                        }
1795
1796                        tracing::debug!("Unrelated batch failed: {e:?}");
1797                    }
1798                    StreamEvent::Heartbeat => {}
1799                    StreamEvent::StreamStarted(_) => {}
1800                },
1801                Some(Err(e)) => {
1802                    tracing::error!("Got error from event stream");
1803
1804                    return Err(Error::ark_server(e));
1805                }
1806                None => {
1807                    return Err(Error::ark_server("dropped batch event stream"));
1808                }
1809            }
1810        }
1811
/// Progress marker for the batch protocol.
///
/// Variants are declared in the order in which [`Step::next`] advances through them.
#[derive(Debug, PartialEq, Eq)]
enum Step {
    Start,
    BatchStarted,
    BatchSigningStarted,
    Finalized,
}

impl Step {
    /// Returns the step that follows `self`.
    ///
    /// `Finalized` is terminal: advancing from it yields `Finalized` again.
    fn next(&self) -> Self {
        match *self {
            Self::Start => Self::BatchStarted,
            Self::BatchStarted => Self::BatchSigningStarted,
            // There is nothing past `Finalized`, so the last two steps share a target.
            Self::BatchSigningStarted | Self::Finalized => Self::Finalized,
        }
    }
}
1830    }
1831}
1832
/// Selects the purpose an intent is being prepared for.
///
/// NOTE(review): judging by the variant names this distinguishes preparing an intent for
/// registration from preparing one only to estimate its fee; the consuming code is not part of
/// this section, so confirm against the call sites.
#[derive(Clone, Debug)]
pub(crate) enum PrepareIntentKind {
    Register,
    EstimateFee,
}
1838
1839#[derive(Debug, Clone)]
1840pub(crate) enum BatchOutputType {
1841    Board {
1842        to_address: ArkAddress,
1843        to_amount: Amount,
1844    },
1845    OffBoard {
1846        to_address: Address,
1847        to_amount: Amount,
1848        change_address: ArkAddress,
1849        change_amount: Amount,
1850    },
1851}
1852
1853/// Prepared intent data ready for batch registration.
1854pub(crate) struct PreparedIntent {
1855    /// The signed intent.
1856    pub intent: intent::Intent,
1857    /// The ephemeral cosigner keypair.
1858    pub cosigner_keypair: Keypair,
1859    /// VTXO input outpoints (used for event stream topics).
1860    pub vtxo_input_outpoints: Vec<OutPoint>,
1861    /// Intent outputs (used to determine batch protocol steps).
1862    pub outputs: Vec<intent::Output>,
1863    /// The original onchain inputs (needed for commitment signing).
1864    pub onchain_inputs: Vec<batch::OnChainInput>,
1865    /// The original VTXO inputs (needed for forfeit signing).
1866    pub vtxo_inputs: Vec<intent::Input>,
1867}