1use crate::error::ErrorContext;
2use crate::swap_storage::SwapStorage;
3use crate::utils::timeout_op;
4use crate::wallet::BoardingWallet;
5use crate::wallet::OnchainWallet;
6use crate::Blockchain;
7use crate::Client;
8use crate::Error;
9use ark_core::asset::AssetId;
10use ark_core::coin_select::select_vtxos;
11use ark_core::coin_select::select_vtxos_for_asset;
12use ark_core::coin_select::VirtualTxOutPoint;
13use ark_core::intent;
14use ark_core::script::extract_checksig_pubkeys;
15use ark_core::send::build_asset_send_transactions;
16use ark_core::send::sign_ark_transaction;
17use ark_core::send::sign_checkpoint_transaction;
18use ark_core::send::OffchainTransactions;
19use ark_core::send::SendReceiver;
20use ark_core::send::VtxoInput;
21use ark_core::server::PendingTx;
22use bitcoin::key::Secp256k1;
23use bitcoin::psbt;
24use bitcoin::secp256k1;
25use bitcoin::secp256k1::schnorr;
26use bitcoin::Amount;
27use bitcoin::OutPoint;
28use bitcoin::TxOut;
29use bitcoin::Txid;
30use bitcoin::XOnlyPublicKey;
31use std::collections::HashMap;
32use std::collections::HashSet;
33use std::time::Duration;
34
35impl<B, W, S, K> Client<B, W, S, K>
36where
37 B: Blockchain,
38 W: BoardingWallet + OnchainWallet,
39 S: SwapStorage + 'static,
40 K: crate::KeyProvider,
41{
42 pub async fn send(&self, receivers: Vec<SendReceiver>) -> Result<Txid, Error> {
58 let selected = self
60 .auto_select_send_inputs(&receivers)
61 .await
62 .context("failed to auto-select send inputs")?;
63
64 let txid = self
65 .send_with_selected_inputs(selected, receivers)
66 .await
67 .context("failed to send with selected inputs")?;
68
69 Ok(txid)
70 }
71
72 pub async fn send_selection(
94 &self,
95 vtxo_outpoints: &[OutPoint],
96 receivers: Vec<SendReceiver>,
97 ) -> Result<Txid, Error> {
98 let selected = self
100 .resolve_selected_send_inputs(vtxo_outpoints)
101 .await
102 .context("failed to resolve selected send inputs")?;
103
104 let txid = self
105 .send_with_selected_inputs(selected, receivers)
106 .await
107 .context("failed to send with selected inputs")?;
108
109 Ok(txid)
110 }
111
112 pub async fn continue_pending_offchain_txs(&self) -> Result<Vec<Txid>, Error> {
126 let pending_txs = self.fetch_pending_offchain_txs().await?;
127
128 if pending_txs.is_empty() {
129 return Ok(vec![]);
130 }
131
132 let mut finalized_txids = Vec::new();
133
134 for pending_tx in pending_txs {
135 let ark_txid = pending_tx.ark_txid;
136 self.sign_and_finalize_pending_tx(pending_tx).await?;
137 finalized_txids.push(ark_txid);
138 }
139
140 Ok(finalized_txids)
141 }
142
143 pub async fn list_pending_offchain_txs(&self) -> Result<Vec<PendingTx>, Error> {
152 self.fetch_pending_offchain_txs().await
153 }
154
155 #[cfg(feature = "test-utils")]
165 pub async fn submit_offchain_tx(
166 &self,
167 vtxo_inputs: Vec<VtxoInput>,
168 address: ark_core::ArkAddress,
169 amount: Amount,
170 ) -> Result<Txid, Error> {
171 let receivers = vec![SendReceiver {
172 address,
173 amount,
174 assets: Vec::new(),
175 }];
176 let pending_tx = self.build_and_submit(vtxo_inputs, receivers).await?;
177 Ok(pending_tx.ark_txid)
178 }
179
180 fn make_sign_fn(
184 &self,
185 ) -> impl FnMut(
186 &mut psbt::Input,
187 secp256k1::Message,
188 ) -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error>
189 + '_ {
190 |input, msg| {
191 let script = input
192 .witness_script
193 .as_ref()
194 .ok_or_else(|| ark_core::Error::ad_hoc("Missing witness script for psbt::Input"))?;
195 let pks = extract_checksig_pubkeys(script);
196 let secp = Secp256k1::new();
197 let mut sigs = vec![];
198 for pk in pks {
199 if let Ok(keypair) = self.keypair_by_pk(&pk) {
200 let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
201 sigs.push((sig, keypair.x_only_public_key().0));
202 }
203 }
204 Ok(sigs)
205 }
206 }
207
208 async fn auto_select_send_inputs(
209 &self,
210 receivers: &[SendReceiver],
211 ) -> Result<Vec<VtxoInput>, Error> {
212 let (vtxo_list, script_pubkey_to_vtxo_map) = self
213 .list_vtxos()
214 .await
215 .context("failed to get spendable VTXOs")?;
216
217 let spendable = vtxo_list
218 .spendable_offchain()
219 .map(|vtxo| VirtualTxOutPoint {
220 outpoint: vtxo.outpoint,
221 script_pubkey: vtxo.script.clone(),
222 expire_at: vtxo.expires_at,
223 amount: vtxo.amount,
224 assets: vtxo.assets.clone(),
225 })
226 .collect::<Vec<_>>();
227
228 let mut selected_outpoints = HashSet::new();
229 let mut selected = Vec::new();
230 let mut asset_changes: HashMap<AssetId, u64> = HashMap::new();
231 let mut btc_needed = Amount::ZERO;
232 let mut btc_provided = Amount::ZERO;
233
234 for receiver in receivers {
235 btc_needed += receiver.amount;
236
237 for asset in &receiver.assets {
238 let mut amount_to_select = asset.amount;
239
240 if let Some(existing_change) = asset_changes.get_mut(&asset.asset_id) {
241 if amount_to_select <= *existing_change {
242 *existing_change -= amount_to_select;
243 if *existing_change == 0 {
244 asset_changes.remove(&asset.asset_id);
245 }
246 continue;
247 }
248 amount_to_select -= *existing_change;
249 asset_changes.remove(&asset.asset_id);
250 }
251
252 let available: Vec<_> = spendable
253 .iter()
254 .filter(|v| !selected_outpoints.contains(&v.outpoint))
255 .cloned()
256 .collect();
257
258 let (asset_coins, asset_change) =
259 select_vtxos_for_asset(&available, amount_to_select, asset.asset_id)
260 .map_err(Error::from)
261 .context("failed to select coins for asset transfer")?;
262
263 for coin in &asset_coins {
264 if selected_outpoints.insert(coin.outpoint) {
265 btc_provided += coin.amount;
266
267 for carried_asset in &coin.assets {
268 if carried_asset.asset_id != asset.asset_id {
269 *asset_changes.entry(carried_asset.asset_id).or_insert(0) +=
270 carried_asset.amount;
271 }
272 }
273
274 selected.push(coin.clone());
275 }
276 }
277
278 if asset_change > 0 {
279 *asset_changes.entry(asset.asset_id).or_insert(0) += asset_change;
280 }
281 }
282 }
283
284 if !asset_changes.is_empty() {
285 btc_needed += self.server_info.dust;
286 }
287
288 let btc_shortfall = btc_needed.checked_sub(btc_provided).unwrap_or(Amount::ZERO);
289
290 if btc_shortfall > Amount::ZERO {
291 let available: Vec<_> = spendable
292 .iter()
293 .filter(|v| !selected_outpoints.contains(&v.outpoint))
294 .cloned()
295 .collect();
296
297 let btc_coins = select_vtxos(available, btc_shortfall, self.server_info.dust, true)
298 .map_err(Error::from)
299 .context("failed to select BTC coins for asset transfer")?;
300
301 for coin in &btc_coins {
302 if selected_outpoints.insert(coin.outpoint) {
303 for carried_asset in &coin.assets {
304 *asset_changes.entry(carried_asset.asset_id).or_insert(0) +=
305 carried_asset.amount;
306 }
307 selected.push(coin.clone());
308 }
309 }
310 }
311
312 let inputs = self.build_vtxo_inputs(selected.clone(), &script_pubkey_to_vtxo_map)?;
313
314 Ok(inputs)
315 }
316
317 async fn resolve_selected_send_inputs(
318 &self,
319 vtxo_outpoints: &[OutPoint],
320 ) -> Result<Vec<VtxoInput>, Error> {
321 let requested_outpoints: HashSet<_> = vtxo_outpoints.iter().copied().collect();
322
323 let (vtxo_list, script_pubkey_to_vtxo_map) = self
324 .list_vtxos_for_outpoints(vtxo_outpoints.to_vec())
325 .await
326 .context("failed to get VTXO list")?;
327
328 let selected: Vec<_> = vtxo_list
329 .spendable_offchain()
330 .filter(|vtxo| requested_outpoints.contains(&vtxo.outpoint))
331 .map(|vtxo| VirtualTxOutPoint {
332 outpoint: vtxo.outpoint,
333 script_pubkey: vtxo.script.clone(),
334 expire_at: vtxo.expires_at,
335 amount: vtxo.amount,
336 assets: vtxo.assets.clone(),
337 })
338 .collect();
339
340 if selected.is_empty() {
341 return Err(Error::ad_hoc("no matching VTXO outpoints found"));
342 }
343
344 if selected.len() != requested_outpoints.len() {
345 let found_outpoints: HashSet<_> = selected.iter().map(|v| v.outpoint).collect();
346 let missing_outpoints = requested_outpoints
347 .difference(&found_outpoints)
348 .map(ToString::to_string)
349 .collect::<Vec<_>>();
350
351 return Err(Error::ad_hoc(format!(
352 "some selected VTXO outpoints were not found or not spendable: {}",
353 missing_outpoints.join(", ")
354 )));
355 }
356
357 let inputs = self.build_vtxo_inputs(selected, &script_pubkey_to_vtxo_map)?;
358
359 Ok(inputs)
360 }
361
362 pub(crate) fn build_vtxo_inputs(
364 &self,
365 selected: Vec<VirtualTxOutPoint>,
366 script_pubkey_to_vtxo_map: &HashMap<bitcoin::ScriptBuf, ark_core::Vtxo>,
367 ) -> Result<Vec<VtxoInput>, Error> {
368 selected
369 .into_iter()
370 .map(|vtp| {
371 let vtxo = script_pubkey_to_vtxo_map
372 .get(&vtp.script_pubkey)
373 .ok_or_else(|| {
374 ark_core::Error::ad_hoc(format!(
375 "missing VTXO for script pubkey: {}",
376 vtp.script_pubkey
377 ))
378 })?;
379
380 let (forfeit_script, control_block) = vtxo
381 .forfeit_spend_info()
382 .context("failed to get forfeit spend info")?;
383
384 Ok(VtxoInput::new(
385 forfeit_script,
386 None,
387 control_block,
388 vtxo.tapscripts(),
389 vtxo.script_pubkey(),
390 vtp.amount,
391 vtp.outpoint,
392 vtp.assets,
393 ))
394 })
395 .collect()
396 }
397
398 fn validate_selected_inputs_cover_receivers(
399 vtxo_inputs: &[VtxoInput],
400 receivers: &[SendReceiver],
401 dust: Amount,
402 ) -> Result<(), Error> {
403 let selected_amount = vtxo_inputs
404 .iter()
405 .fold(Amount::ZERO, |acc, v| acc + v.amount());
406 let requested_amount = receivers.iter().fold(Amount::ZERO, |acc, r| acc + r.amount);
407
408 let mut selected_assets = HashMap::<AssetId, u64>::new();
409 for vtxo_input in vtxo_inputs {
410 for asset in vtxo_input.assets() {
411 *selected_assets.entry(asset.asset_id).or_insert(0) = selected_assets
412 .get(&asset.asset_id)
413 .copied()
414 .unwrap_or(0)
415 .checked_add(asset.amount)
416 .ok_or_else(|| Error::ad_hoc("selected asset amount overflow"))?;
417 }
418 }
419
420 let mut requested_assets = HashMap::<AssetId, u64>::new();
421 for receiver in receivers {
422 for asset in &receiver.assets {
423 *requested_assets.entry(asset.asset_id).or_insert(0) = requested_assets
424 .get(&asset.asset_id)
425 .copied()
426 .unwrap_or(0)
427 .checked_add(asset.amount)
428 .ok_or_else(|| Error::ad_hoc("requested asset amount overflow"))?;
429 }
430 }
431
432 for (asset_id, requested_asset_amount) in &requested_assets {
433 let selected_asset_amount = selected_assets.get(asset_id).copied().unwrap_or(0);
434 if selected_asset_amount < *requested_asset_amount {
435 return Err(Error::coin_select(format!(
436 "insufficient asset amount for {}: {} < {}",
437 asset_id, selected_asset_amount, requested_asset_amount
438 )));
439 }
440 }
441
442 let mut has_asset_change = false;
443 for (asset_id, selected_asset_amount) in &selected_assets {
444 let requested_asset_amount = requested_assets.get(asset_id).copied().unwrap_or(0);
445
446 if *selected_asset_amount < requested_asset_amount {
447 return Err(Error::coin_select(format!(
448 "insufficient asset amount for {}: {} < {}",
449 asset_id, selected_asset_amount, requested_asset_amount
450 )));
451 }
452
453 if *selected_asset_amount > requested_asset_amount {
454 has_asset_change = true;
455 }
456 }
457
458 let required_amount = match has_asset_change {
459 true => requested_amount
460 .checked_add(dust)
461 .ok_or_else(|| Error::ad_hoc("required BTC amount overflow"))?,
462 false => requested_amount,
463 };
464
465 if selected_amount < required_amount {
466 return Err(Error::coin_select(format!(
467 "insufficient VTXO amount: {} < {}",
468 selected_amount, required_amount
469 )));
470 }
471
472 Ok(())
473 }
474
475 async fn send_with_selected_inputs(
476 &self,
477 vtxo_inputs: Vec<VtxoInput>,
478 receivers: Vec<SendReceiver>,
479 ) -> Result<Txid, Error> {
480 Self::validate_selected_inputs_cover_receivers(
481 &vtxo_inputs,
482 &receivers,
483 self.server_info.dust,
484 )?;
485
486 let pending_tx = self.build_and_submit(vtxo_inputs, receivers).await?;
487 let ark_txid = pending_tx.ark_txid;
488
489 self.sign_and_finalize_pending_tx(pending_tx).await?;
490
491 Ok(ark_txid)
492 }
493
494 pub(crate) async fn submit_built_offchain_send(
499 &self,
500 mut ark_tx: bitcoin::Psbt,
501 checkpoint_txs: Vec<bitcoin::Psbt>,
502 used_pk: XOnlyPublicKey,
503 ) -> Result<PendingTx, Error> {
504 for i in 0..checkpoint_txs.len() {
505 sign_ark_transaction(self.make_sign_fn(), &mut ark_tx, i)?;
506 }
507
508 let res = self
509 .network_client()
510 .submit_offchain_transaction_request(ark_tx, checkpoint_txs)
511 .await
512 .map_err(Error::ark_server)
513 .context("failed to submit offchain transaction request")?;
514
515 let pending_tx = PendingTx {
516 ark_txid: res.signed_ark_tx.unsigned_tx.compute_txid(),
517 signed_ark_tx: res.signed_ark_tx,
518 signed_checkpoint_txs: res.signed_checkpoint_txs,
519 };
520
521 if let Err(err) = self.inner.key_provider.mark_as_used(&used_pk) {
522 tracing::warn!(
523 "Failed updating keypair cache for used change address: {:?}",
524 err
525 );
526 }
527
528 Ok(pending_tx)
529 }
530
531 async fn build_and_submit(
533 &self,
534 inputs: Vec<VtxoInput>,
535 receivers: Vec<SendReceiver>,
536 ) -> Result<PendingTx, Error> {
537 let (change_address, change_address_vtxo) = self.get_offchain_address()?;
538
539 let OffchainTransactions {
540 ark_tx,
541 checkpoint_txs,
542 } = build_asset_send_transactions(&receivers, &change_address, &inputs, &self.server_info)
543 .map_err(Error::from)
544 .context("failed to build offchain asset-send transactions")?;
545
546 self.submit_built_offchain_send(ark_tx, checkpoint_txs, change_address_vtxo.owner_pk())
547 .await
548 }
549
550 pub(crate) async fn sign_and_finalize_pending_tx(
552 &self,
553 pending_tx: PendingTx,
554 ) -> Result<(), Error> {
555 let ark_txid = pending_tx.ark_txid;
556 let mut signed_checkpoint_txs = pending_tx.signed_checkpoint_txs;
557
558 let ark_input_idx_by_cp_txid: HashMap<_, _> = pending_tx
561 .signed_ark_tx
562 .unsigned_tx
563 .input
564 .iter()
565 .enumerate()
566 .map(|(i, inp)| (inp.previous_output.txid, i))
567 .collect();
568
569 for checkpoint_psbt in signed_checkpoint_txs.iter_mut() {
570 if checkpoint_psbt.inputs[0].witness_script.is_none() {
571 let checkpoint_txid = checkpoint_psbt.unsigned_tx.compute_txid();
572 let idx = ark_input_idx_by_cp_txid
573 .get(&checkpoint_txid)
574 .ok_or_else(|| {
575 Error::ad_hoc(format!(
576 "checkpoint txid {checkpoint_txid} not found in ark tx inputs \
577 for pending tx {ark_txid}"
578 ))
579 })?;
580
581 let ws = pending_tx
582 .signed_ark_tx
583 .inputs
584 .get(*idx)
585 .and_then(|input| input.witness_script.clone())
586 .ok_or_else(|| {
587 Error::ad_hoc(format!(
588 "missing witness script on ark tx input {idx} \
589 for pending tx {ark_txid}"
590 ))
591 })?;
592
593 checkpoint_psbt.inputs[0].witness_script = Some(ws);
594 }
595
596 sign_checkpoint_transaction(self.make_sign_fn(), checkpoint_psbt)?;
597 }
598
599 self.finalize_offchain_tx(ark_txid, signed_checkpoint_txs)
600 .await
601 }
602
603 pub(crate) async fn finalize_offchain_tx(
611 &self,
612 ark_txid: Txid,
613 signed_checkpoint_txs: Vec<bitcoin::Psbt>,
614 ) -> Result<(), Error> {
615 const MAX_RETRIES: usize = 3;
616
617 let mut last_err = None;
618
619 for attempt in 0..=MAX_RETRIES {
620 if attempt > 0 {
621 let delay = Duration::from_millis(500 * (1 << (attempt - 1)));
622 tracing::warn!(
623 %ark_txid,
624 attempt,
625 ?delay,
626 "Retrying finalize after transient failure"
627 );
628 crate::utils::sleep(delay).await;
629 }
630
631 match timeout_op(
632 self.inner.timeout,
633 self.network_client()
634 .finalize_offchain_transaction(ark_txid, signed_checkpoint_txs.clone()),
635 )
636 .await
637 .context("finalize offchain transaction timed out")?
638 {
639 Ok(_) => return Ok(()),
640 Err(e) => {
641 last_err = Some(Error::ark_server(e));
642 }
643 }
644 }
645
646 Err(last_err
647 .expect("at least one attempt was made")
648 .with_context(|| {
649 format!("failed to finalize offchain transaction after {MAX_RETRIES} retries")
650 }))
651 }
652
653 async fn fetch_pending_offchain_txs(&self) -> Result<Vec<PendingTx>, Error> {
655 const MAX_INPUTS_PER_INTENT: usize = 20;
656
657 let ark_addresses = self.get_offchain_addresses()?;
658
659 let script_pubkey_to_vtxo_map: HashMap<_, _> = ark_addresses
660 .iter()
661 .map(|(a, v)| (a.to_p2tr_script_pubkey(), v.clone()))
662 .collect();
663
664 let addresses = ark_addresses.iter().map(|(a, _)| *a);
668 let request = ark_core::server::GetVtxosRequest::new_for_addresses(addresses)
669 .pending_only()
670 .map_err(Error::from)?;
671
672 let vtxos = self
673 .fetch_all_vtxos(request)
674 .await
675 .context("failed to fetch pending VTXOs")?;
676
677 tracing::debug!(num_pending_vtxos = vtxos.len(), "Fetched pending VTXOs");
678
679 if vtxos.is_empty() {
680 return Ok(vec![]);
681 }
682
683 let secp = Secp256k1::new();
684 let mut all_pending_txs = Vec::new();
685 let mut seen_ark_txids = HashSet::new();
686
687 for (batch_idx, batch) in vtxos.chunks(MAX_INPUTS_PER_INTENT).enumerate() {
689 let mut vtxo_inputs = Vec::new();
690 for virtual_tx_outpoint in batch {
691 let vtxo = match script_pubkey_to_vtxo_map.get(&virtual_tx_outpoint.script) {
692 Some(v) => v,
693 None => {
694 tracing::warn!(
695 outpoint = %virtual_tx_outpoint.outpoint,
696 script = %virtual_tx_outpoint.script,
697 "Skipping VTXO with unknown script"
698 );
699 continue;
700 }
701 };
702 let spend_info = vtxo
703 .forfeit_spend_info()
704 .context("failed to get forfeit spend info")?;
705
706 vtxo_inputs.push(intent::Input::new(
707 virtual_tx_outpoint.outpoint,
708 vtxo.exit_delay(),
709 None,
710 TxOut {
711 value: virtual_tx_outpoint.amount,
712 script_pubkey: vtxo.script_pubkey(),
713 },
714 vtxo.tapscripts(),
715 spend_info,
716 false,
717 virtual_tx_outpoint.is_swept,
718 virtual_tx_outpoint.assets.clone(),
719 ));
720 }
721
722 if vtxo_inputs.is_empty() {
723 continue;
724 }
725
726 tracing::debug!(
727 batch = batch_idx,
728 num_inputs = vtxo_inputs.len(),
729 "Querying server for pending txs"
730 );
731
732 let message = intent::IntentMessage::GetPendingTx { expire_at: 0 };
734
735 let sign_for_vtxo_fn = |input: &mut psbt::Input,
736 msg: secp256k1::Message|
737 -> Result<
738 Vec<(schnorr::Signature, XOnlyPublicKey)>,
739 ark_core::Error,
740 > {
741 match &input.witness_script {
742 None => Err(ark_core::Error::ad_hoc(
743 "Missing witness script in psbt::Input when signing get-pending-tx intent",
744 )),
745 Some(script) => {
746 let pks = extract_checksig_pubkeys(script);
747 let mut res = vec![];
748 for pk in &pks {
749 if let Ok(keypair) = self.keypair_by_pk(pk) {
750 let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
751 res.push((sig, keypair.x_only_public_key().0));
752 }
753 }
754 Ok(res)
755 }
756 }
757 };
758
759 let sign_for_onchain_fn =
760 |_: &mut psbt::Input,
761 _: secp256k1::Message|
762 -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
763 Err(ark_core::Error::ad_hoc(
764 "unexpected onchain input in get-pending-tx intent",
765 ))
766 };
767
768 let get_pending_intent = intent::make_intent(
769 sign_for_vtxo_fn,
770 sign_for_onchain_fn,
771 vtxo_inputs,
772 vec![],
773 message,
774 )?;
775
776 let pending_txs = self
777 .network_client()
778 .get_pending_tx(get_pending_intent)
779 .await
780 .map_err(Error::ark_server)
781 .context("failed to get pending transactions")?;
782
783 tracing::debug!(
784 batch = batch_idx,
785 num_pending_txs = pending_txs.len(),
786 "Server response for batch"
787 );
788
789 for tx in pending_txs {
790 if seen_ark_txids.insert(tx.ark_txid) {
791 tracing::info!(
792 ark_txid = %tx.ark_txid,
793 "Found pending transaction"
794 );
795 all_pending_txs.push(tx);
796 }
797 }
798 }
799
800 tracing::info!(
801 num_pending_txs = all_pending_txs.len(),
802 "Total pending transactions found"
803 );
804
805 Ok(all_pending_txs)
806 }
807}