1use crate::error::ErrorContext as _;
2use crate::swap_storage::SwapStorage;
3use crate::utils::sleep;
4use crate::utils::timeout_op;
5use crate::wallet::BoardingWallet;
6use crate::wallet::OnchainWallet;
7use crate::Blockchain;
8use crate::Client;
9use crate::Error;
10use ark_core::batch;
11use ark_core::batch::aggregate_nonces;
12use ark_core::batch::complete_delegate_forfeit_txs;
13use ark_core::batch::create_and_sign_forfeit_txs;
14use ark_core::batch::create_asset_preservation_packet;
15use ark_core::batch::generate_nonce_tree;
16use ark_core::batch::sign_batch_tree_tx;
17use ark_core::batch::sign_commitment_psbt;
18use ark_core::batch::Delegate;
19use ark_core::batch::NonceKps;
20use ark_core::intent;
21use ark_core::script::extract_checksig_pubkeys;
22use ark_core::server::BatchTreeEventType;
23use ark_core::server::PartialSigTree;
24use ark_core::server::StreamEvent;
25use ark_core::ArkAddress;
26use ark_core::ArkNote;
27use ark_core::ExplorerUtxo;
28use ark_core::TxGraph;
29use backon::ExponentialBuilder;
30use backon::Retryable;
31use bitcoin::hashes::sha256;
32use bitcoin::hashes::Hash;
33use bitcoin::hex::DisplayHex;
34use bitcoin::key::Keypair;
35use bitcoin::key::Secp256k1;
36use bitcoin::psbt;
37use bitcoin::secp256k1;
38use bitcoin::secp256k1::schnorr;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Address;
41use bitcoin::Amount;
42use bitcoin::OutPoint;
43use bitcoin::Psbt;
44use bitcoin::TxOut;
45use bitcoin::Txid;
46use bitcoin::XOnlyPublicKey;
47use futures::StreamExt;
48use rand::CryptoRng;
49use rand::Rng;
50use std::collections::HashMap;
51use std::collections::HashSet;
52
53impl<B, W, S, K> Client<B, W, S, K>
54where
55 B: Blockchain,
56 W: BoardingWallet + OnchainWallet,
57 S: SwapStorage + 'static,
58 K: crate::KeyProvider,
59{
60 pub async fn settle_all<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
66 where
67 R: Rng + CryptoRng + Clone,
68 {
69 self.settle_at(crate::utils::unix_now()?, rng).await
70 }
71
72 pub(crate) async fn settle_at<R>(&self, now: i64, rng: &mut R) -> Result<Option<Txid>, Error>
73 where
74 R: Rng + CryptoRng + Clone,
75 {
76 let (to_address, _) = self.get_offchain_address()?;
78
79 let (boarding_inputs, vtxo_inputs, total_amount) =
80 self.fetch_commitment_transaction_inputs(now).await?;
81
82 tracing::debug!(
83 offchain_adress = %to_address.encode(),
84 ?boarding_inputs,
85 ?vtxo_inputs,
86 "Attempting to settle outputs"
87 );
88
89 if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
90 tracing::debug!("No inputs to board with");
91 return Ok(None);
92 }
93
94 let join_next_batch = || async {
95 self.join_next_batch(
96 &mut rng.clone(),
97 boarding_inputs.clone(),
98 vtxo_inputs.clone(),
99 BatchOutputType::Board {
100 to_address,
101 to_amount: total_amount,
102 },
103 )
104 .await
105 };
106
107 let commitment_txid = join_next_batch
109 .retry(ExponentialBuilder::default().with_max_times(0))
110 .sleep(sleep)
111 .notify(|err: &Error, dur: std::time::Duration| {
113 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
114 })
115 .await
116 .context("Failed to join batch")?;
117
118 tracing::info!(%commitment_txid, "Settlement success");
119
120 Ok(Some(commitment_txid))
121 }
122
123 pub async fn settle<R>(&self, rng: &mut R) -> Result<Option<Txid>, Error>
137 where
138 R: Rng + CryptoRng + Clone,
139 {
140 let (vtxo_list, _) = self.list_vtxos().await?;
141 let vtxo_outpoints: Vec<OutPoint> = vtxo_list.recoverable().map(|v| v.outpoint).collect();
142
143 let (boarding_inputs, _, _) = self
144 .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
145 .await?;
146 let boarding_outpoints: Vec<OutPoint> =
147 boarding_inputs.iter().map(|i| i.outpoint()).collect();
148
149 if vtxo_outpoints.is_empty() && boarding_outpoints.is_empty() {
150 tracing::debug!("No expired/recoverable VTXOs or boarding outputs to settle");
151 return Ok(None);
152 }
153
154 tracing::debug!(
155 num_vtxos = vtxo_outpoints.len(),
156 num_boarding = boarding_outpoints.len(),
157 "Attempting to settle expired/recoverable VTXOs and boarding outputs"
158 );
159
160 self.settle_vtxos(rng, &vtxo_outpoints, &boarding_outpoints)
161 .await
162 }
163
164 pub async fn settle_with_notes<R>(
170 &self,
171 rng: &mut R,
172 notes: Vec<ArkNote>,
173 ) -> Result<Option<Txid>, Error>
174 where
175 R: Rng + CryptoRng + Clone,
176 {
177 let (to_address, _) = self.get_offchain_address()?;
178
179 let (boarding_inputs, vtxo_inputs, mut total_amount) = self
180 .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
181 .await?;
182
183 let note_inputs: Vec<intent::Input> = notes
185 .iter()
186 .map(|note| {
187 total_amount += note.value();
188 note.to_intent_input()
189 })
190 .collect::<Result<Vec<_>, _>>()?;
191
192 let all_vtxo_inputs: Vec<intent::Input> =
194 vtxo_inputs.into_iter().chain(note_inputs).collect();
195
196 tracing::debug!(
197 offchain_address = %to_address.encode(),
198 ?boarding_inputs,
199 num_vtxo_inputs = all_vtxo_inputs.len(),
200 num_notes = notes.len(),
201 %total_amount,
202 "Attempting to settle outputs with notes"
203 );
204
205 if boarding_inputs.is_empty() && all_vtxo_inputs.is_empty() {
206 tracing::debug!("No inputs to settle");
207 return Ok(None);
208 }
209
210 let join_next_batch = || async {
211 self.join_next_batch(
212 &mut rng.clone(),
213 boarding_inputs.clone(),
214 all_vtxo_inputs.clone(),
215 BatchOutputType::Board {
216 to_address,
217 to_amount: total_amount,
218 },
219 )
220 .await
221 };
222
223 let commitment_txid = join_next_batch
224 .retry(ExponentialBuilder::default().with_max_times(0))
225 .sleep(sleep)
226 .notify(|err: &Error, dur: std::time::Duration| {
227 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
228 })
229 .await
230 .context("Failed to join batch")?;
231
232 tracing::info!(%commitment_txid, num_notes = notes.len(), "Settlement with notes success");
233
234 Ok(Some(commitment_txid))
235 }
236
237 pub async fn settle_vtxos<R>(
243 &self,
244 rng: &mut R,
245 vtxo_outpoints: &[OutPoint],
246 boarding_outpoints: &[OutPoint],
247 ) -> Result<Option<Txid>, Error>
248 where
249 R: Rng + CryptoRng + Clone,
250 {
251 let (to_address, _) = self.get_offchain_address()?;
253
254 let (all_boarding_inputs, all_vtxo_inputs, _) = self
255 .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
256 .await?;
257
258 let boarding_inputs: Vec<_> = all_boarding_inputs
260 .into_iter()
261 .filter(|input| boarding_outpoints.contains(&input.outpoint()))
262 .collect();
263
264 let vtxo_inputs: Vec<_> = all_vtxo_inputs
266 .into_iter()
267 .filter(|input| vtxo_outpoints.contains(&input.outpoint()))
268 .collect();
269
270 let total_amount = boarding_inputs
272 .iter()
273 .map(|i| i.amount())
274 .chain(vtxo_inputs.iter().map(|i| i.amount()))
275 .fold(Amount::ZERO, |acc, a| acc + a);
276
277 tracing::debug!(
278 offchain_address = %to_address.encode(),
279 ?boarding_inputs,
280 ?vtxo_inputs,
281 %total_amount,
282 "Attempting to settle specific outputs"
283 );
284
285 if boarding_inputs.is_empty() && vtxo_inputs.is_empty() {
286 tracing::debug!("No matching inputs to settle");
287 return Ok(None);
288 }
289
290 let join_next_batch = || async {
291 self.join_next_batch(
292 &mut rng.clone(),
293 boarding_inputs.clone(),
294 vtxo_inputs.clone(),
295 BatchOutputType::Board {
296 to_address,
297 to_amount: total_amount,
298 },
299 )
300 .await
301 };
302
303 let commitment_txid = join_next_batch
305 .retry(ExponentialBuilder::default().with_max_times(0))
306 .sleep(sleep)
307 .notify(|err: &Error, dur: std::time::Duration| {
308 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}",);
309 })
310 .await
311 .context("Failed to join batch")?;
312
313 tracing::info!(%commitment_txid, "Settlement of specific VTXOs success");
314
315 Ok(Some(commitment_txid))
316 }
317
318 pub async fn collaborative_redeem<R>(
321 &self,
322 rng: &mut R,
323 to_address: Address,
324 to_amount: Amount,
325 ) -> Result<Txid, Error>
326 where
327 R: Rng + CryptoRng + Clone,
328 {
329 let (change_address, _) = self.get_offchain_address()?;
330
331 let (boarding_inputs, vtxo_inputs, total_amount) = self
332 .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
333 .await?;
334
335 let onchain_fee = self.eval_onchain_output_fee(ark_fees::Output {
336 amount: to_amount.to_sat(),
337 script: to_address.script_pubkey().to_string(),
338 })?;
339
340 let change_amount = total_amount
342 .checked_sub(to_amount)
343 .and_then(|a| a.checked_sub(onchain_fee))
344 .ok_or_else(|| {
345 Error::coin_select(format!(
346 "insufficient balance: {total_amount} < {to_amount} (send) + {onchain_fee} (fee)"
347 ))
348 })?;
349
350 tracing::info!(
351 %to_address,
352 send_amount = %to_amount,
353 fee = %onchain_fee,
354 change_address = %change_address.encode(),
355 %change_amount,
356 ?boarding_inputs,
357 "Attempting to collaboratively redeem outputs"
358 );
359
360 let join_next_batch = || async {
361 self.join_next_batch(
362 &mut rng.clone(),
363 boarding_inputs.clone(),
364 vtxo_inputs.clone(),
365 BatchOutputType::OffBoard {
366 to_address: to_address.clone(),
367 to_amount,
368 change_address,
369 change_amount,
370 },
371 )
372 .await
373 };
374
375 let commitment_txid = join_next_batch
377 .retry(ExponentialBuilder::default().with_max_times(3))
378 .sleep(sleep)
379 .notify(|err: &Error, dur: std::time::Duration| {
381 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
382 })
383 .await
384 .context("Failed to join batch")?;
385
386 tracing::info!(%commitment_txid, "Collaborative redeem success");
387
388 Ok(commitment_txid)
389 }
390
391 pub async fn collaborative_redeem_vtxo_selection<R>(
394 &self,
395 rng: &mut R,
396 input_vtxos: impl Iterator<Item = OutPoint> + Clone,
397 to_address: Address,
398 to_amount: Amount,
399 ) -> Result<Txid, Error>
400 where
401 R: Rng + CryptoRng + Clone,
402 {
403 let (change_address, _) = self.get_offchain_address()?;
404
405 let vtxo_inputs = self
406 .selected_batch_settleable_vtxo_inputs(input_vtxos)
407 .await?;
408
409 if vtxo_inputs.is_empty() {
410 return Err(Error::ad_hoc("no matching VTXO outpoints found"));
411 }
412
413 let total_input_amount = vtxo_inputs
415 .iter()
416 .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount());
417
418 let onchain_fee = self.eval_onchain_output_fee(ark_fees::Output {
419 amount: to_amount.to_sat(),
420 script: to_address.script_pubkey().to_string(),
421 })?;
422
423 let change_amount = total_input_amount
425 .checked_sub(to_amount)
426 .and_then(|a| a.checked_sub(onchain_fee))
427 .ok_or_else(|| {
428 Error::coin_select(format!(
429 "insufficient VTXO amount: {total_input_amount} < {to_amount} (send) + {onchain_fee} (fee)"
430 ))
431 })?;
432
433 tracing::info!(
434 %to_address,
435 send_amount = %to_amount,
436 fee = %onchain_fee,
437 change_address = %change_address.encode(),
438 %change_amount,
439 "Attempting to collaboratively redeem outputs"
440 );
441
442 let join_next_batch = || async {
443 self.join_next_batch(
444 &mut rng.clone(),
445 Vec::new(),
446 vtxo_inputs.clone(),
447 BatchOutputType::OffBoard {
448 to_address: to_address.clone(),
449 to_amount,
450 change_address,
451 change_amount,
452 },
453 )
454 .await
455 };
456
457 let commitment_txid = join_next_batch
459 .retry(ExponentialBuilder::default().with_max_times(3))
460 .sleep(sleep)
461 .notify(|err: &Error, dur: std::time::Duration| {
463 tracing::warn!("Retrying joining next batch after {dur:?}. Error: {err}");
464 })
465 .await
466 .context("Failed to join batch")?;
467
468 tracing::info!(%commitment_txid, "Collaborative redeem success");
469
470 Ok(commitment_txid)
471 }
472
473 pub(crate) async fn selected_batch_settleable_vtxo_inputs(
474 &self,
475 input_vtxos: impl IntoIterator<Item = OutPoint>,
476 ) -> Result<Vec<intent::Input>, Error> {
477 let requested: HashSet<OutPoint> = input_vtxos.into_iter().collect();
478
479 let (vtxo_list, script_pubkey_to_vtxo_map) =
480 self.list_vtxos().await.context("failed to get VTXO list")?;
481 let server_info = self.server_info()?;
482 let now = crate::utils::unix_now()?;
483
484 let matching_unspent = vtxo_list
485 .all_unspent()
486 .filter(|v| requested.contains(&v.outpoint))
487 .collect::<Vec<_>>();
488
489 let settleable_outpoints = vtxo_list
490 .batch_settleable_at(&server_info, now, |script| {
491 script_pubkey_to_vtxo_map
492 .get(script)
493 .map(|vtxo| vtxo.server_pk())
494 })
495 .filter(|v| requested.contains(&v.outpoint))
496 .map(|v| v.outpoint)
497 .collect::<HashSet<_>>();
498
499 let blocked = matching_unspent
500 .iter()
501 .filter(|v| !settleable_outpoints.contains(&v.outpoint))
502 .map(|v| v.outpoint.to_string())
503 .collect::<Vec<_>>();
504 if !blocked.is_empty() {
505 return Err(Error::ad_hoc(format!(
506 "selected VTXO outpoints are not batch-settleable because their signer cutoff has passed: {}",
507 blocked.join(", ")
508 )));
509 }
510
511 matching_unspent
512 .into_iter()
513 .filter(|v| settleable_outpoints.contains(&v.outpoint))
514 .map(|v| {
515 let vtxo = script_pubkey_to_vtxo_map.get(&v.script).ok_or_else(|| {
516 ark_core::Error::ad_hoc(format!("missing VTXO for script pubkey: {}", v.script))
517 })?;
518 let spend_info = vtxo.forfeit_spend_info()?;
519
520 Ok(intent::Input::new(
521 v.outpoint,
522 vtxo.exit_delay(),
523 None,
525 TxOut {
526 value: v.amount,
527 script_pubkey: vtxo.script_pubkey(),
528 },
529 vtxo.tapscripts(),
530 spend_info,
531 false,
532 v.is_swept,
533 v.assets.clone(),
534 ))
535 })
536 .collect::<Result<Vec<_>, Error>>()
537 }
538
539 pub async fn generate_delegate(
553 &self,
554 delegate_cosigner_pk: PublicKey,
555 ) -> Result<Delegate, Error> {
556 let (to_address, _) = self.get_offchain_address()?;
558
559 let (_, vtxo_inputs, _) = self
561 .fetch_commitment_transaction_inputs(crate::utils::unix_now()?)
562 .await?;
563
564 let total_amount = vtxo_inputs
565 .iter()
566 .fold(Amount::ZERO, |acc, v| acc + v.amount());
567
568 if vtxo_inputs.is_empty() {
569 return Err(Error::ad_hoc("no inputs to settle via delegate"));
570 }
571
572 let server_info = &self.server_info()?;
573
574 let mut outputs = vec![intent::Output::Offchain(TxOut {
575 value: total_amount,
576 script_pubkey: to_address.to_p2tr_script_pubkey(),
577 })];
578
579 if let Some(packet) = create_asset_preservation_packet(&vtxo_inputs, &outputs)? {
580 outputs.push(intent::Output::AssetPacket(packet.to_txout()));
581 }
582
583 let delegate = batch::prepare_delegate_psbts(
584 vtxo_inputs,
585 outputs,
586 delegate_cosigner_pk,
587 &server_info.forfeit_address,
588 server_info.dust,
589 )?;
590
591 Ok(delegate)
592 }
593
594 pub fn sign_delegate_psbts(
596 &self,
597 intent_psbt: &mut Psbt,
598 forfeit_psbts: &mut [Psbt],
599 ) -> Result<(), Error> {
600 let sign_fn =
601 |input: &mut psbt::Input,
602 msg: secp256k1::Message|
603 -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
604 match &input.witness_script {
605 None => Err(ark_core::Error::ad_hoc(
606 "Missing witness script for psbt::Input",
607 )),
608 Some(script) => {
609 let mut res = vec![];
610 let pks = extract_checksig_pubkeys(script);
611 for pk in pks {
612 if let Ok(keypair) = self.keypair_by_pk(&pk) {
613 let sig = Secp256k1::new().sign_schnorr_no_aux_rand(&msg, &keypair);
614 let pk = keypair.x_only_public_key().0;
615 res.push((sig, pk));
616 }
617 }
618 Ok(res)
619 }
620 }
621 };
622
623 batch::sign_delegate_psbts(sign_fn, intent_psbt, forfeit_psbts)?;
624
625 Ok(())
626 }
627
    /// Drive a delegated settlement through the batch protocol and return the commitment
    /// transaction ID.
    ///
    /// The delegate's intent is registered with the server, an event stream is opened, and the
    /// events are processed in a loop until the batch is finalized or fails. Progress is
    /// tracked with a local `Step` state machine; events that arrive in a step where they are
    /// not expected are skipped.
    ///
    /// # Arguments
    ///
    /// * `rng` - Source of randomness used to generate the nonce tree.
    /// * `delegate` - The delegate data (intent and forfeit PSBTs) to settle.
    /// * `own_cosigner_kp` - Cosigner keypair; its public key must equal
    ///   `delegate.delegate_cosigner_pk`.
    ///
    /// # Returns
    ///
    /// The commitment transaction ID carried by the `BatchFinalized` event.
    ///
    /// # Errors
    ///
    /// Fails if the keypair does not match the delegate, if registering or confirming the
    /// intent fails or times out, if waiting for an event times out, if the stream yields an
    /// error or ends, if the server sends events that are inconsistent with the collected
    /// state, if signing or submitting fails, or if the batch this intent joined fails.
    ///
    /// # Panics
    ///
    /// Indexes `unsigned_tx.input[0]` of every forfeit PSBT, which panics for a PSBT without
    /// inputs.
    pub async fn settle_delegate<R>(
        &self,
        rng: &mut R,
        delegate: Delegate,
        own_cosigner_kp: Keypair,
    ) -> Result<Txid, Error>
    where
        R: Rng + CryptoRng,
    {
        // Refuse to proceed with a keypair other than the one the delegate was prepared for.
        if own_cosigner_kp.public_key() != delegate.delegate_cosigner_pk {
            return Err(Error::ad_hoc(
                "provided cosigner keypair does not match delegate_cosigner_pk",
            ));
        }

        // The outer `?` handles the timeout, the inner `?` the server's response.
        let intent_id = timeout_op(
            self.inner.timeout,
            self.network_client()
                .register_intent(delegate.intent.clone()),
        )
        .await
        .context("failed to register delegated intent")??;

        tracing::debug!(intent_id, "Registered delegated intent");

        let network_client = self.network_client();
        let server_info = &self.server_info()?;

        // Progress of this client through the batch protocol.
        #[derive(Debug, PartialEq, Eq)]
        enum Step {
            Start,
            BatchStarted,
            BatchSigningStarted,
            Finalized,
        }

        impl Step {
            // Advance to the following step; `Finalized` is terminal.
            fn next(&self) -> Step {
                match self {
                    Step::Start => Step::BatchStarted,
                    Step::BatchStarted => Step::BatchSigningStarted,
                    Step::BatchSigningStarted => Step::Finalized,
                    Step::Finalized => Step::Finalized,
                }
            }
        }

        let mut step = Step::Start;

        let own_cosigner_kps = [own_cosigner_kp];
        let own_cosigner_pks = own_cosigner_kps
            .iter()
            .map(|k| k.public_key())
            .collect::<Vec<_>>();

        // Set once a batch containing our intent has started.
        let mut batch_id: Option<String> = None;

        // The first input of each forfeit PSBT is taken as the outpoint to subscribe to.
        let vtxo_input_outpoints = delegate
            .forfeit_psbts
            .iter()
            .map(|psbt| psbt.unsigned_tx.input[0].previous_output)
            .collect::<Vec<_>>();

        // Event stream topics: the outpoints above plus the hex-encoded cosigner public keys.
        let topics = vtxo_input_outpoints
            .iter()
            .map(ToString::to_string)
            .chain(
                own_cosigner_pks
                    .iter()
                    .map(|pk| pk.serialize().to_lower_hex_string()),
            )
            .collect();

        let mut stream = network_client.get_event_stream(topics).await?;

        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();

        let mut unsigned_commitment_tx = None;

        // Chunks are buffered until the graph is built; `take()` then leaves `None` behind,
        // so chunks arriving afterwards are rejected.
        let mut vtxo_batch_tree_graph_chunks = Some(Vec::new());
        let mut vtxo_batch_tree_graph: Option<TxGraph> = None;

        let mut connectors_graph_chunks = Some(Vec::new());
        let mut batch_expiry = None;

        // Aggregated nonce public keys, keyed by the batch-tree transaction ID.
        let mut agg_nonce_pks = HashMap::new();

        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;

        loop {
            // Each wait for the next event is bounded by the configured timeout.
            match timeout_op(self.inner.timeout, stream.next())
                .await
                .context("timed out waiting for batch event")?
            {
                Some(Ok(event)) => match event {
                    StreamEvent::BatchStarted(e) => {
                        if step != Step::Start {
                            continue;
                        }

                        // The batch lists intents by the hex-encoded SHA-256 of their ID.
                        let hash = sha256::Hash::hash(intent_id.as_bytes());
                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();

                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
                            timeout_op(
                                self.inner.timeout,
                                self.network_client()
                                    .confirm_registration(intent_id.clone()),
                            )
                            .await
                            .context("failed to confirm intent registration")??;

                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");

                            batch_id = Some(e.id);

                            step = Step::BatchStarted;

                            batch_expiry = Some(e.batch_expiry);
                        } else {
                            tracing::debug!(
                                batch_id = e.id,
                                intent_id,
                                "Intent ID not found for batch"
                            );
                        }
                    }
                    StreamEvent::TreeTx(e) => {
                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
                            continue;
                        }

                        // Buffer the chunk in the matching list; a list that has already been
                        // consumed means the chunk arrived too late.
                        match e.batch_tree_event_type {
                            BatchTreeEventType::Vtxo => {
                                match &mut vtxo_batch_tree_graph_chunks {
                                    Some(vtxo_batch_tree_graph_chunks) => {
                                        tracing::debug!("Got new VTXO batch-tree graph chunk");

                                        vtxo_batch_tree_graph_chunks.push(e.tx_graph_chunk)
                                    }
                                    None => {
                                        return Err(Error::ark_server(
                                            "received unexpected VTXO batch-tree graph chunk",
                                        ));
                                    }
                                };
                            }
                            BatchTreeEventType::Connector => {
                                match connectors_graph_chunks {
                                    Some(ref mut connectors_graph_chunks) => {
                                        tracing::debug!("Got new connectors graph chunk");

                                        connectors_graph_chunks.push(e.tx_graph_chunk)
                                    }
                                    None => {
                                        return Err(Error::ark_server(
                                            "received unexpected connectors graph chunk",
                                        ));
                                    }
                                };
                            }
                        }
                    }
                    StreamEvent::TreeSignature(e) => {
                        if step != Step::BatchSigningStarted {
                            continue;
                        }

                        match e.batch_tree_event_type {
                            BatchTreeEventType::Vtxo => {
                                match vtxo_batch_tree_graph {
                                    Some(ref mut vtxo_batch_tree_graph) => {
                                        // Set the signature on the node whose transaction ID
                                        // matches the event.
                                        // NOTE(review): the boolean presumably tells `apply`
                                        // whether to keep visiting nodes; confirm against
                                        // `TxGraph::apply`.
                                        vtxo_batch_tree_graph.apply(|graph| {
                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
                                                Ok(true)
                                            } else {
                                                graph.set_signature(e.signature);

                                                Ok(false)
                                            }
                                        })?;
                                    }
                                    None => {
                                        return Err(Error::ark_server(
                                            "received batch-tree signature without transaction graph",
                                        ));
                                    }
                                };
                            }
                            BatchTreeEventType::Connector => {
                                return Err(Error::ark_server(
                                    "received batch-tree signature for connector tree",
                                ));
                            }
                        }
                    }
                    StreamEvent::TreeSigningStarted(e) => {
                        if step != Step::BatchStarted {
                            continue;
                        }

                        // Consume the buffered chunks and build the VTXO batch-tree graph.
                        let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
                            "received batch-tree signing started event without VTXO batch-tree graph chunks",
                        ))?;
                        vtxo_batch_tree_graph =
                            Some(TxGraph::new(chunks).map_err(Error::from).context(
                                "failed to build VTXO batch-tree graph before generating nonces",
                            )?);

                        tracing::info!(batch_id = e.id, "Batch signing started");

                        // Every one of our cosigner keys must be among the batch's cosigners.
                        for own_cosigner_pk in own_cosigner_pks.iter() {
                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
                                return Err(Error::ark_server(format!(
                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
                                )));
                            }
                        }

                        // Generate a nonce tree per cosigner key and submit its public part;
                        // the full tree is kept for the signing phase.
                        let mut our_nonce_tree_map = HashMap::new();
                        for own_cosigner_kp in own_cosigner_kps {
                            let own_cosigner_pk = own_cosigner_kp.public_key();
                            let nonce_tree = generate_nonce_tree(
                                rng,
                                vtxo_batch_tree_graph
                                    .as_ref()
                                    .expect("VTXO batch-tree graph"),
                                own_cosigner_pk,
                                &e.unsigned_commitment_tx,
                            )
                            .map_err(Error::from)
                            .context("failed to generate VTXO nonce tree")?;

                            tracing::info!(
                                cosigner_pk = %own_cosigner_pk,
                                "Submitting nonce tree for cosigner PK"
                            );

                            network_client
                                .submit_tree_nonces(
                                    &e.id,
                                    own_cosigner_pk,
                                    nonce_tree.to_nonce_pks(),
                                )
                                .await
                                .map_err(Error::ark_server)
                                .context("failed to submit VTXO nonce tree")?;

                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
                        }

                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
                        our_nonce_trees = Some(our_nonce_tree_map);

                        step = step.next();
                    }
                    StreamEvent::TreeNonces(e) => {
                        if step != Step::BatchSigningStarted {
                            continue;
                        }

                        let tree_tx_nonce_pks = e.nonces;

                        // Only events that contain a nonce for one of our cosigner keys are
                        // relevant; anything else is skipped.
                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
                            own_cosigner_pks
                                .iter()
                                .any(|p| &&p.x_only_public_key().0 == pk)
                        }) {
                            Some((pk, _)) => *pk,
                            None => {
                                tracing::debug!(
                                    batch_id = e.id,
                                    txid = %e.txid,
                                    "Received irrelevant TreeNonces event"
                                );

                                continue;
                            }
                        };

                        tracing::debug!(
                            batch_id = e.id,
                            txid = %e.txid,
                            %cosigner_pk,
                            "Received TreeNonces event"
                        );

                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);

                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);

                        // Fallback: build the graph here if it does not exist yet.
                        if vtxo_batch_tree_graph.is_none() {
                            let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
                                "received batch-tree nonces event without VTXO batch-tree graph chunks",
                            ))?;
                            vtxo_batch_tree_graph = Some(
                                TxGraph::new(chunks)
                                    .map_err(Error::from)
                                    .context("failed to build VTXO batch-tree graph before batch-tree signing")?,
                            );
                        }
                        let vtxo_batch_tree_graph_ref =
                            vtxo_batch_tree_graph.as_ref().expect("just populated");

                        // Sign only once an aggregated nonce has been collected for every
                        // node of the graph.
                        if agg_nonce_pks.len() == vtxo_batch_tree_graph_ref.nb_of_nodes() {
                            let cosigner_kp = own_cosigner_kps
                                .iter()
                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
                                .ok_or_else(|| {
                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
                                })?;

                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
                                Error::ark_server("missing nonce trees during batch protocol"),
                            )?;

                            let our_nonce_tree =
                                our_nonce_trees
                                    .get_mut(cosigner_kp)
                                    .ok_or(Error::ark_server(
                                        "missing nonce tree during batch protocol",
                                    ))?;

                            let unsigned_commitment_tx = unsigned_commitment_tx
                                .as_ref()
                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;

                            let batch_expiry = batch_expiry
                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;

                            // Produce partial signatures for every transaction in the graph
                            // and collect them into a single tree for submission.
                            let mut partial_sig_tree = PartialSigTree::default();
                            for (txid, _) in vtxo_batch_tree_graph_ref.as_map() {
                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
                                    Error::ad_hoc(format!(
                                        "missing aggregated nonce PK for TX {txid}"
                                    ))
                                })?;

                                let sigs = sign_batch_tree_tx(
                                    txid,
                                    batch_expiry,
                                    ark_forfeit_pk,
                                    cosigner_kp,
                                    *agg_nonce_pk,
                                    vtxo_batch_tree_graph_ref,
                                    unsigned_commitment_tx,
                                    our_nonce_tree,
                                )
                                .map_err(Error::from)
                                .context("failed to sign VTXO batch-tree transactions")?;

                                partial_sig_tree.0.extend(sigs.0);
                            }

                            network_client
                                .submit_tree_signatures(
                                    &e.id,
                                    cosigner_kp.public_key(),
                                    partial_sig_tree,
                                )
                                .await
                                .map_err(Error::ark_server)
                                .context("failed to submit VTXO batch-tree signatures")?;
                        }
                    }
                    StreamEvent::TreeNoncesAggregated(e) => {
                        // Informational only; no state changes.
                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
                    }
                    StreamEvent::BatchFinalization(e) => {
                        if step != Step::BatchSigningStarted {
                            continue;
                        }

                        tracing::debug!(
                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
                            "Batch finalization started (delegate)"
                        );

                        let chunks = connectors_graph_chunks.take().ok_or(Error::ark_server(
                            "received batch finalization event without connectors",
                        ))?;

                        // Without connector chunks there is nothing to complete or submit.
                        if chunks.is_empty() {
                            tracing::debug!(batch_id = e.id, "No delegated forfeit transactions");
                        } else {
                            let connectors_graph = TxGraph::new(chunks)
                                .map_err(Error::from)
                                .context(
                                    "failed to build connectors graph before completing forfeit TXs",
                                )?;

                            tracing::debug!(
                                batch_id = e.id,
                                "Completing delegated forfeit transactions"
                            );

                            // Complete the delegate's forfeit PSBTs using the leaves of the
                            // connectors graph, then hand them to the server.
                            let signed_forfeit_psbts = complete_delegate_forfeit_txs(
                                &delegate.forfeit_psbts,
                                &connectors_graph.leaves(),
                            )?;

                            network_client
                                .submit_signed_forfeit_txs(signed_forfeit_psbts, None)
                                .await?;
                        }

                        step = step.next();
                    }
                    StreamEvent::BatchFinalized(e) => {
                        if step != Step::Finalized {
                            continue;
                        }

                        let commitment_txid = e.commitment_txid;

                        tracing::info!(batch_id = e.id, %commitment_txid, "Delegated batch finalized");

                        return Ok(commitment_txid);
                    }
                    StreamEvent::BatchFailed(ref e) => {
                        // Only a failure of the batch we joined is fatal.
                        if Some(&e.id) == batch_id.as_ref() {
                            return Err(Error::ark_server(format!(
                                "batch failed {}: {}",
                                e.id, e.reason
                            )));
                        }

                        tracing::debug!("Unrelated batch failed: {e:?}");
                    }
                    StreamEvent::Heartbeat => {}
                    StreamEvent::StreamStarted(_) => {}
                },
                Some(Err(e)) => {
                    tracing::error!("Got error from event stream");

                    return Err(Error::ark_server(e));
                }
                None => {
                    // The stream ended before the batch was finalized.
                    return Err(Error::ark_server("dropped batch event stream"));
                }
            }
        }
    }
1087
1088 pub(crate) async fn fetch_commitment_transaction_inputs(
1091 &self,
1092 now: i64,
1093 ) -> Result<(Vec<batch::OnChainInput>, Vec<intent::Input>, Amount), Error> {
1094 let now = u64::try_from(now).map_err(|_| Error::ad_hoc("negative timestamp"))?;
1095
1096 let boarding_outputs = self.inner.wallet.get_boarding_outputs()?;
1098
1099 let mut boarding_inputs: Vec<batch::OnChainInput> = Vec::new();
1100 let mut total_amount = Amount::ZERO;
1101
1102 let mut seen_outpoints = HashSet::new();
1104
1105 let server_info = self.server_info()?;
1108
1109 for boarding_output in boarding_outputs {
1111 let outpoints = timeout_op(
1112 self.inner.timeout,
1113 self.blockchain().find_outpoints(boarding_output.address()),
1114 )
1115 .await
1116 .context("failed to find outpoints")??;
1117
1118 for o in outpoints.iter() {
1119 if let ExplorerUtxo {
1120 outpoint,
1121 amount,
1122 confirmation_blocktime: Some(confirmation_blocktime),
1123 confirmations,
1124 is_spent: false,
1125 } = o
1126 {
1127 if seen_outpoints.contains(outpoint) {
1129 continue;
1130 }
1131
1132 if server_info
1136 .signer_requires_recovery_at(boarding_output.server_pk(), now as i64)
1137 {
1138 continue;
1139 }
1140
1141 if !boarding_output.can_be_claimed_unilaterally_by_owner(
1143 std::time::Duration::from_secs(now),
1144 std::time::Duration::from_secs(*confirmation_blocktime),
1145 *confirmations,
1146 ) {
1147 seen_outpoints.insert(*outpoint);
1149
1150 boarding_inputs.push(batch::OnChainInput::new(
1151 boarding_output.clone(),
1152 *amount,
1153 *outpoint,
1154 ));
1155 total_amount += *amount;
1156 }
1157 }
1158 }
1159 }
1160
1161 let (vtxo_list, script_pubkey_to_vtxo_map) = self.list_vtxos().await?;
1162 let settleable_vtxos: Vec<_> = vtxo_list
1166 .batch_settleable_at(&server_info, now as i64, |script| {
1167 script_pubkey_to_vtxo_map
1168 .get(script)
1169 .map(|vtxo| vtxo.server_pk())
1170 })
1171 .collect();
1172
1173 total_amount += settleable_vtxos
1174 .iter()
1175 .fold(Amount::ZERO, |acc, vtxo| acc + vtxo.amount);
1176
1177 let vtxo_inputs = settleable_vtxos
1178 .into_iter()
1179 .map(|virtual_tx_outpoint| {
1180 let vtxo = script_pubkey_to_vtxo_map
1181 .get(&virtual_tx_outpoint.script)
1182 .ok_or_else(|| {
1183 ark_core::Error::ad_hoc(format!(
1184 "missing VTXO for script pubkey: {}",
1185 virtual_tx_outpoint.script
1186 ))
1187 })?;
1188 let spend_info = vtxo.forfeit_spend_info()?;
1189
1190 Ok(intent::Input::new(
1191 virtual_tx_outpoint.outpoint,
1192 vtxo.exit_delay(),
1193 None,
1194 TxOut {
1195 value: virtual_tx_outpoint.amount,
1196 script_pubkey: vtxo.script_pubkey(),
1197 },
1198 vtxo.tapscripts(),
1199 spend_info,
1200 false,
1201 virtual_tx_outpoint.is_swept,
1202 virtual_tx_outpoint.assets.clone(),
1203 ))
1204 })
1205 .collect::<Result<Vec<_>, ark_core::Error>>()?;
1206
1207 Ok((boarding_inputs, vtxo_inputs, total_amount))
1208 }
1209
1210 pub(crate) fn prepare_intent<R>(
1215 &self,
1216 rng: &mut R,
1217 onchain_inputs: Vec<batch::OnChainInput>,
1218 vtxo_inputs: Vec<intent::Input>,
1219 output_type: BatchOutputType,
1220 intent_kind: PrepareIntentKind,
1221 ) -> Result<PreparedIntent, Error>
1222 where
1223 R: Rng + CryptoRng,
1224 {
1225 if onchain_inputs.is_empty() && vtxo_inputs.is_empty() {
1226 return Err(Error::ad_hoc("cannot prepare intent without inputs"));
1227 }
1228
1229 let cosigner_keypair = Keypair::new(self.secp(), rng);
1231
1232 let vtxo_input_outpoints = vtxo_inputs.iter().map(|i| i.outpoint()).collect::<Vec<_>>();
1233
1234 let inputs = {
1235 let boarding_inputs = onchain_inputs.clone().into_iter().map(|o| {
1236 intent::Input::new(
1237 o.outpoint(),
1238 o.boarding_output().exit_delay(),
1239 None,
1240 TxOut {
1241 value: o.amount(),
1242 script_pubkey: o.boarding_output().script_pubkey(),
1243 },
1244 o.boarding_output().tapscripts(),
1245 o.boarding_output().forfeit_spend_info(),
1246 true,
1247 false,
1248 Vec::new(),
1249 )
1250 });
1251
1252 boarding_inputs
1253 .chain(vtxo_inputs.clone())
1254 .collect::<Vec<_>>()
1255 };
1256
1257 let dust = self.server_info()?.dust;
1258
1259 let mut outputs = vec![];
1260
1261 match output_type {
1262 BatchOutputType::Board {
1263 to_address,
1264 to_amount,
1265 } => {
1266 if to_amount < dust {
1267 return Err(Error::ad_hoc(format!(
1268 "cannot settle into sub-dust VTXO: {to_amount} < {dust}"
1269 )));
1270 }
1271
1272 outputs.push(intent::Output::Offchain(TxOut {
1273 value: to_amount,
1274 script_pubkey: to_address.to_p2tr_script_pubkey(),
1275 }));
1276 }
1277 BatchOutputType::OffBoard {
1278 to_address,
1279 to_amount,
1280 change_amount,
1281 ..
1282 } if change_amount == Amount::ZERO => {
1283 outputs.push(intent::Output::Onchain(TxOut {
1284 value: to_amount,
1285 script_pubkey: to_address.script_pubkey(),
1286 }));
1287 }
1288 BatchOutputType::OffBoard {
1289 to_address,
1290 to_amount,
1291 change_address,
1292 change_amount,
1293 } => {
1294 if change_amount < dust {
1295 return Err(Error::ad_hoc(format!(
1296 "cannot settle with sub-dust change VTXO: {change_amount} < {dust}"
1297 )));
1298 }
1299
1300 outputs.push(intent::Output::Onchain(TxOut {
1301 value: to_amount,
1302 script_pubkey: to_address.script_pubkey(),
1303 }));
1304
1305 outputs.push(intent::Output::Offchain(TxOut {
1306 value: change_amount,
1307 script_pubkey: change_address.to_p2tr_script_pubkey(),
1308 }));
1309 }
1310 }
1311
1312 let cosigner_pk = cosigner_keypair.public_key();
1313
1314 let secp = Secp256k1::new();
1315
1316 let sign_for_vtxo_fn =
1317 |input: &mut psbt::Input,
1318 msg: secp256k1::Message|
1319 -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error> {
1320 match &input.witness_script {
1321 None => Err(ark_core::Error::ad_hoc(
1322 "Missing witness script in psbt::Input when signing intent",
1323 )),
1324 Some(script) => {
1325 let pks = extract_checksig_pubkeys(script);
1326 let mut res = vec![];
1327 for pk in pks {
1328 if let Ok(keypair) = self.keypair_by_pk(&pk) {
1329 let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
1330 res.push((sig, keypair.public_key().into()))
1331 }
1332 }
1333 Ok(res)
1334 }
1335 }
1336 };
1337
1338 let sign_for_onchain_fn =
1339 |input: &mut psbt::Input,
1340 msg: secp256k1::Message|
1341 -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
1342 let onchain_input = onchain_inputs
1343 .iter()
1344 .find(|o| {
1345 Some(o.boarding_output().script_pubkey())
1346 == input.witness_utxo.clone().map(|w| w.script_pubkey)
1347 })
1348 .ok_or_else(|| {
1349 ark_core::Error::ad_hoc(
1350 "could not find signing key for onchain input: {input:?}",
1351 )
1352 })?;
1353
1354 let owner_pk = onchain_input.boarding_output().owner_pk();
1355 let sig = self
1356 .inner
1357 .wallet
1358 .sign_for_pk(&owner_pk, &msg)
1359 .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))?;
1360
1361 Ok((sig, owner_pk))
1362 };
1363
1364 let now = std::time::SystemTime::now()
1365 .duration_since(std::time::UNIX_EPOCH)
1366 .map_err(|e| Error::ad_hoc(e.to_string()))
1367 .context("failed to compute now timestamp")?;
1368 let now = now.as_secs();
1369 let expire_at = now + (2 * 60);
1370
1371 if let Some(packet) = create_asset_preservation_packet(&inputs, &outputs)? {
1372 outputs.push(intent::Output::AssetPacket(packet.to_txout()));
1373 }
1374
1375 let mut onchain_output_indexes = Vec::new();
1376 for (i, output) in outputs.iter().enumerate() {
1377 if matches!(output, intent::Output::Onchain(_)) {
1378 onchain_output_indexes.push(i);
1379 }
1380 }
1381
1382 let message = match intent_kind {
1383 PrepareIntentKind::EstimateFee => intent::IntentMessage::EstimateIntentFee {
1384 onchain_output_indexes,
1385 valid_at: now,
1386 expire_at,
1387 own_cosigner_pks: vec![cosigner_pk],
1388 },
1389 PrepareIntentKind::Register => intent::IntentMessage::Register {
1390 onchain_output_indexes,
1391 valid_at: now,
1392 expire_at,
1393 own_cosigner_pks: vec![cosigner_pk],
1394 },
1395 };
1396
1397 let intent = intent::make_intent(
1398 sign_for_vtxo_fn,
1399 sign_for_onchain_fn,
1400 inputs,
1401 outputs.clone(),
1402 message,
1403 )?;
1404
1405 Ok(PreparedIntent {
1406 intent,
1407 cosigner_keypair,
1408 vtxo_input_outpoints,
1409 outputs,
1410 onchain_inputs,
1411 vtxo_inputs,
1412 })
1413 }
1414
    /// Register an intent for the given inputs and outputs and take part in the batch protocol
    /// until the batch that includes the intent is finalized.
    ///
    /// The intent is prepared with [`PrepareIntentKind::Register`], registered with the server,
    /// and then the server's event stream is consumed in a loop. Events are only acted upon when
    /// they match the current protocol `Step`; events arriving in any other step are ignored.
    ///
    /// # Arguments
    ///
    /// * `rng` - Randomness source used for the cosigner keypair and the nonce tree.
    /// * `onchain_inputs` - Boarding outputs to spend in the batch.
    /// * `vtxo_inputs` - VTXO inputs to spend (forfeit) in the batch.
    /// * `output_type` - The outputs to request from the batch.
    ///
    /// # Returns
    ///
    /// The TXID of the commitment transaction reported by the `BatchFinalized` event.
    ///
    /// # Errors
    ///
    /// Returns an error if preparing or registering the intent fails, if waiting for an event
    /// times out, if the event stream yields an error or ends, if the server sends an event that
    /// is inconsistent with the local protocol state, if any signing or submission step fails, or
    /// if the batch we joined is reported as failed.
    pub(crate) async fn join_next_batch<R>(
        &self,
        rng: &mut R,
        onchain_inputs: Vec<batch::OnChainInput>,
        vtxo_inputs: Vec<intent::Input>,
        output_type: BatchOutputType,
    ) -> Result<Txid, Error>
    where
        R: Rng + CryptoRng,
    {
        let prepared = self.prepare_intent(
            rng,
            onchain_inputs,
            vtxo_inputs,
            output_type,
            PrepareIntentKind::Register,
        )?;

        let PreparedIntent {
            intent,
            cosigner_keypair,
            vtxo_input_outpoints,
            outputs,
            onchain_inputs,
            vtxo_inputs,
        } = prepared;

        let onchain_input_outpoints = onchain_inputs
            .iter()
            .map(|i| i.outpoint())
            .collect::<Vec<_>>();

        let server_info = &self.server_info()?;

        // Only a single cosigner keypair is used, but the code below is written against a
        // collection of them.
        let own_cosigner_kps = [cosigner_keypair];
        let own_cosigner_pks = own_cosigner_kps
            .iter()
            .map(|k| k.public_key())
            .collect::<Vec<_>>();

        let secp = Secp256k1::new();

        let mut step = Step::Start;

        let intent_id = timeout_op(
            self.inner.timeout,
            self.network_client().register_intent(intent),
        )
        .await
        .context("failed to register intent")??;

        tracing::debug!(
            intent_id,
            ?onchain_input_outpoints,
            ?vtxo_input_outpoints,
            ?outputs,
            "Registered intent for batch"
        );

        let network_client = self.network_client();

        // ID of the batch that included our intent; set once we see a matching `BatchStarted`.
        let mut batch_id: Option<String> = None;

        // Event stream topics: our VTXO input outpoints and our cosigner public keys (hex).
        let topics = vtxo_input_outpoints
            .iter()
            .map(ToString::to_string)
            .chain(
                own_cosigner_pks
                    .iter()
                    .map(|pk| pk.serialize().to_lower_hex_string()),
            )
            .collect();

        let mut stream = network_client.get_event_stream(topics).await?;

        let (ark_forfeit_pk, _) = server_info.forfeit_pk.x_only_public_key();

        let mut unsigned_commitment_tx = None;

        // Chunks are accumulated from `TreeTx` events and consumed (via `take`) when the
        // corresponding graph is built; after that, further chunks are rejected.
        let mut vtxo_batch_tree_graph_chunks = Some(Vec::new());
        let mut vtxo_batch_tree_graph: Option<TxGraph> = None;

        let mut connectors_graph_chunks = Some(Vec::new());
        let mut batch_expiry = None;

        // Aggregated nonce public keys, keyed by the TXID of the batch-tree transaction.
        let mut agg_nonce_pks = HashMap::new();

        let mut our_nonce_trees: Option<HashMap<Keypair, NonceKps>> = None;
        loop {
            match timeout_op(self.inner.timeout, stream.next())
                .await
                .context("timed out waiting for batch event")?
            {
                Some(Ok(event)) => match event {
                    StreamEvent::BatchStarted(e) => {
                        if step != Step::Start {
                            continue;
                        }

                        // The server announces the intents in a batch by the hex-encoded SHA-256
                        // hash of their IDs.
                        let hash = sha256::Hash::hash(intent_id.as_bytes());
                        let hash = hash.as_byte_array().to_vec().to_lower_hex_string();

                        if e.intent_id_hashes.iter().any(|h| h == &hash) {
                            timeout_op(
                                self.inner.timeout,
                                self.network_client()
                                    .confirm_registration(intent_id.clone()),
                            )
                            .await
                            .context("failed to confirm intent registration")??;

                            tracing::info!(batch_id = e.id, intent_id, "Intent ID found for batch");

                            batch_id = Some(e.id);

                            // Without any offchain output, jump straight to
                            // `BatchSigningStarted`, bypassing the `TreeSigningStarted` handler.
                            // NOTE(review): presumably because there is then no VTXO tree for us
                            // to cosign — confirm against the protocol.
                            step = match outputs
                                .iter()
                                .any(|o| matches!(o, intent::Output::Offchain(_)))
                            {
                                true => Step::BatchStarted,
                                false => Step::BatchSigningStarted,
                            };

                            batch_expiry = Some(e.batch_expiry);
                        } else {
                            tracing::debug!(
                                batch_id = e.id,
                                intent_id,
                                "Intent ID not found for batch"
                            );
                        }
                    }
                    StreamEvent::TreeTx(e) => {
                        if step != Step::BatchStarted && step != Step::BatchSigningStarted {
                            continue;
                        }

                        // Accumulate graph chunks. A chunk arriving after the chunk list has
                        // already been consumed is treated as a server error.
                        match e.batch_tree_event_type {
                            BatchTreeEventType::Vtxo => {
                                match &mut vtxo_batch_tree_graph_chunks {
                                    Some(vtxo_batch_tree_graph_chunks) => {
                                        tracing::debug!("Got new VTXO batch-tree graph chunk");

                                        vtxo_batch_tree_graph_chunks.push(e.tx_graph_chunk)
                                    }
                                    None => {
                                        return Err(Error::ark_server(
                                            "received unexpected VTXO batch-tree graph chunk",
                                        ));
                                    }
                                };
                            }
                            BatchTreeEventType::Connector => {
                                match connectors_graph_chunks {
                                    Some(ref mut connectors_graph_chunks) => {
                                        tracing::debug!("Got new connectors graph chunk");

                                        connectors_graph_chunks.push(e.tx_graph_chunk)
                                    }
                                    None => {
                                        return Err(Error::ark_server(
                                            "received unexpected connectors graph chunk",
                                        ));
                                    }
                                };
                            }
                        }
                    }
                    StreamEvent::TreeSignature(e) => {
                        if step != Step::BatchSigningStarted {
                            continue;
                        }

                        match e.batch_tree_event_type {
                            BatchTreeEventType::Vtxo => {
                                match vtxo_batch_tree_graph {
                                    Some(ref mut vtxo_batch_tree_graph) => {
                                        // Set the signature on the node whose root transaction
                                        // matches the event's TXID.
                                        // NOTE(review): the closure's boolean presumably tells
                                        // `apply` whether to keep traversing — confirm against
                                        // `TxGraph::apply`.
                                        vtxo_batch_tree_graph.apply(|graph| {
                                            if graph.root().unsigned_tx.compute_txid() != e.txid {
                                                Ok(true)
                                            } else {
                                                graph.set_signature(e.signature);

                                                Ok(false)
                                            }
                                        })?;
                                    }
                                    None => {
                                        return Err(Error::ark_server(
                                            "received batch-tree signature without transaction graph",
                                        ));
                                    }
                                };
                            }
                            BatchTreeEventType::Connector => {
                                return Err(Error::ark_server(
                                    "received batch-tree signature for connector tree",
                                ));
                            }
                        }
                    }
                    StreamEvent::TreeSigningStarted(e) => {
                        if step != Step::BatchStarted {
                            continue;
                        }

                        // All VTXO tree chunks received so far are consumed to build the graph.
                        let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
                            "received batch-tree signing started event without VTXO batch-tree graph chunks",
                        ))?;
                        vtxo_batch_tree_graph =
                            Some(TxGraph::new(chunks).map_err(Error::from).context(
                                "failed to build VTXO batch-tree graph before generating nonces",
                            )?);

                        tracing::info!(batch_id = e.id, "Batch signing started");

                        // Every one of our cosigner PKs must be among the batch's cosigners.
                        for own_cosigner_pk in own_cosigner_pks.iter() {
                            if !&e.cosigners_pubkeys.iter().any(|p| p == own_cosigner_pk) {
                                return Err(Error::ark_server(format!(
                                    "own cosigner PK is not present in cosigner PKs: {own_cosigner_pk}"
                                )));
                            }
                        }

                        // Generate a nonce tree per cosigner keypair, submit its public nonces to
                        // the server, and keep the tree for the signing step in `TreeNonces`.
                        let mut our_nonce_tree_map = HashMap::new();
                        for own_cosigner_kp in own_cosigner_kps {
                            let own_cosigner_pk = own_cosigner_kp.public_key();
                            let nonce_tree = generate_nonce_tree(
                                rng,
                                vtxo_batch_tree_graph
                                    .as_ref()
                                    .expect("VTXO batch-tree graph"),
                                own_cosigner_pk,
                                &e.unsigned_commitment_tx,
                            )
                            .map_err(Error::from)
                            .context("failed to generate VTXO nonce tree")?;

                            tracing::info!(
                                cosigner_pk = %own_cosigner_pk,
                                "Submitting nonce tree for cosigner PK"
                            );

                            network_client
                                .submit_tree_nonces(
                                    &e.id,
                                    own_cosigner_pk,
                                    nonce_tree.to_nonce_pks(),
                                )
                                .await
                                .map_err(Error::ark_server)
                                .context("failed to submit VTXO nonce tree")?;

                            our_nonce_tree_map.insert(own_cosigner_kp, nonce_tree);
                        }

                        unsigned_commitment_tx = Some(e.unsigned_commitment_tx);
                        our_nonce_trees = Some(our_nonce_tree_map);

                        // `BatchStarted` -> `BatchSigningStarted`.
                        step = step.next();
                    }
                    StreamEvent::TreeNonces(e) => {
                        if step != Step::BatchSigningStarted {
                            continue;
                        }

                        let tree_tx_nonce_pks = e.nonces;

                        // Only events that carry a nonce for one of our cosigner PKs are relevant.
                        let cosigner_pk = match tree_tx_nonce_pks.0.iter().find(|(pk, _)| {
                            own_cosigner_pks
                                .iter()
                                .any(|p| &&p.x_only_public_key().0 == pk)
                        }) {
                            Some((pk, _)) => *pk,
                            None => {
                                tracing::debug!(
                                    batch_id = e.id,
                                    txid = %e.txid,
                                    "Received irrelevant TreeNonces event"
                                );

                                continue;
                            }
                        };

                        tracing::debug!(
                            batch_id = e.id,
                            txid = %e.txid,
                            %cosigner_pk,
                            "Received TreeNonces event"
                        );

                        let agg_nonce_pk = aggregate_nonces(tree_tx_nonce_pks);

                        agg_nonce_pks.insert(e.txid, agg_nonce_pk);

                        // Build the graph lazily if it has not been built by the
                        // `TreeSigningStarted` handler.
                        if vtxo_batch_tree_graph.is_none() {
                            let chunks = vtxo_batch_tree_graph_chunks.take().ok_or(Error::ark_server(
                                "received batch-tree nonces event without VTXO batch-tree graph chunks",
                            ))?;
                            vtxo_batch_tree_graph = Some(
                                TxGraph::new(chunks)
                                    .map_err(Error::from)
                                    .context("failed to build VTXO batch-tree graph before batch-tree signing")?,
                            );
                        }
                        let vtxo_batch_tree_graph_ref =
                            vtxo_batch_tree_graph.as_ref().expect("just populated");

                        // Sign only once an aggregated nonce has been collected for every node of
                        // the VTXO batch tree.
                        if agg_nonce_pks.len() == vtxo_batch_tree_graph_ref.nb_of_nodes() {
                            let cosigner_kp = own_cosigner_kps
                                .iter()
                                .find(|kp| kp.public_key().x_only_public_key().0 == cosigner_pk)
                                .ok_or_else(|| {
                                    Error::ad_hoc("no cosigner keypair to sign for own PK")
                                })?;

                            let our_nonce_trees = our_nonce_trees.as_mut().ok_or(
                                Error::ark_server("missing nonce trees during batch protocol"),
                            )?;

                            let our_nonce_tree =
                                our_nonce_trees
                                    .get_mut(cosigner_kp)
                                    .ok_or(Error::ark_server(
                                        "missing nonce tree during batch protocol",
                                    ))?;

                            let unsigned_commitment_tx = unsigned_commitment_tx
                                .as_ref()
                                .ok_or_else(|| Error::ad_hoc("missing commitment TX"))?;

                            let batch_expiry = batch_expiry
                                .ok_or_else(|| Error::ad_hoc("missing batch expiry"))?;

                            // Produce our partial signatures for every transaction in the tree
                            // and submit them in a single request.
                            let mut partial_sig_tree = PartialSigTree::default();
                            for (txid, _) in vtxo_batch_tree_graph_ref.as_map() {
                                let agg_nonce_pk = agg_nonce_pks.get(&txid).ok_or_else(|| {
                                    Error::ad_hoc(format!(
                                        "missing aggregated nonce PK for TX {txid}"
                                    ))
                                })?;

                                let sigs = sign_batch_tree_tx(
                                    txid,
                                    batch_expiry,
                                    ark_forfeit_pk,
                                    cosigner_kp,
                                    *agg_nonce_pk,
                                    vtxo_batch_tree_graph_ref,
                                    unsigned_commitment_tx,
                                    our_nonce_tree,
                                )
                                .map_err(Error::from)
                                .context("failed to sign VTXO batch-tree transactions")?;

                                partial_sig_tree.0.extend(sigs.0);
                            }

                            network_client
                                .submit_tree_signatures(
                                    &e.id,
                                    cosigner_kp.public_key(),
                                    partial_sig_tree,
                                )
                                .await
                                .map_err(Error::ark_server)
                                .context("failed to submit VTXO batch-tree signatures")?;
                        }
                    }
                    StreamEvent::TreeNoncesAggregated(e) => {
                        // Informational only; no action is taken.
                        tracing::debug!(batch_id = e.id, "Batch combined nonces generated");
                    }
                    StreamEvent::BatchFinalization(e) => {
                        if step != Step::BatchSigningStarted {
                            continue;
                        }

                        tracing::debug!(
                            commitment_txid = %e.commitment_tx.unsigned_tx.compute_txid(),
                            "Batch finalization started"
                        );

                        // Forfeit transactions are only created when VTXO inputs are being spent
                        // and the server sent connector chunks.
                        let signed_forfeit_psbts = if !vtxo_inputs.is_empty() {
                            let chunks =
                                connectors_graph_chunks.take().ok_or(Error::ark_server(
                                    "received batch finalization event without connectors",
                                ))?;

                            if chunks.is_empty() {
                                tracing::debug!(batch_id = e.id, "No forfeit transactions");

                                Vec::new()
                            } else {
                                let connectors_graph = TxGraph::new(chunks)
                                    .map_err(Error::from)
                                    .context(
                                        "failed to build connectors graph before signing forfeit TXs",
                                    )?;

                                tracing::debug!(batch_id = e.id, "Batch finalization started");

                                // Signing closure: sign with every key in the witness script for
                                // which `keypair_by_pk` returns a keypair; other keys are skipped.
                                create_and_sign_forfeit_txs(
                                    |input: &mut psbt::Input, msg: secp256k1::Message| match &input
                                        .witness_script
                                    {
                                        None => Err(ark_core::Error::ad_hoc(
                                            "Missing witness script in psbt::Input when signing forfeit",
                                        )),
                                        Some(script) => {
                                            let pks = extract_checksig_pubkeys(script);
                                            let mut res = vec![];
                                            for pk in pks {
                                                if let Ok(keypair) =
                                                    self.keypair_by_pk(&pk) {
                                                    let sig =
                                                        secp.sign_schnorr_no_aux_rand(&msg, &keypair);
                                                    res.push((sig, keypair.public_key().into()))
                                                }
                                            }
                                            Ok(res)
                                        }
                                    },
                                    vtxo_inputs.as_slice(),
                                    &connectors_graph.leaves(),
                                    &server_info.forfeit_address,
                                    server_info.dust,
                                )
                                .map_err(Error::from)?
                            }
                        } else {
                            Vec::new()
                        };

                        // The commitment PSBT is only signed when boarding outputs are spent.
                        let commitment_psbt = if onchain_inputs.is_empty() {
                            None
                        } else {
                            let mut commitment_psbt = e.commitment_tx;

                            let sign_for_pk_fn = |pk: &XOnlyPublicKey,
                                                  msg: &secp256k1::Message|
                             -> Result<
                                schnorr::Signature,
                                ark_core::Error,
                            > {
                                self.inner
                                    .wallet
                                    .sign_for_pk(pk, msg)
                                    .map_err(|e| ark_core::Error::ad_hoc(e.to_string()))
                            };

                            sign_commitment_psbt(
                                sign_for_pk_fn,
                                &mut commitment_psbt,
                                &onchain_inputs,
                            )
                            .map_err(Error::from)?;

                            Some(commitment_psbt)
                        };

                        // Nothing is submitted if there is neither a forfeit TX nor a signed
                        // commitment PSBT.
                        if !signed_forfeit_psbts.is_empty() || commitment_psbt.is_some() {
                            network_client
                                .submit_signed_forfeit_txs(signed_forfeit_psbts, commitment_psbt)
                                .await?;
                        }

                        // `BatchSigningStarted` -> `Finalized`.
                        step = step.next();
                    }
                    StreamEvent::BatchFinalized(e) => {
                        if step != Step::Finalized {
                            continue;
                        }

                        let commitment_txid = e.commitment_txid;

                        tracing::info!(batch_id = e.id, %commitment_txid, "Batch finalized");

                        return Ok(commitment_txid);
                    }
                    StreamEvent::BatchFailed(ref e) => {
                        // Only a failure of the batch we joined is an error; failures of other
                        // batches are merely logged.
                        if Some(&e.id) == batch_id.as_ref() {
                            return Err(Error::ark_server(format!(
                                "batch failed {}: {}",
                                e.id, e.reason
                            )));
                        }

                        tracing::debug!("Unrelated batch failed: {e:?}");
                    }
                    StreamEvent::Heartbeat => {}
                    StreamEvent::StreamStarted(_) => {}
                },
                Some(Err(e)) => {
                    tracing::error!("Got error from event stream");

                    return Err(Error::ark_server(e));
                }
                None => {
                    return Err(Error::ark_server("dropped batch event stream"));
                }
            }
        }

        /// Local progress through the batch protocol, used to decide which stream events to
        /// act upon.
        #[derive(Debug, PartialEq, Eq)]
        enum Step {
            /// Intent registered; waiting for a `BatchStarted` event that includes it.
            Start,
            /// Our intent was included in a batch; waiting for tree signing to start.
            BatchStarted,
            /// Tree signing is in progress (or was skipped); waiting for batch finalization.
            BatchSigningStarted,
            /// Finalization data was handled; waiting for the `BatchFinalized` event.
            Finalized,
        }

        impl Step {
            /// Return the step that follows `self`; `Finalized` maps to itself.
            fn next(&self) -> Step {
                match self {
                    Step::Start => Step::BatchStarted,
                    Step::BatchStarted => Step::BatchSigningStarted,
                    Step::BatchSigningStarted => Step::Finalized,
                    Step::Finalized => Step::Finalized,
                }
            }
        }
    }
1943}
1944
/// Selects which intent message `prepare_intent` embeds in the intent it builds.
#[derive(Debug, Clone)]
pub(crate) enum PrepareIntentKind {
    /// Build an `intent::IntentMessage::Register` message.
    Register,
    /// Build an `intent::IntentMessage::EstimateIntentFee` message.
    EstimateFee,
}
1950
/// The outputs requested when preparing an intent.
#[derive(Debug, Clone)]
pub(crate) enum BatchOutputType {
    /// Request a single offchain output paying `to_amount` to `to_address`.
    Board {
        /// Ark address receiving the offchain output.
        to_address: ArkAddress,
        /// Value of the offchain output; must not be below the server's dust amount.
        to_amount: Amount,
    },
    /// Request an on-chain output, plus an offchain change output if `change_amount` is non-zero.
    OffBoard {
        /// On-chain address receiving `to_amount`.
        to_address: Address,
        /// Value of the on-chain output.
        to_amount: Amount,
        /// Ark address receiving the offchain change; unused when `change_amount` is zero.
        change_address: ArkAddress,
        /// Value of the offchain change output. Zero means no change output is requested;
        /// otherwise it must not be below the server's dust amount.
        change_amount: Amount,
    },
}
1964
/// The result of `prepare_intent`: a signed intent together with the data that went into it.
pub(crate) struct PreparedIntent {
    /// The intent produced by `intent::make_intent`.
    pub intent: intent::Intent,
    /// The cosigner keypair generated for this intent; its public key is listed in the intent
    /// message.
    pub cosigner_keypair: Keypair,
    /// The outpoints of `vtxo_inputs`.
    pub vtxo_input_outpoints: Vec<OutPoint>,
    /// The outputs included in the intent, including the asset packet output if one was created.
    pub outputs: Vec<intent::Output>,
    /// The boarding inputs passed to `prepare_intent`, handed back unchanged.
    pub onchain_inputs: Vec<batch::OnChainInput>,
    /// The VTXO inputs passed to `prepare_intent`, handed back unchanged.
    pub vtxo_inputs: Vec<intent::Input>,
}