1use crate::error::ErrorContext as _;
2use crate::swap_storage::SwapStorage;
3use crate::utils::sleep;
4use crate::utils::timeout_op;
5use crate::wallet::BoardingWallet;
6use crate::wallet::OnchainWallet;
7use crate::Blockchain;
8use crate::Client;
9use crate::Error;
10use ark_core::batch;
11use ark_core::batch::aggregate_nonces;
12use ark_core::batch::complete_delegate_forfeit_txs;
13use ark_core::batch::create_and_sign_forfeit_txs;
14use ark_core::batch::create_asset_preservation_packet;
15use ark_core::batch::generate_nonce_tree;
16use ark_core::batch::sign_batch_tree_tx;
17use ark_core::batch::sign_commitment_psbt;
18use ark_core::batch::Delegate;
19use ark_core::batch::NonceKps;
20use ark_core::intent;
21use ark_core::script::extract_checksig_pubkeys;
22use ark_core::server::BatchTreeEventType;
23use ark_core::server::PartialSigTree;
24use ark_core::server::StreamEvent;
25use ark_core::ArkAddress;
26use ark_core::ArkNote;
27use ark_core::ExplorerUtxo;
28use ark_core::TxGraph;
29use backon::ExponentialBuilder;
30use backon::Retryable;
31use bitcoin::hashes::sha256;
32use bitcoin::hashes::Hash;
33use bitcoin::hex::DisplayHex;
34use bitcoin::key::Keypair;
35use bitcoin::key::Secp256k1;
36use bitcoin::psbt;
37use bitcoin::secp256k1;
38use bitcoin::secp256k1::schnorr;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Address;
41use bitcoin::Amount;
42use bitcoin::OutPoint;
43use bitcoin::Psbt;
44use bitcoin::TxOut;
45use bitcoin::Txid;
46use bitcoin::XOnlyPublicKey;
47use futures::StreamExt;
48use jiff::Timestamp;
49use rand::CryptoRng;
50use rand::Rng;
51use std::collections::HashMap;
52
53impl<B, W, S, K> Client<B, W, S, K>
54where
55 B: Blockchain,
56 W: BoardingWallet + OnchainWallet,
57 S: SwapStorage + 'static,
58 K: crate::KeyProvider,
59{
60 pub async fn settle<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
63 where
64 R: Rng + CryptoRng + Clone,
65 {
66 let (to_address, _) = self.get_offchain_address()?;
68
69 let (boarding_inputs, vtxo_inputs, total_amount) =
70 self.fetch_commitment_transaction_inputs().await?;
71
72 tracing::debug!(
73 offchain_adress = %to_address.encode(),
74 ?boarding_inputs,
75 ?vtxo_inputs,
76 "Attempting to settle outputs"
77 );
78
79 if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
80 tracing::debug!("No inputs to board with");
81 return Ok(None);
82 }
83
84 let join_next_batch = || async {
85 self.join_next_batch(
86 &mut rng.clone(),
87 boarding_inputs.clone(),
88 vtxo_inputs.clone(),
89 BatchOutputType::Board {
90 to_address,
91 to_amount: total_amount,
92 },
93 )
94 .await
95 };
96
97 let commitment_txid = join_next_batch
99 .retry(ExponentialBuilder::default().with_max_times(0))
100 .sleep(sleep)
101 .notify(|err: &Error, dur: std::time::Duration| {
103 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
104 })
105 .await
106 .context("Failed to join batch")?;
107
108 tracing::info!(%commitment_txid, "Settlement success");
109
110 Ok(Some(commitment_txid))
111 }
112
113 pub async fn settle_with_notes<R>(
119 &self,
120 rng: &mut R,
121 notes: Vec<ArkNote>,
122 ) -> Result<Option<Txid>, Error>
123 where
124 R: Rng + CryptoRng + Clone,
125 {
126 let (to_address, _) = self.get_offchain_address()?;
127
128 let (boarding_inputs, vtxo_inputs, mut total_amount) =
129 self.fetch_commitment_transaction_inputs().await?;
130
131 let note_inputs: Vec<intent::Input> = notes
133 .iter()
134 .map(|note| {
135 total_amount += note.value();
136 note.to_intent_input()
137 })
138 .collect::<Result<Vec<_>, _>>()?;
139
140 let all_vtxo_inputs: Vec<intent::Input> =
142 vtxo_inputs.into_iter().chain(note_inputs).collect();
143
144 tracing::debug!(
145 offchain_address = %to_address.encode(),
146 ?boarding_inputs,
147 num_vtxo_inputs = all_vtxo_inputs.len(),
148 num_notes = notes.len(),
149 %total_amount,
150 "Attempting to settle outputs with notes"
151 );
152
153 if boarding_inputs.is_empty() && all_vtxo_inputs.is_empty() {
154 tracing::debug!("No inputs to settle");
155 return Ok(None);
156 }
157
158 let join_next_batch = || async {
159 self.join_next_batch(
160 &mut rng.clone(),
161 boarding_inputs.clone(),
162 all_vtxo_inputs.clone(),
163 BatchOutputType::Board {
164 to_address,
165 to_amount: total_amount,
166 },
167 )
168 .await
169 };
170
171 let commitment_txid = join_next_batch
172 .retry(ExponentialBuilder::default().with_max_times(0))
173 .sleep(sleep)
174 .notify(|err: &Error, dur: std::time::Duration| {
175 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
176 })
177 .await
178 .context("Failed to join batch")?;
179
180 tracing::info!(%commitment_txid, num_notes = notes.len(), "Settlement with notes success");
181
182 Ok(Some(commitment_txid))
183 }
184
185 pub async fn settle_vtxos<R>(
191 &self,
192 rng: &mut R,
193 vtxo_outpoints: &[OutPoint],
194 boarding_outpoints: &[OutPoint],
195 ) -> Result<Option<Txid>, Error>
196 where
197 R: Rng + CryptoRng + Clone,
198 {
199 let (to_address, _) = self.get_offchain_address()?;
201
202 let (all_boarding_inputs, all_vtxo_inputs, _) =
203 self.fetch_commitment_transaction_inputs().await?;
204
205 let boarding_inputs: Vec<_> = all_boarding_inputs
207 .into_iter()
208 .filter(|input| boarding_outpoints.contains(&input.outpoint()))
209 .collect();
210
211 let vtxo_inputs: Vec<_> = all_vtxo_inputs
213 .into_iter()
214 .filter(|input| vtxo_outpoints.contains(&input.outpoint()))
215 .collect();
216
217 let total_amount = boarding_inputs
219 .iter()
220 .map(|i| i.amount())
221 .chain(vtxo_inputs.iter().map(|i| i.amount()))
222 .fold(Amount::ZERO, |acc, a| acc + a);
223
224 tracing::debug!(
225 offchain_address = %to_address.encode(),
226 ?boarding_inputs,
227 ?vtxo_inputs,
228 %total_amount,
229 "Attempting to settle specific outputs"
230 );
231
232 if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
233 tracing::debug!("No matching inputs to settle");
234 return Ok(None);
235 }
236
237 let join_next_batch = || async {
238 self.join_next_batch(
239 &mut rng.clone(),
240 boarding_inputs.clone(),
241 vtxo_inputs.clone(),
242 BatchOutputType::Board {
243 to_address,
244 to_amount: total_amount,
245 },
246 )
247 .await
248 };
249
250 let commitment_txid = join_next_batch
252 .retry(ExponentialBuilder::default().with_max_times(0))
253 .sleep(sleep)
254 .notify(|err: &Error, dur: std::time::Duration| {
255 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
256 })
257 .await
258 .context("Failed to join batch")?;
259
260 tracing::info!(%commitment_txid, "Settlement of specific VTXOs success");
261
262 Ok(Some(commitment_txid))
263 }
264
265 pub async fn collaborative_redeem<R>(
268 &self,
269 rng: &mut R,
270 to_address: Address,
271 to_amount: Amount,
272 ) -> Result<Txid, Error>
273 where
274 R: Rng + CryptoRng + Clone,
275 {
276 let (change_address, _) = self.get_offchain_address()?;
277
278 let (boarding_inputs, vtxo_inputs, total_amount) =
279 self.fetch_commitment_transaction_inputs().await?;
280
281 let onchain_fee = self
282 .fee_estimator
283 .eval_onchain_output(ark_fees::Output {
284 amount: to_amount.to_sat(),
285 script: to_address.script_pubkey().to_string(),
286 })
287 .map_err(Error::ad_hoc)?;
288 let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
289
290 let change_amount = total_amount
292 .checked_sub(to_amount)
293 .and_then(|a| a.checked_sub(onchain_fee))
294 .ok_or_else(|| {
295 Error::coin_select(format!(
296 "insufficient balance: {total_amount} < {to_amount} (send) + {onchain_fee} (fee)"
297 ))
298 })?;
299
300 tracing::info!(
301 %to_address,
302 send_amount = %to_amount,
303 fee = %onchain_fee,
304 change_address = %change_address.encode(),
305 %change_amount,
306 ?boarding_inputs,
307 "Attempting to collaboratively redeem outputs"
308 );
309
310 let join_next_batch = || async {
311 self.join_next_batch(
312 &mut rng.clone(),
313 boarding_inputs.clone(),
314 vtxo_inputs.clone(),
315 BatchOutputType::OffBoard {
316 to_address: to_address.clone(),
317 to_amount,
318 change_address,
319 change_amount,
320 },
321 )
322 .await
323 };
324
325 let commitment_txid = join_next_batch
327 .retry(ExponentialBuilder::default().with_max_times(3))
328 .sleep(sleep)
329 .notify(|err: &Error, dur: std::time::Duration| {
331 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
332 })
333 .await
334 .context("Failed to join batch")?;
335
336 tracing::info!(%commitment_txid, "Collaborative redeem success");
337
338 Ok(commitment_txid)
339 }
340
341 pub async fn collaborative_redeem_vtxo_selection<R>(
344 &self,
345 rng: &mut R,
346 input_vtxos: impl Iterator<Item = OutPoint> + Clone,
347 to_address: Address,
348 to_amount: Amount,
349 ) -> Result<Txid, Error>
350 where
351 R: Rng + CryptoRng + Clone,
352 {
353 let (change_address, _) = self.get_offchain_address()?;
354
355 let (vtxo_list, script_pubkey_to_vtxo_map) =
356 self.list_vtxos().await.context("failed to get VTXO list")?;
357
358 let vtxo_inputs = vtxo_list
359 .all_unspent()
360 .filter(|v| input_vtxos.clone().any(|outpoint| outpoint == v.outpoint))
361 .map(|v| {
362 let vtxo = script_pubkey_to_vtxo_map.get(&v.script).ok_or_else(|| {
363 ark_core::Error::ad_hoc(format!("missing VTXO for script pubkey: {}", v.script))
364 })?;
365 let spend_info = vtxo.forfeit_spend_info()?;
366
367 Ok(intent::Input::new(
368 v.outpoint,
369 vtxo.exit_delay(),
370 None,
372 TxOut {
373 value: v.amount,
374 script_pubkey: vtxo.script_pubkey(),
375 },
376 vtxo.tapscripts(),
377 spend_info,
378 false,
379 v.is_swept,
380 v.assets.clone(),
381 ))
382 })
383 .collect::<Result<Vec<_>, Error>>()?;
384
385 if vtxo_inputs.is_empty() {
386 return Err(Error::ad_hoc("no matching VTXO outpoints found"));
387 }
388
389 let total_input_amount = vtxo_inputs
391 .iter()
392 .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount());
393
394 let onchain_fee = self
395 .fee_estimator
396 .eval_onchain_output(ark_fees::Output {
397 amount: to_amount.to_sat(),
398 script: to_address.script_pubkey().to_string(),
399 })
400 .map_err(Error::ad_hoc)?;
401 let onchain_fee = Amount::from_sat(onchain_fee.to_satoshis());
402
403 let change_amount = total_input_amount
405 .checked_sub(to_amount)
406 .and_then(|a| a.checked_sub(onchain_fee))
407 .ok_or_else(|| {
408 Error::coin_select(format!(
409 "insufficient VTXO amount: {total_input_amount} < {to_amount} (send) + {onchain_fee} (fee)"
410 ))
411 })?;
412
413 tracing::info!(
414 %to_address,
415 send_amount = %to_amount,
416 fee = %onchain_fee,
417 change_address = %change_address.encode(),
418 %change_amount,
419 "Attempting to collaboratively redeem outputs"
420 );
421
422 let join_next_batch = || async {
423 self.join_next_batch(
424 &mut rng.clone(),
425 Vec::new(),
426 vtxo_inputs.clone(),
427 BatchOutputType::OffBoard {
428 to_address: to_address.clone(),
429 to_amount,
430 change_address,
431 change_amount,
432 },
433 )
434 .await
435 };
436
437 let commitment_txid = join_next_batch
439 .retry(ExponentialBuilder::default().with_max_times(3))
440 .sleep(sleep)
441 .notify(|err: &Error, dur: std::time::Duration| {
443 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
444 })
445 .await
446 .context("Failed to join batch")?;
447
448 tracing::info!(%commitment_txid, "Collaborative redeem success");
449
450 Ok(commitment_txid)
451 }
452
453 pub async fn generate_delegate(
467 &self,
468 delegate_cosigner_pk: PublicKey,
469 ) -> Result<Delegate, Error> {
470 let (to_address, _) = self.get_offchain_address()?;
472
473 let (_, vtxo_inputs, _) = self.fetch_commitment_transaction_inputs().await?;
475
476 let total_amount = vtxo_inputs
477 .iter()
478 .fold(Amount::ZERO, |acc, v| acc + v.amount());
479
480 if vtxo_inputs.is_empty() {
481 return Err(Error::ad_hoc("no inputs to settle via delegate"));
482 }
483
484 let server_info = &self.server_info;
485
486 let mut outputs = vec![intent::Output::Offchain(TxOut {
487 value: total_amount,
488 script_pubkey: to_address.to_p2tr_script_pubkey(),
489 })];
490
491 if let Some(packet) = create_asset_preservation_packet(&vtxo_inputs, &outputs)? {
492 outputs.push(intent::Output::AssetPacket(packet.to_txout()));
493 }
494
495 let delegate = batch::prepare_delegate_psbts(
496 vtxo_inputs,
497 outputs,
498 delegate_cosigner_pk,
499 &server_info.forfeit_address,
500 server_info.dust,
501 )?;
502
503 Ok(delegate)
504 }
505
506 pub fn sign_delegate_psbts(
508 &self,
509 intent_psbt: &mut Psbt,
510 forfeit_psbts: &mut [Psbt],
511 ) -> Result<(), Error> {
512 let sign_fn =
513 |input: &mut psbt::Input,
514 msg: secp256k1::Message|
515 -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
516 match &input.witness_script {
517 None => Err(ark_core::Error::ad_hoc(
518 "Missing witness script for psbt::Input",
519 )),
520 Some(script) => {
521 let mut res = vec![];
522 let pks = extract_checksig_pubkeys(script);
523 for pk in pks {
524 if let Ok(keypair) = self.keypair_by_pk(&pk) {
525 let sig = Secp256k1::new().sign_schnorr_no_aux_rand(&msg, &keypair);
526 let pk = keypair.x_only_public_key().0;
527 res.push((sig, pk));
528 }
529 }
530 Ok(res)
531 }
532 }
533 };
534
535 batch::sign_delegate_psbts(sign_fn, intent_psbt, forfeit_psbts)?;
536
537 Ok(())
538 }
539
/// Settle on behalf of another party using a previously prepared and signed [`Delegate`].
///
/// The delegate's intent is registered with the server, after which this method follows
/// the server's event stream through the batch protocol: confirming registration,
/// collecting the VTXO and connector graph chunks, submitting nonces and partial
/// signatures for the VTXO tree with `own_cosigner_kp`, completing and submitting the
/// delegated forfeit transactions, and finally waiting for the batch to be finalized.
///
/// # Arguments
///
/// * `rng` - Randomness source used when generating the nonce tree.
/// * `delegate` - The delegate data (intent and forfeit PSBTs).
/// * `own_cosigner_kp` - Cosigner keypair; its public key must equal
///   `delegate.delegate_cosigner_pk`.
///
/// # Returns
///
/// The commitment transaction ID reported by the `BatchFinalized` event.
///
/// # Errors
///
/// Fails if the keypair does not match the delegate, if any server call fails or times
/// out, if the server sends events that are inconsistent with the protocol state, if the
/// batch we joined fails, or if the event stream errors or ends.
///
/// # Panics
///
/// Indexes `unsigned_tx.input[0]` of every forfeit PSBT, so a forfeit PSBT without inputs
/// causes a panic.
pub async fn settle_delegate<R>(
    &self,
    rng: &mut R,
    delegate: Delegate,
    own_cosigner_kp: Keypair,
) -> Result<Txid, Error>
where
    R: Rng + CryptoRng,
{
    // The keypair we sign with must be the one the delegate PSBTs were prepared for.
    if own_cosigner_kp.public_key() != delegate.delegate_cosigner_pk {
        return Err(Error::ad_hoc(
            "provided cosigner keypair does not match delegate_cosigner_pk",
        ));
    }

    let intent_id = timeout_op(
        self.inner.timeout,
        self.network_client()
            .register_intent(delegate.intent.clone()),
    )
    .await
    .context("failed to register delegated intent")??;

    tracing::debug!(intent_id, "Registered delegated intent");

    let network_client = self.network_client();
    let server_info = &self.server_info;

    // Local protocol state machine. Events that arrive in a step where they are not
    // expected are ignored (see the `continue` guards below).
    #[derive(Debug, PartialEq, Eq)]
    enum Step {
        Start,
        BatchStarted,
        BatchSigningStarted,
        Finalized,
    }

    impl Step {
        // Advance to the following step; `Finalized` is terminal.
        fn next(&self) -> Step {
            match self {
                Step::Start => Step::BatchStarted,
                Step::BatchStarted => Step::BatchSigningStarted,
                Step::BatchSigningStarted => Step::Finalized,
                Step::Finalized => Step::Finalized,
            }
        }
    }

    let mut step = Step::Start;

    let own_cosigner_kps = [own_cosigner_kp];
    let own_cosigner_pks = own_cosigner_kps
        .iter()
        .map(|k| k.public_key())
        .collect::<Vec<_>>();

    // Set once our intent is found in a started batch; used to recognise `BatchFailed`
    // events that concern us.
    let mut batch_id: Option<String> = None;

    // The outpoint spent by the first input of each forfeit PSBT.
    // NOTE: indexing `input[0]` panics if a forfeit PSBT has no inputs.
    let vtxo_input_outpoints = delegate
        .forfeit_psbts
        .iter()
        .map(|psbt| psbt.unsigned_tx.input[0].previous_output)
        .collect::<Vec<_>>();

    // Event stream topics: those outpoints plus our cosigner public key(s) in hex.
    let topics = vtxo_input_outpoints
        .iter()
        .map(ToString::to_string)
        .chain(
            own_cosigner_pks
                .iter()
                .map(|pk| pk.serialize().to_lower_hex_string()),
        )
        .collect();

    let mut stream = network_client.get_event_stream(topics).await?;

    let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();

    let mut unsigned_commitment_tx = None;

    // Graph chunks are accumulated until they are `take`n to build the graph; afterwards
    // the `Option` is `None` and further chunks of that kind are treated as an error.
    let mut vtxo_graph_chunks = Some(Vec::new());
    let mut vtxo_graph: Option<TxGraph> = None;

    let mut connectors_graph_chunks = Some(Vec::new());
    let mut batch_expiry = None;

    // Aggregated nonce public keys, keyed by the tree transaction ID they belong to.
    let mut agg_nonce_pks = HashMap::new();

    let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;

    loop {
        match stream.next().await {
            Some(Ok(event)) => match event {
                StreamEvent::BatchStarted(e) => {
                    if step != Step::Start {
                        continue;
                    }

                    // The server announces participating intents by the hex-encoded
                    // SHA-256 hash of their ID.
                    let hash = sha256::Hash::hash(intent_id.as_bytes());
                    let hash = hash.as_byte_array().to_vec().to_lower_hex_string();

                    if e.intent_id_hashes.iter().any(|h| h == &hash) {
                        timeout_op(
                            self.inner.timeout,
                            self.network_client()
                                .confirm_registration(intent_id.clone()),
                        )
                        .await
                        .context("failed to confirm intent registration")??;

                        tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");

                        batch_id = Some(e.id);

                        step = Step::BatchStarted;

                        batch_expiry = Some(e.batch_expiry);
                    } else {
                        // Not our batch: stay in `Start` and wait for the next one.
                        tracing::debug!(
                            batch_id = e.id,
                            intent_id,
                            "Intent ID not found for batch"
                        );
                    }
                }
                StreamEvent::TreeTx(e) => {
                    if step != Step::BatchStarted && step != Step::BatchSigningStarted {
                        continue;
                    }

                    match e.batch_tree_event_type {
                        BatchTreeEventType::Vtxo => {
                            match &mut vtxo_graph_chunks {
                                Some(vtxo_graph_chunks) => {
                                    tracing::debug!("Got new VTXO graph chunk");

                                    vtxo_graph_chunks.push(e.tx_graph_chunk)
                                }
                                // The chunks were already consumed to build the graph.
                                None => {
                                    return Err(Error::ark_server(
                                        "received unexpected VTXO graph chunk",
                                    ));
                                }
                            };
                        }
                        BatchTreeEventType::Connector => {
                            match connectors_graph_chunks {
                                Some(ref mut connectors_graph_chunks) => {
                                    tracing::debug!("Got new connectors graph chunk");

                                    connectors_graph_chunks.push(e.tx_graph_chunk)
                                }
                                // The chunks were already consumed during finalization.
                                None => {
                                    return Err(Error::ark_server(
                                        "received unexpected connectors graph chunk",
                                    ));
                                }
                            };
                        }
                    }
                }
                StreamEvent::TreeSignature(e) => {
                    if step != Step::BatchSigningStarted {
                        continue;
                    }

                    match e.batch_tree_event_type {
                        BatchTreeEventType::Vtxo => {
                            match vtxo_graph {
                                Some(ref mut vtxo_graph) => {
                                    // Attach the signature to the node whose root TX has
                                    // the announced txid. NOTE(review): the boolean
                                    // presumably tells `apply` whether to keep
                                    // traversing — confirm against `TxGraph::apply`.
                                    vtxo_graph.apply(|graph| {
                                        if graph.root().unsigned_tx.compute_txid() != e.txid {
                                            Ok(true)
                                        } else {
                                            graph.set_signature(e.signature);

                                            Ok(false)
                                        }
                                    })?;
                                }
                                None => {
                                    return Err(Error::ark_server(
                                        "received batch tree signature without TX graph",
                                    ));
                                }
                            };
                        }
                        BatchTreeEventType::Connector => {
                            return Err(Error::ark_server(
                                "received batch tree signature for connectors tree",
                            ));
                        }
                    }
                }
                StreamEvent::TreeSigningStarted(e) => {
                    if step != Step::BatchStarted {
                        continue;
                    }

                    // All VTXO chunks received so far are consumed to build the graph.
                    let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
                        "received tree signing started event without VTXO graph chunks",
                    ))?;
                    vtxo_graph = Some(
                        TxGraph::new(chunks)
                            .map_err(Error::from)
                            .context("failed to build VTXO graph before generating nonces")?,
                    );

                    tracing::info!(batch_id = e.id, "Batch signing started");

                    // Refuse to sign if the server did not include our cosigner key.
                    for own_cosigner_pk in own_cosigner_pks.iter() {
                        if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
                            return Err(Error::ark_server(format!(
                                "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
                            )));
                        }
                    }

                    // Generate a nonce tree per cosigner key, submit its public part to
                    // the server and keep the full tree for signing later.
                    let mut our_nonce_tree_map = HashMap::new();
                    for own_cosigner_kp in own_cosigner_kps {
                        let own_cosigner_pk = own_cosigner_kp.public_key();
                        let nonce_tree = generate_nonce_tree(
                            rng,
                            vtxo_graph.as_ref().expect("VTXO graph"),
                            own_cosigner_pk,
                            &e.unsigned_commitment_tx,
                        )
                        .map_err(Error::from)
                        .context("failed to generate VTXO nonce tree")?;

                        tracing::info!(
                            cosigner_pk = %own_cosigner_pk,
                            "Submitting nonce tree for cosigner PK"
                        );

                        network_client
                            .submit_tree_nonces(
                                &e.id,
                                own_cosigner_pk,
                                nonce_tree.to_nonce_pks(),
                            )
                            .await
                            .map_err(Error::ark_server)
                            .context("failed to submit VTXO nonce tree")?;

                        our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
                    }

                    unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
                    our_nonce_trees = Some(our_nonce_tree_map);

                    step = step.next();
                }
                StreamEvent::TreeNonces(e) => {
                    if step != Step::BatchSigningStarted {
                        continue;
                    }

                    let tree_tx_nonce_pks = e.nonces;

                    // Only events that contain a nonce for one of our cosigner keys are
                    // relevant to us.
                    let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
                        own_cosigner_pks
                            .iter()
                            .any(|p| &&p.x_only_public_key().0 == pk)
                    }) {
                        Some((pk, _)) => *pk,
                        None => {
                            tracing::debug!(
                                batch_id = e.id,
                                txid = %e.txid,
                                "Received irrelevant TreeNonces event"
                            );

                            continue;
                        }
                    };

                    tracing::debug!(
                        batch_id = e.id,
                        txid = %e.txid,
                        %cosigner_pk,
                        "Received TreeNonces event"
                    );

                    let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);

                    agg_nonce_pks.insert(e.txid, agg_nonce_pk);

                    // Build the graph here if it has not been built yet.
                    if vtxo_graph.is_none() {
                        let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
                            "received tree nonces event without VTXO graph chunks",
                        ))?;
                        vtxo_graph = Some(
                            TxGraph::new(chunks)
                                .map_err(Error::from)
                                .context("failed to build VTXO graph before tree signing")?,
                        );
                    }
                    let vtxo_graph_ref = vtxo_graph.as_ref().expect("just populated");

                    // Sign only once an aggregated nonce is known for as many TXs as
                    // there are nodes in the graph.
                    if agg_nonce_pks.len() == vtxo_graph_ref.nb_of_nodes() {
                        let cosigner_kp = own_cosigner_kps
                            .iter()
                            .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
                            .ok_or_else(|| {
                                Error::ad_hoc("no cosigner keypair to sign for own PK")
                            })?;

                        let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
                            Error::ark_server("missing nonce trees during batch protocol"),
                        )?;

                        let our_nonce_tree =
                            our_nonce_trees
                                .get_mut(cosigner_kp)
                                .ok_or(Error::ark_server(
                                    "missing nonce tree during batch protocol",
                                ))?;

                        let unsigned_commitment_tx = unsigned_commitment_tx
                            .as_ref()
                            .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;

                        let batch_expiry = batch_expiry
                            .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;

                        // Produce our partial signatures for every TX in the graph and
                        // submit them to the server in one call.
                        let mut partial_sig_tree = PartialSigTree::default();
                        for (txid, _) in vtxo_graph_ref.as_map() {
                            let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
                                Error::ad_hoc(format!(
                                    "missing aggregated nonce PK for TX {txid}"
                                ))
                            })?;

                            let sigs = sign_batch_tree_tx(
                                txid,
                                batch_expiry,
                                ark_forfeit_pk,
                                cosigner_kp,
                                *agg_nonce_pk,
                                vtxo_graph_ref,
                                unsigned_commitment_tx,
                                our_nonce_tree,
                            )
                            .map_err(Error::from)
                            .context("failed to sign VTXO tree")?;

                            partial_sig_tree.0.extend(sigs.0);
                        }

                        network_client
                            .submit_tree_signatures(
                                &e.id,
                                cosigner_kp.public_key(),
                                partial_sig_tree,
                            )
                            .await
                            .map_err(Error::ark_server)
                            .context("failed to submit VTXO tree signatures")?;
                    }
                }
                StreamEvent::TreeNoncesAggregated(e) => {
                    // Informational only; nothing to do.
                    tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
                }
                StreamEvent::BatchFinalization(e) => {
                    if step != Step::BatchSigningStarted {
                        continue;
                    }

                    tracing::debug!(
                        commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
                        "Batch finalization started (delegate)"
                    );

                    let chunks = connectors_graph_chunks.take().ok_or(Error::ark_server(
                        "received batch finalization event without connectors",
                    ))?;

                    // Without connector chunks there are no forfeit TXs to complete.
                    if chunks.is_empty() {
                        tracing::debug!(batch_id = e.id, "No delegated forfeit transactions");
                    } else {
                        let connectors_graph = TxGraph::new(chunks)
                            .map_err(Error::from)
                            .context(
                                "failed to build connectors graph before completing forfeit TXs",
                            )?;

                        tracing::debug!(
                            batch_id = e.id,
                            "Completing delegated forfeit transactions"
                        );

                        // Complete the pre-signed forfeit PSBTs using the leaves of the
                        // connectors graph, then hand them to the server.
                        let signed_forfeit_psbts = complete_delegate_forfeit_txs(
                            &delegate.forfeit_psbts,
                            &connectors_graph.leaves(),
                        )?;

                        network_client
                            .submit_signed_forfeit_txs(signed_forfeit_psbts, None)
                            .await?;
                    }

                    step = step.next();
                }
                StreamEvent::BatchFinalized(e) => {
                    if step != Step::Finalized {
                        continue;
                    }

                    let commitment_txid = e.commitment_txid;

                    tracing::info!(batch_id = e.id, %commitment_txid, "Delegated batch finalized");

                    return Ok(commitment_txid);
                }
                StreamEvent::BatchFailed(ref e) => {
                    // Only a failure of the batch we joined is fatal.
                    if Some(&e.id) == batch_id.as_ref() {
                        return Err(Error::ark_server(format!(
                            "batch failed {}: {}",
                            e.id, e.reason
                        )));
                    }

                    tracing::debug!("Unrelated batch failed: {e:?}");
                }
                StreamEvent::Heartbeat => {}
                StreamEvent::StreamStarted(_) => {}
            },
            Some(Err(e)) => {
                tracing::error!("Got error from event stream");

                return Err(Error::ark_server(e));
            }
            // The stream ended before the batch was finalized.
            None => {
                return Err(Error::ark_server("dropped batch event stream"));
            }
        }
    }
}
994
995 pub(crate) async fn fetch_commitment_transaction_inputs(
998 &self,
999 ) -> Result<(Vec<batch::OnChainInput>, Vec<intent::Input>, Amount), Error> {
1000 let boarding_outputs = self.inner.wallet.get_boarding_outputs()?;
1002
1003 let mut boarding_inputs: Vec<batch::OnChainInput> = Vec::new();
1004 let mut total_amount = Amount::ZERO;
1005
1006 let mut seen_outpoints = std::collections::HashSet::new();
1008
1009 let now = Timestamp::now();
1010
1011 for boarding_output in boarding_outputs {
1013 let outpoints = timeout_op(
1014 self.inner.timeout,
1015 self.blockchain().find_outpoints(boarding_output.address()),
1016 )
1017 .await
1018 .context("failed to find outpoints")??;
1019
1020 for o in outpoints.iter() {
1021 if let ExplorerUtxo {
1022 outpoint,
1023 amount,
1024 confirmation_blocktime: Some(confirmation_blocktime),
1025 confirmations,
1026 is_spent: false,
1027 } = o
1028 {
1029 if seen_outpoints.contains(outpoint) {
1031 continue;
1032 }
1033
1034 if !boarding_output.can_be_claimed_unilaterally_by_owner(
1036 now.as_duration().try_into().map_err(Error::ad_hoc)?,
1037 std::time::Duration::from_secs(*confirmation_blocktime),
1038 *confirmations,
1039 ) {
1040 seen_outpoints.insert(*outpoint);
1042
1043 boarding_inputs.push(batch::OnChainInput::new(
1044 boarding_output.clone(),
1045 *amount,
1046 *outpoint,
1047 ));
1048 total_amount += *amount;
1049 }
1050 }
1051 }
1052 }
1053
1054 let (vtxo_list, script_pubkey_to_vtxo_map) = self.list_vtxos().await?;
1055
1056 total_amount += vtxo_list
1057 .all_unspent()
1058 .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount);
1059
1060 let vtxo_inputs = vtxo_list
1061 .all_unspent()
1062 .map(|virtual_tx_outpoint| {
1063 let vtxo = script_pubkey_to_vtxo_map
1064 .get(&virtual_tx_outpoint.script)
1065 .ok_or_else(|| {
1066 ark_core::Error::ad_hoc(format!(
1067 "missing VTXO for script pubkey: {}",
1068 virtual_tx_outpoint.script
1069 ))
1070 })?;
1071 let spend_info = vtxo.forfeit_spend_info()?;
1072
1073 Ok(intent::Input::new(
1074 virtual_tx_outpoint.outpoint,
1075 vtxo.exit_delay(),
1076 None,
1077 TxOut {
1078 value: virtual_tx_outpoint.amount,
1079 script_pubkey: vtxo.script_pubkey(),
1080 },
1081 vtxo.tapscripts(),
1082 spend_info,
1083 false,
1084 virtual_tx_outpoint.is_swept,
1085 virtual_tx_outpoint.assets.clone(),
1086 ))
1087 })
1088 .collect::<Result<Vec<_>, ark_core::Error>>()?;
1089
1090 Ok((boarding_inputs, vtxo_inputs, total_amount))
1091 }
1092
1093 pub(crate) fn prepare_intent<R>(
1098 &self,
1099 rng: &mut R,
1100 onchain_inputs: Vec<batch::OnChainInput>,
1101 vtxo_inputs: Vec<intent::Input>,
1102 output_type: BatchOutputType,
1103 intent_kind: PrepareIntentKind,
1104 ) -> Result<PreparedIntent, Error>
1105 where
1106 R: Rng + CryptoRng,
1107 {
1108 if onchain_inputs.is_empty() && vtxo_inputs.is_empty() {
1109 return Err(Error::ad_hoc("cannot prepare intent without inputs"));
1110 }
1111
1112 let cosigner_keypair = Keypair::new(self.secp(), rng);
1114
1115 let vtxo_input_outpoints = vtxo_inputs.iter().map(|i| i.outpoint()).collect::<Vec<_>>();
1116
1117 let inputs = {
1118 let boarding_inputs = onchain_inputs.clone().into_iter().map(|o| {
1119 intent::Input::new(
1120 o.outpoint(),
1121 o.boarding_output().exit_delay(),
1122 None,
1123 TxOut {
1124 value: o.amount(),
1125 script_pubkey: o.boarding_output().script_pubkey(),
1126 },
1127 o.boarding_output().tapscripts(),
1128 o.boarding_output().forfeit_spend_info(),
1129 true,
1130 false,
1131 Vec::new(),
1132 )
1133 });
1134
1135 boarding_inputs
1136 .chain(vtxo_inputs.clone())
1137 .collect::<Vec<_>>()
1138 };
1139
1140 let dust = self.server_info.dust;
1141
1142 let mut outputs = vec![];
1143
1144 match output_type {
1145 BatchOutputType::Board {
1146 to_address,
1147 to_amount,
1148 } => {
1149 if to_amount < self.server_info.dust {
1150 return Err(Error::ad_hoc(format!(
1151 "cannot settle into sub-dust VTXO: {to_amount} < {dust}"
1152 )));
1153 }
1154
1155 outputs.push(intent::Output::Offchain(TxOut {
1156 value: to_amount,
1157 script_pubkey: to_address.to_p2tr_script_pubkey(),
1158 }));
1159 }
1160 BatchOutputType::OffBoard {
1161 to_address,
1162 to_amount,
1163 change_amount,
1164 ..
1165 } if change_amount == Amount::ZERO => {
1166 outputs.push(intent::Output::Onchain(TxOut {
1167 value: to_amount,
1168 script_pubkey: to_address.script_pubkey(),
1169 }));
1170 }
1171 BatchOutputType::OffBoard {
1172 to_address,
1173 to_amount,
1174 change_address,
1175 change_amount,
1176 } => {
1177 if change_amount < dust {
1178 return Err(Error::ad_hoc(format!(
1179 "cannot settle with sub-dust change VTXO: {change_amount} < {dust}"
1180 )));
1181 }
1182
1183 outputs.push(intent::Output::Onchain(TxOut {
1184 value: to_amount,
1185 script_pubkey: to_address.script_pubkey(),
1186 }));
1187
1188 outputs.push(intent::Output::Offchain(TxOut {
1189 value: change_amount,
1190 script_pubkey: change_address.to_p2tr_script_pubkey(),
1191 }));
1192 }
1193 }
1194
1195 let cosigner_pk = cosigner_keypair.public_key();
1196
1197 let secp = Secp256k1::new();
1198
1199 let sign_for_vtxo_fn =
1200 |input: &mut psbt::Input,
1201 msg: secp256k1::Message|
1202 -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
1203 match &input.witness_script {
1204 None => Err(ark_core::Error::ad_hoc(
1205 "Missing witness script in psbt::Input when signing intent",
1206 )),
1207 Some(script) => {
1208 let pks = extract_checksig_pubkeys(script);
1209 let mut res = vec![];
1210 for pk in pks {
1211 if let Ok(keypair) = self.keypair_by_pk(&pk) {
1212 let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1213 res.push((sig, keypair.public_key().into()))
1214 }
1215 }
1216 Ok(res)
1217 }
1218 }
1219 };
1220
1221 let sign_for_onchain_fn =
1222 |input: &mut psbt::Input,
1223 msg: secp256k1::Message|
1224 -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
1225 let onchain_input = onchain_inputs
1226 .iter()
1227 .find(|o| {
1228 Some(o.boarding_output().script_pubkey())
1229 == input.witness_utxo.clone().map(|w| w.script_pubkey)
1230 })
1231 .ok_or_else(|| {
1232 ark_core::Error::ad_hoc(
1233 "could not find signing key for onchain input: {input:?}",
1234 )
1235 })?;
1236
1237 let owner_pk = onchain_input.boarding_output().owner_pk();
1238 let sig = self
1239 .inner
1240 .wallet
1241 .sign_for_pk(&owner_pk, &msg)
1242 .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))?;
1243
1244 Ok((sig, owner_pk))
1245 };
1246
1247 let now = std::time::SystemTime::now()
1248 .duration_since(std::time::UNIX_EPOCH)
1249 .map_err(|e| Error::ad_hoc(e.to_string()))
1250 .context("failed to compute now timestamp")?;
1251 let now = now.as_secs();
1252 let expire_at = now + (2 * 60);
1253
1254 if let Some(packet) = create_asset_preservation_packet(&inputs, &outputs)? {
1255 outputs.push(intent::Output::AssetPacket(packet.to_txout()));
1256 }
1257
1258 let mut onchain_output_indexes = Vec::new();
1259 for (i, output) in outputs.iter().enumerate() {
1260 if matches!(output, intent::Output::Onchain(_)) {
1261 onchain_output_indexes.push(i);
1262 }
1263 }
1264
1265 let message = match intent_kind {
1266 PrepareIntentKind::EstimateFee => intent::IntentMessage::EstimateIntentFee {
1267 onchain_output_indexes,
1268 valid_at: now,
1269 expire_at,
1270 own_cosigner_pks: vec![cosigner_pk],
1271 },
1272 PrepareIntentKind::Register => intent::IntentMessage::Register {
1273 onchain_output_indexes,
1274 valid_at: now,
1275 expire_at,
1276 own_cosigner_pks: vec![cosigner_pk],
1277 },
1278 };
1279
1280 let intent = intent::make_intent(
1281 sign_for_vtxo_fn,
1282 sign_for_onchain_fn,
1283 inputs,
1284 outputs.clone(),
1285 message,
1286 )?;
1287
1288 Ok(PreparedIntent {
1289 intent,
1290 cosigner_keypair,
1291 vtxo_input_outpoints,
1292 outputs,
1293 onchain_inputs,
1294 vtxo_inputs,
1295 })
1296 }
1297
1298 pub(crate) async fn join_next_batch<R>(
1299 &self,
1300 rng: &mut R,
1301 onchain_inputs: Vec<batch::OnChainInput>,
1302 vtxo_inputs: Vec<intent::Input>,
1303 output_type: BatchOutputType,
1304 ) -> Result<Txid, Error>
1305 where
1306 R: Rng + CryptoRng,
1307 {
1308 let prepared = self.prepare_intent(
1309 rng,
1310 onchain_inputs,
1311 vtxo_inputs,
1312 output_type,
1313 PrepareIntentKind::Register,
1314 )?;
1315
1316 let PreparedIntent {
1317 intent,
1318 cosigner_keypair,
1319 vtxo_input_outpoints,
1320 outputs,
1321 onchain_inputs,
1322 vtxo_inputs,
1323 } = prepared;
1324
1325 let onchain_input_outpoints = onchain_inputs
1326 .iter()
1327 .map(|i| i.outpoint())
1328 .collect::<Vec<_>>();
1329
1330 let server_info = &self.server_info;
1331
1332 let own_cosigner_kps = [cosigner_keypair];
1333 let own_cosigner_pks = own_cosigner_kps
1334 .iter()
1335 .map(|k| k.public_key())
1336 .collect::<Vec<_>>();
1337
1338 let secp = Secp256k1::new();
1339
1340 let mut step = Step::Start;
1341
1342 let intent_id = timeout_op(
1343 self.inner.timeout,
1344 self.network_client().register_intent(intent),
1345 )
1346 .await
1347 .context("failed to register intent")??;
1348
1349 tracing::debug!(
1350 intent_id,
1351 ?onchain_input_outpoints,
1352 ?vtxo_input_outpoints,
1353 ?outputs,
1354 "Registered intent for batch"
1355 );
1356
1357 let network_client = self.network_client();
1358
1359 let mut batch_id: Option<String> = None;
1360
1361 let topics = vtxo_input_outpoints
1362 .iter()
1363 .map(ToString::to_string)
1364 .chain(
1365 own_cosigner_pks
1366 .iter()
1367 .map(|pk| pk.serialize().to_lower_hex_string()),
1368 )
1369 .collect();
1370
1371 let mut stream = network_client.get_event_stream(topics).await?;
1372
1373 let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();
1374
1375 let mut unsigned_commitment_tx = None;
1376
1377 let mut vtxo_graph_chunks = Some(Vec::new());
1378 let mut vtxo_graph: Option<TxGraph> = None;
1379
1380 let mut connectors_graph_chunks = Some(Vec::new());
1381 let mut batch_expiry = None;
1382
1383 let mut agg_nonce_pks = HashMap::new();
1384
1385 let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
1386 loop {
1387 match stream.next().await {
1388 Some(Ok(event)) => match event {
1389 StreamEvent::BatchStarted(e) => {
1390 if step != Step::Start {
1391 continue;
1392 }
1393
1394 let hash = sha256::Hash::hash(intent_id.as_bytes());
1395 let hash = hash.as_byte_array().to_vec().to_lower_hex_string();
1396
1397 if e.intent_id_hashes.iter().any(|h| h == &hash) {
1398 timeout_op(
1399 self.inner.timeout,
1400 self.network_client()
1401 .confirm_registration(intent_id.clone()),
1402 )
1403 .await
1404 .context("failed to confirm intent registration")??;
1405
1406 tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");
1407
1408 batch_id = Some(e.id);
1409
1410 step = match outputs
1413 .iter()
1414 .any(|o| matches!(o, intent::Output::Offchain(_)))
1415 {
1416 true => Step::BatchStarted,
1417 false => Step::BatchSigningStarted,
1418 };
1419
1420 batch_expiry = Some(e.batch_expiry);
1421 } else {
1422 tracing::debug!(
1423 batch_id = e.id,
1424 intent_id,
1425 "Intent ID not found for batch"
1426 );
1427 }
1428 }
1429 StreamEvent::TreeTx(e) => {
1430 if step != Step::BatchStarted && step != Step::BatchSigningStarted {
1431 continue;
1432 }
1433
1434 match e.batch_tree_event_type {
1435 BatchTreeEventType::Vtxo => {
1436 match &mut vtxo_graph_chunks {
1437 Some(vtxo_graph_chunks) => {
1438 tracing::debug!("Got new VTXO graph chunk");
1439
1440 vtxo_graph_chunks.push(e.tx_graph_chunk)
1441 }
1442 None => {
1443 return Err(Error::ark_server(
1444 "received unexpected VTXO graph chunk",
1445 ));
1446 }
1447 };
1448 }
1449 BatchTreeEventType::Connector => {
1450 match connectors_graph_chunks {
1451 Some(ref mut connectors_graph_chunks) => {
1452 tracing::debug!("Got new connectors graph chunk");
1453
1454 connectors_graph_chunks.push(e.tx_graph_chunk)
1455 }
1456 None => {
1457 return Err(Error::ark_server(
1458 "received unexpected connectors graph chunk",
1459 ));
1460 }
1461 };
1462 }
1463 }
1464 }
1465 StreamEvent::TreeSignature(e) => {
1466 if step != Step::BatchSigningStarted {
1467 continue;
1468 }
1469
1470 match e.batch_tree_event_type {
1471 BatchTreeEventType::Vtxo => {
1472 match vtxo_graph {
1473 Some(ref mut vtxo_graph) => {
1474 vtxo_graph.apply(|graph| {
1475 if graph.root().unsigned_tx.compute_txid() != e.txid {
1476 Ok(true)
1477 } else {
1478 graph.set_signature(e.signature);
1479
1480 Ok(false)
1481 }
1482 })?;
1483 }
1484 None => {
1485 return Err(Error::ark_server(
1486 "received batch tree signature without TX graph",
1487 ));
1488 }
1489 };
1490 }
1491 BatchTreeEventType::Connector => {
1492 return Err(Error::ark_server(
1493 "received batch tree signature for connectors tree",
1494 ));
1495 }
1496 }
1497 }
1498 StreamEvent::TreeSigningStarted(e) => {
1499 if step != Step::BatchStarted {
1500 continue;
1501 }
1502
1503 let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
1504 "received tree signing started event without VTXO graph chunks",
1505 ))?;
1506 vtxo_graph = Some(
1507 TxGraph::new(chunks)
1508 .map_err(Error::from)
1509 .context("failed to build VTXO graph before generating nonces")?,
1510 );
1511
1512 tracing::info!(batch_id = e.id, "Batch signing started");
1513
1514 for own_cosigner_pk in own_cosigner_pks.iter() {
1515 if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
1516 return Err(Error::ark_server(format!(
1517 "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
1518 )));
1519 }
1520 }
1521
1522 let mut our_nonce_tree_map = HashMap::new();
1524 for own_cosigner_kp in own_cosigner_kps {
1525 let own_cosigner_pk = own_cosigner_kp.public_key();
1526 let nonce_tree = generate_nonce_tree(
1527 rng,
1528 vtxo_graph.as_ref().expect("VTXO graph"),
1529 own_cosigner_pk,
1530 &e.unsigned_commitment_tx,
1531 )
1532 .map_err(Error::from)
1533 .context("failed to generate VTXO nonce tree")?;
1534
1535 tracing::info!(
1536 cosigner_pk = %own_cosigner_pk,
1537 "Submitting nonce tree for cosigner PK"
1538 );
1539
1540 network_client
1541 .submit_tree_nonces(
1542 &e.id,
1543 own_cosigner_pk,
1544 nonce_tree.to_nonce_pks(),
1545 )
1546 .await
1547 .map_err(Error::ark_server)
1548 .context("failed to submit VTXO nonce tree")?;
1549
1550 our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
1551 }
1552
1553 unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
1554 our_nonce_trees = Some(our_nonce_tree_map);
1555
1556 step = step.next();
1557 }
1558 StreamEvent::TreeNonces(e) => {
1559 if step != Step::BatchSigningStarted {
1560 continue;
1561 }
1562
1563 let tree_tx_nonce_pks = e.nonces;
1564
1565 let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
1566 own_cosigner_pks
1567 .iter()
1568 .any(|p| &&p.x_only_public_key().0 == pk)
1569 }) {
1570 Some((pk, _)) => *pk,
1571 None => {
1572 tracing::debug!(
1573 batch_id = e.id,
1574 txid = %e.txid,
1575 "Received irrelevant TreeNonces event"
1576 );
1577
1578 continue;
1579 }
1580 };
1581
1582 tracing::debug!(
1583 batch_id = e.id,
1584 txid = %e.txid,
1585 %cosigner_pk,
1586 "Received TreeNonces event"
1587 );
1588
1589 let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);
1590
1591 agg_nonce_pks.insert(e.txid, agg_nonce_pk);
1592
1593 if vtxo_graph.is_none() {
1594 let chunks = vtxo_graph_chunks.take().ok_or(Error::ark_server(
1595 "received tree nonces event without VTXO graph chunks",
1596 ))?;
1597 vtxo_graph = Some(
1598 TxGraph::new(chunks)
1599 .map_err(Error::from)
1600 .context("failed to build VTXO graph before tree signing")?,
1601 );
1602 }
1603 let vtxo_graph_ref = vtxo_graph.as_ref().expect("just populated");
1604
1605 if agg_nonce_pks.len() == vtxo_graph_ref.nb_of_nodes() {
1608 let cosigner_kp = own_cosigner_kps
1609 .iter()
1610 .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
1611 .ok_or_else(|| {
1612 Error::ad_hoc("no cosigner keypair to sign for own PK")
1613 })?;
1614
1615 let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
1616 Error::ark_server("missing nonce trees during batch protocol"),
1617 )?;
1618
1619 let our_nonce_tree =
1620 our_nonce_trees
1621 .get_mut(cosigner_kp)
1622 .ok_or(Error::ark_server(
1623 "missing nonce tree during batch protocol",
1624 ))?;
1625
1626 let unsigned_commitment_tx = unsigned_commitment_tx
1627 .as_ref()
1628 .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;
1629
1630 let batch_expiry = batch_expiry
1631 .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;
1632
1633 let mut partial_sig_tree = PartialSigTree::default();
1634 for (txid, _) in vtxo_graph_ref.as_map() {
1635 let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
1636 Error::ad_hoc(format!(
1637 "missing aggregated nonce PK for TX {txid}"
1638 ))
1639 })?;
1640
1641 let sigs = sign_batch_tree_tx(
1642 txid,
1643 batch_expiry,
1644 ark_forfeit_pk,
1645 cosigner_kp,
1646 *agg_nonce_pk,
1647 vtxo_graph_ref,
1648 unsigned_commitment_tx,
1649 our_nonce_tree,
1650 )
1651 .map_err(Error::from)
1652 .context("failed to sign VTXO tree")?;
1653
1654 partial_sig_tree.0.extend(sigs.0);
1655 }
1656
1657 network_client
1658 .submit_tree_signatures(
1659 &e.id,
1660 cosigner_kp.public_key(),
1661 partial_sig_tree,
1662 )
1663 .await
1664 .map_err(Error::ark_server)
1665 .context("failed to submit VTXO tree signatures")?;
1666 }
1667 }
1668 StreamEvent::TreeNoncesAggregated(e) => {
1669 tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
1670 }
1671 StreamEvent::BatchFinalization(e) => {
1672 if step != Step::BatchSigningStarted {
1673 continue;
1674 }
1675
1676 tracing::debug!(
1677 commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
1678 "Batch finalization started"
1679 );
1680
1681 let signed_forfeit_psbts = if !vtxo_inputs.is_empty() {
1682 let chunks =
1683 connectors_graph_chunks.take().ok_or(Error::ark_server(
1684 "received batch finalization event without connectors",
1685 ))?;
1686
1687 if chunks.is_empty() {
1688 tracing::debug!(batch_id = e.id, "No forfeit transactions");
1689
1690 Vec::new()
1691 } else {
1692 let connectors_graph = TxGraph::new(chunks)
1693 .map_err(Error::from)
1694 .context(
1695 "failed to build connectors graph before signing forfeit TXs",
1696 )?;
1697
1698 tracing::debug!(batch_id = e.id, "Batch finalization started");
1699
1700 create_and_sign_forfeit_txs(
1701 |input: &mut psbt::Input, msg: secp256k1::Message| match &input
1702 .witness_script
1703 {
1704 None => Err(ark_core::Error::ad_hoc(
1705 "Missing witness script in psbt::Input when signing forfeit",
1706 )),
1707 Some(script) => {
1708 let pks = extract_checksig_pubkeys(script);
1709 let mut res = vec![];
1710 for pk in pks {
1711 if let Ok(keypair) =
1712 self.keypair_by_pk(&pk) {
1713 let sig =
1714 secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1715 res.push((sig, keypair.public_key().into()))
1716 }
1717 }
1718 Ok(res)
1719 }
1720 },
1721 vtxo_inputs.as_slice(),
1722 &connectors_graph.leaves(),
1723 &server_info.forfeit_address,
1724 server_info.dust,
1725 )
1726 .map_err(Error::from)?
1727 }
1728 } else {
1729 Vec::new()
1730 };
1731
1732 let commitment_psbt = if onchain_inputs.is_empty() {
1733 None
1734 } else {
1735 let mut commitment_psbt = e.commitment_tx;
1736
1737 let sign_for_pk_fn = |pk: &XOnlyPublicKey,
1738 msg: &secp256k1::Message|
1739 -> Result<
1740 schnorr::Signature,
1741 ark_core::Error,
1742 > {
1743 self.inner
1744 .wallet
1745 .sign_for_pk(pk, msg)
1746 .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))
1747 };
1748
1749 sign_commitment_psbt(
1750 sign_for_pk_fn,
1751 &mut commitment_psbt,
1752 &onchain_inputs,
1753 )
1754 .map_err(Error::from)?;
1755
1756 Some(commitment_psbt)
1757 };
1758
1759 if !signed_forfeit_psbts.is_empty() || commitment_psbt.is_some() {
1760 network_client
1761 .submit_signed_forfeit_txs(signed_forfeit_psbts, commitment_psbt)
1762 .await?;
1763 }
1764
1765 step = step.next();
1766 }
1767 StreamEvent::BatchFinalized(e) => {
1768 if step != Step::Finalized {
1769 continue;
1770 }
1771
1772 let commitment_txid = e.commitment_txid;
1773
1774 tracing::info!(batch_id = e.id, %commitment_txid, "Batch finalized");
1775
1776 return Ok(commitment_txid);
1777 }
1778 StreamEvent::BatchFailed(ref e) => {
1779 if Some(&e.id) == batch_id.as_ref() {
1780 return Err(Error::ark_server(format!(
1781 "batch failed {}: {}",
1782 e.id, e.reason
1783 )));
1784 }
1785
1786 tracing::debug!("Unrelated batch failed: {e:?}");
1787 }
1788 StreamEvent::Heartbeat => {}
1789 StreamEvent::StreamStarted(_) => {}
1790 },
1791 Some(Err(e)) => {
1792 tracing::error!("Got error from event stream");
1793
1794 return Err(Error::ark_server(e));
1795 }
1796 None => {
1797 return Err(Error::ark_server("dropped batch event stream"));
1798 }
1799 }
1800 }
1801
1802 #[derive(Debug, PartialEq, Eq)]
1803 enum Step {
1804 Start,
1805 BatchStarted,
1806 BatchSigningStarted,
1807 Finalized,
1808 }
1809
1810 impl Step {
1811 fn next(&self) -> Step {
1812 match self {
1813 Step::Start => Step::BatchStarted,
1814 Step::BatchStarted => Step::BatchSigningStarted,
1815 Step::BatchSigningStarted => Step::Finalized,
1816 Step::Finalized => Step::Finalized, }
1818 }
1819 }
1820 }
1821}
1822
/// Selects which intent message is built when preparing an intent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PrepareIntentKind {
    /// Build a registration message, used to actually join a batch.
    Register,
    /// Build a fee-estimation message.
    EstimateFee,
}
1828
1829#[derive(Debug, Clone)]
1830pub(crate) enum BatchOutputType {
1831 Board {
1832 to_address: ArkAddress,
1833 to_amount: Amount,
1834 },
1835 OffBoard {
1836 to_address: Address,
1837 to_amount: Amount,
1838 change_address: ArkAddress,
1839 change_amount: Amount,
1840 },
1841}
1842
1843pub(crate) struct PreparedIntent {
1845 pub intent: intent::Intent,
1847 pub cosigner_keypair: Keypair,
1849 pub vtxo_input_outpoints: Vec<OutPoint>,
1851 pub outputs: Vec<intent::Output>,
1853 pub onchain_inputs: Vec<batch::OnChainInput>,
1855 pub vtxo_inputs: Vec<intent::Input>,
1857}