1use crate::error::ErrorContext;
2use crate::swap_storage::SwapStorage;
3use crate::utils::timeout_op;
4use crate::wallet::BoardingWallet;
5use crate::wallet::OnchainWallet;
6use crate::Blockchain;
7use crate::Client;
8use crate::Error;
9use ark_core::asset::AssetId;
10use ark_core::coin_select::select_vtxos;
11use ark_core::coin_select::select_vtxos_for_asset;
12use ark_core::coin_select::VirtualTxOutPoint;
13use ark_core::intent;
14use ark_core::script::extract_checksig_pubkeys;
15use ark_core::send::build_asset_send_transactions;
16use ark_core::send::sign_ark_transaction;
17use ark_core::send::sign_checkpoint_transaction;
18use ark_core::send::OffchainTransactions;
19use ark_core::send::SendReceiver;
20use ark_core::send::VtxoInput;
21use ark_core::server;
22use ark_core::server::PendingTx;
23use bitcoin::key::Secp256k1;
24use bitcoin::psbt;
25use bitcoin::secp256k1;
26use bitcoin::secp256k1::schnorr;
27use bitcoin::Amount;
28use bitcoin::OutPoint;
29use bitcoin::TxOut;
30use bitcoin::Txid;
31use bitcoin::XOnlyPublicKey;
32use std::collections::HashMap;
33use std::collections::HashSet;
34use std::time::Duration;
35
36impl<B, W, S, K> Client<B, W, S, K>
37where
38 B: Blockchain,
39 W: BoardingWallet + OnchainWallet,
40 S: SwapStorage + 'static,
41 K: crate::KeyProvider,
42{
43 pub async fn send(&self, receivers: Vec<SendReceiver>) -> Result<Txid, Error> {
59 let selected = self
61 .auto_select_send_inputs(&receivers)
62 .await
63 .context("failed to auto-select send inputs")?;
64
65 let txid = self
66 .send_with_selected_inputs(selected, receivers)
67 .await
68 .context("failed to send with selected inputs")?;
69
70 Ok(txid)
71 }
72
73 pub async fn send_selection(
95 &self,
96 vtxo_outpoints: &[OutPoint],
97 receivers: Vec<SendReceiver>,
98 ) -> Result<Txid, Error> {
99 let selected = self
101 .resolve_selected_send_inputs(vtxo_outpoints)
102 .await
103 .context("failed to resolve selected send inputs")?;
104
105 let txid = self
106 .send_with_selected_inputs(selected, receivers)
107 .await
108 .context("failed to send with selected inputs")?;
109
110 Ok(txid)
111 }
112
113 pub async fn finalize_pending_offchain_tx(&self, ark_txid: Txid) -> Result<(), Error> {
128 let pending_txs = self.fetch_pending_offchain_txs().await?;
129
130 let pending_tx = pending_txs
131 .into_iter()
132 .find(|tx| tx.ark_txid == ark_txid)
133 .ok_or_else(|| {
134 Error::ad_hoc(format!(
135 "no pending transaction found for ark txid {ark_txid}"
136 ))
137 })?;
138
139 self.sign_and_finalize_pending_tx(pending_tx).await
140 }
141
142 pub async fn continue_pending_offchain_txs(&self) -> Result<Vec<Txid>, Error> {
154 let pending_txs = self.fetch_pending_offchain_txs().await?;
155
156 if pending_txs.is_empty() {
157 return Ok(vec![]);
158 }
159
160 let mut finalized_txids = Vec::new();
161
162 for pending_tx in pending_txs {
163 let ark_txid = pending_tx.ark_txid;
164 self.sign_and_finalize_pending_tx(pending_tx).await?;
165 finalized_txids.push(ark_txid);
166 }
167
168 Ok(finalized_txids)
169 }
170
171 pub async fn list_pending_offchain_txs(&self) -> Result<Vec<PendingTx>, Error> {
180 self.fetch_pending_offchain_txs().await
181 }
182
183 pub async fn submit_offchain_tx(
191 &self,
192 vtxo_inputs: Vec<VtxoInput>,
193 address: ark_core::ArkAddress,
194 amount: Amount,
195 ) -> Result<Txid, Error> {
196 let server_info = self.server_info()?;
197 let receivers = vec![SendReceiver {
198 address,
199 amount,
200 assets: Vec::new(),
201 }];
202 let pending_tx = self
203 .build_and_submit(vtxo_inputs, receivers, &server_info)
204 .await?;
205 Ok(pending_tx.ark_txid)
206 }
207
208 fn make_sign_fn(
212 &self,
213 ) -> impl FnMut(
214 &mut psbt::Input,
215 secp256k1::Message,
216 ) -> Result<Vec<(schnorr::Signature, XOnlyPublicKey)>, ark_core::Error>
217 + '_ {
218 |input, msg| {
219 let script = input
220 .witness_script
221 .as_ref()
222 .ok_or_else(|| ark_core::Error::ad_hoc("Missing witness script for psbt::Input"))?;
223 let pks = extract_checksig_pubkeys(script);
224 let secp = Secp256k1::new();
225 let mut sigs = vec![];
226 for pk in pks {
227 if let Ok(keypair) = self.keypair_by_pk(&pk) {
228 let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
229 sigs.push((sig, keypair.x_only_public_key().0));
230 }
231 }
232 Ok(sigs)
233 }
234 }
235
236 async fn auto_select_send_inputs(
237 &self,
238 receivers: &[SendReceiver],
239 ) -> Result<Vec<VtxoInput>, Error> {
240 let (vtxo_list, script_pubkey_to_vtxo_map) = self
241 .list_vtxos()
242 .await
243 .context("failed to get spendable VTXOs")?;
244
245 let now = crate::utils::unix_now()?;
246 let server_info = self.server_info()?;
247 let spendable = vtxo_list
248 .spendable_offchain_at(&server_info, now, |script| {
249 script_pubkey_to_vtxo_map
250 .get(script)
251 .map(|vtxo| vtxo.server_pk())
252 })
253 .map(|vtxo| VirtualTxOutPoint {
254 outpoint: vtxo.outpoint,
255 script_pubkey: vtxo.script.clone(),
256 expire_at: vtxo.expires_at,
257 amount: vtxo.amount,
258 assets: vtxo.assets.clone(),
259 })
260 .collect::<Vec<_>>();
261
262 let mut selected_outpoints = HashSet::new();
263 let mut selected = Vec::new();
264 let mut asset_changes: HashMap<AssetId, u64> = HashMap::new();
265 let mut btc_needed = Amount::ZERO;
266 let mut btc_provided = Amount::ZERO;
267
268 for receiver in receivers {
269 btc_needed += receiver.amount;
270
271 for asset in &receiver.assets {
272 let mut amount_to_select = asset.amount;
273
274 if let Some(existing_change) = asset_changes.get_mut(&asset.asset_id) {
275 if amount_to_select <= *existing_change {
276 *existing_change -= amount_to_select;
277 if *existing_change == 0 {
278 asset_changes.remove(&asset.asset_id);
279 }
280 continue;
281 }
282 amount_to_select -= *existing_change;
283 asset_changes.remove(&asset.asset_id);
284 }
285
286 let available: Vec<_> = spendable
287 .iter()
288 .filter(|v| !selected_outpoints.contains(&v.outpoint))
289 .cloned()
290 .collect();
291
292 let (asset_coins, asset_change) =
293 select_vtxos_for_asset(&available, amount_to_select, asset.asset_id)
294 .map_err(Error::from)
295 .context("failed to select coins for asset transfer")?;
296
297 for coin in &asset_coins {
298 if selected_outpoints.insert(coin.outpoint) {
299 btc_provided += coin.amount;
300
301 for carried_asset in &coin.assets {
302 if carried_asset.asset_id != asset.asset_id {
303 *asset_changes.entry(carried_asset.asset_id).or_insert(0) +=
304 carried_asset.amount;
305 }
306 }
307
308 selected.push(coin.clone());
309 }
310 }
311
312 if asset_change > 0 {
313 *asset_changes.entry(asset.asset_id).or_insert(0) += asset_change;
314 }
315 }
316 }
317
318 if !asset_changes.is_empty() {
319 btc_needed += server_info.dust;
320 }
321
322 let btc_shortfall = btc_needed.checked_sub(btc_provided).unwrap_or(Amount::ZERO);
323
324 if btc_shortfall > Amount::ZERO {
325 let available: Vec<_> = spendable
326 .iter()
327 .filter(|v| !selected_outpoints.contains(&v.outpoint))
328 .cloned()
329 .collect();
330
331 let btc_coins = select_vtxos(available, btc_shortfall, server_info.dust, true)
332 .map_err(Error::from)
333 .context("failed to select BTC coins for asset transfer")?;
334
335 for coin in &btc_coins {
336 if selected_outpoints.insert(coin.outpoint) {
337 for carried_asset in &coin.assets {
338 *asset_changes.entry(carried_asset.asset_id).or_insert(0) +=
339 carried_asset.amount;
340 }
341 selected.push(coin.clone());
342 }
343 }
344 }
345
346 let inputs = self.build_vtxo_inputs(selected.clone(), &script_pubkey_to_vtxo_map)?;
347
348 Ok(inputs)
349 }
350
351 async fn resolve_selected_send_inputs(
352 &self,
353 vtxo_outpoints: &[OutPoint],
354 ) -> Result<Vec<VtxoInput>, Error> {
355 let requested_outpoints: HashSet<_> = vtxo_outpoints.iter().copied().collect();
356
357 let (vtxo_list, script_pubkey_to_vtxo_map) = self
358 .list_vtxos_for_outpoints(vtxo_outpoints.to_vec())
359 .await
360 .context("failed to get VTXO list")?;
361
362 let now = crate::utils::unix_now()?;
363 let server_info = self.server_info()?;
364 let selected: Vec<_> = vtxo_list
365 .spendable_offchain_at(&server_info, now, |script| {
366 script_pubkey_to_vtxo_map
367 .get(script)
368 .map(|vtxo| vtxo.server_pk())
369 })
370 .filter(|vtxo| requested_outpoints.contains(&vtxo.outpoint))
371 .map(|vtxo| VirtualTxOutPoint {
372 outpoint: vtxo.outpoint,
373 script_pubkey: vtxo.script.clone(),
374 expire_at: vtxo.expires_at,
375 amount: vtxo.amount,
376 assets: vtxo.assets.clone(),
377 })
378 .collect();
379
380 if selected.is_empty() {
381 return Err(Error::ad_hoc("no matching VTXO outpoints found"));
382 }
383
384 if selected.len() != requested_outpoints.len() {
385 let found_outpoints: HashSet<_> = selected.iter().map(|v| v.outpoint).collect();
386 let missing_outpoints = requested_outpoints
387 .difference(&found_outpoints)
388 .map(ToString::to_string)
389 .collect::<Vec<_>>();
390
391 return Err(Error::ad_hoc(format!(
392 "some selected VTXO outpoints were not found or not spendable: {}",
393 missing_outpoints.join(", ")
394 )));
395 }
396
397 let inputs = self.build_vtxo_inputs(selected, &script_pubkey_to_vtxo_map)?;
398
399 Ok(inputs)
400 }
401
402 pub(crate) fn build_vtxo_inputs(
404 &self,
405 selected: Vec<VirtualTxOutPoint>,
406 script_pubkey_to_vtxo_map: &HashMap<bitcoin::ScriptBuf, ark_core::Vtxo>,
407 ) -> Result<Vec<VtxoInput>, Error> {
408 selected
409 .into_iter()
410 .map(|vtp| {
411 let vtxo = script_pubkey_to_vtxo_map
412 .get(&vtp.script_pubkey)
413 .ok_or_else(|| {
414 ark_core::Error::ad_hoc(format!(
415 "missing VTXO for script pubkey: {}",
416 vtp.script_pubkey
417 ))
418 })?;
419
420 let (forfeit_script, control_block) = vtxo
421 .forfeit_spend_info()
422 .context("failed to get forfeit spend info")?;
423
424 Ok(VtxoInput::new(
425 forfeit_script,
426 None,
427 control_block,
428 vtxo.tapscripts(),
429 vtxo.script_pubkey(),
430 vtp.amount,
431 vtp.outpoint,
432 vtp.assets,
433 ))
434 })
435 .collect()
436 }
437
438 fn validate_selected_inputs_cover_receivers(
439 vtxo_inputs: &[VtxoInput],
440 receivers: &[SendReceiver],
441 dust: Amount,
442 ) -> Result<(), Error> {
443 let selected_amount = vtxo_inputs
444 .iter()
445 .fold(Amount::ZERO, |acc, v| acc + v.amount());
446 let requested_amount = receivers.iter().fold(Amount::ZERO, |acc, r| acc + r.amount);
447
448 let mut selected_assets = HashMap::<AssetId, u64>::new();
449 for vtxo_input in vtxo_inputs {
450 for asset in vtxo_input.assets() {
451 *selected_assets.entry(asset.asset_id).or_insert(0) = selected_assets
452 .get(&asset.asset_id)
453 .copied()
454 .unwrap_or(0)
455 .checked_add(asset.amount)
456 .ok_or_else(|| Error::ad_hoc("selected asset amount overflow"))?;
457 }
458 }
459
460 let mut requested_assets = HashMap::<AssetId, u64>::new();
461 for receiver in receivers {
462 for asset in &receiver.assets {
463 *requested_assets.entry(asset.asset_id).or_insert(0) = requested_assets
464 .get(&asset.asset_id)
465 .copied()
466 .unwrap_or(0)
467 .checked_add(asset.amount)
468 .ok_or_else(|| Error::ad_hoc("requested asset amount overflow"))?;
469 }
470 }
471
472 for (asset_id, requested_asset_amount) in &requested_assets {
473 let selected_asset_amount = selected_assets.get(asset_id).copied().unwrap_or(0);
474 if selected_asset_amount < *requested_asset_amount {
475 return Err(Error::coin_select(format!(
476 "insufficient asset amount for {}: {} < {}",
477 asset_id, selected_asset_amount, requested_asset_amount
478 )));
479 }
480 }
481
482 let mut has_asset_change = false;
483 for (asset_id, selected_asset_amount) in &selected_assets {
484 let requested_asset_amount = requested_assets.get(asset_id).copied().unwrap_or(0);
485
486 if *selected_asset_amount < requested_asset_amount {
487 return Err(Error::coin_select(format!(
488 "insufficient asset amount for {}: {} < {}",
489 asset_id, selected_asset_amount, requested_asset_amount
490 )));
491 }
492
493 if *selected_asset_amount > requested_asset_amount {
494 has_asset_change = true;
495 }
496 }
497
498 let required_amount = match has_asset_change {
499 true => requested_amount
500 .checked_add(dust)
501 .ok_or_else(|| Error::ad_hoc("required BTC amount overflow"))?,
502 false => requested_amount,
503 };
504
505 if selected_amount < required_amount {
506 return Err(Error::coin_select(format!(
507 "insufficient VTXO amount: {} < {}",
508 selected_amount, required_amount
509 )));
510 }
511
512 Ok(())
513 }
514
515 async fn send_with_selected_inputs(
516 &self,
517 vtxo_inputs: Vec<VtxoInput>,
518 receivers: Vec<SendReceiver>,
519 ) -> Result<Txid, Error> {
520 let server_info = self.server_info()?;
521 Self::validate_selected_inputs_cover_receivers(&vtxo_inputs, &receivers, server_info.dust)?;
522
523 let pending_tx = self
524 .build_and_submit(vtxo_inputs, receivers, &server_info)
525 .await?;
526 let ark_txid = pending_tx.ark_txid;
527
528 self.sign_and_finalize_pending_tx(pending_tx).await?;
529
530 Ok(ark_txid)
531 }
532
533 pub(crate) async fn submit_built_offchain_send(
538 &self,
539 mut ark_tx: bitcoin::Psbt,
540 checkpoint_txs: Vec<bitcoin::Psbt>,
541 used_pk: XOnlyPublicKey,
542 ) -> Result<PendingTx, Error> {
543 for i in 0..checkpoint_txs.len() {
544 sign_ark_transaction(self.make_sign_fn(), &mut ark_tx, i)?;
545 }
546
547 let res = self
548 .network_client()
549 .submit_offchain_transaction_request(ark_tx, checkpoint_txs)
550 .await
551 .map_err(Error::ark_server)
552 .context("failed to submit offchain transaction request")?;
553
554 let pending_tx = PendingTx {
555 ark_txid: res.signed_ark_tx.unsigned_tx.compute_txid(),
556 signed_ark_tx: res.signed_ark_tx,
557 signed_checkpoint_txs: res.signed_checkpoint_txs,
558 };
559
560 if let Err(err) = self.inner.key_provider.mark_as_used(&used_pk) {
561 tracing::warn!(
562 "Failed updating keypair cache for used change address: {:?}",
563 err
564 );
565 }
566
567 Ok(pending_tx)
568 }
569
570 async fn build_and_submit(
572 &self,
573 inputs: Vec<VtxoInput>,
574 receivers: Vec<SendReceiver>,
575 server_info: &server::Info,
576 ) -> Result<PendingTx, Error> {
577 let (change_address, change_address_vtxo) = self.get_offchain_address()?;
578
579 let OffchainTransactions {
580 ark_tx,
581 checkpoint_txs,
582 } = build_asset_send_transactions(&receivers, &change_address, &inputs, server_info)
583 .map_err(Error::from)
584 .context("failed to build offchain asset-send transactions")?;
585
586 self.submit_built_offchain_send(ark_tx, checkpoint_txs, change_address_vtxo.owner_pk())
587 .await
588 }
589
590 pub(crate) async fn sign_and_finalize_pending_tx(
592 &self,
593 pending_tx: PendingTx,
594 ) -> Result<(), Error> {
595 let ark_txid = pending_tx.ark_txid;
596 let mut signed_checkpoint_txs = pending_tx.signed_checkpoint_txs;
597
598 let ark_input_idx_by_cp_txid: HashMap<_, _> = pending_tx
601 .signed_ark_tx
602 .unsigned_tx
603 .input
604 .iter()
605 .enumerate()
606 .map(|(i, inp)| (inp.previous_output.txid, i))
607 .collect();
608
609 for checkpoint_psbt in signed_checkpoint_txs.iter_mut() {
610 if checkpoint_psbt.inputs[0].witness_script.is_none() {
611 let checkpoint_txid = checkpoint_psbt.unsigned_tx.compute_txid();
612 let idx = ark_input_idx_by_cp_txid
613 .get(&checkpoint_txid)
614 .ok_or_else(|| {
615 Error::ad_hoc(format!(
616 "checkpoint txid {checkpoint_txid} not found in ark tx inputs \
617 for pending tx {ark_txid}"
618 ))
619 })?;
620
621 let ws = pending_tx
622 .signed_ark_tx
623 .inputs
624 .get(*idx)
625 .and_then(|input| input.witness_script.clone())
626 .ok_or_else(|| {
627 Error::ad_hoc(format!(
628 "missing witness script on ark tx input {idx} \
629 for pending tx {ark_txid}"
630 ))
631 })?;
632
633 checkpoint_psbt.inputs[0].witness_script = Some(ws);
634 }
635
636 sign_checkpoint_transaction(self.make_sign_fn(), checkpoint_psbt)?;
637 }
638
639 self.finalize_offchain_tx(ark_txid, signed_checkpoint_txs)
640 .await
641 }
642
643 pub async fn finalize_offchain_tx(
651 &self,
652 ark_txid: Txid,
653 signed_checkpoint_txs: Vec<bitcoin::Psbt>,
654 ) -> Result<(), Error> {
655 const MAX_RETRIES: usize = 3;
656
657 let mut last_err = None;
658
659 for attempt in 0..=MAX_RETRIES {
660 if attempt > 0 {
661 let delay = Duration::from_millis(500 * (1 << (attempt - 1)));
662 tracing::warn!(
663 %ark_txid,
664 attempt,
665 ?delay,
666 "Retrying finalize after transient failure"
667 );
668 crate::utils::sleep(delay).await;
669 }
670
671 match timeout_op(
672 self.inner.timeout,
673 self.network_client()
674 .finalize_offchain_transaction(ark_txid, signed_checkpoint_txs.clone()),
675 )
676 .await
677 .context("finalize offchain transaction timed out")?
678 {
679 Ok(_) => return Ok(()),
680 Err(e) => {
681 last_err = Some(Error::ark_server(e));
682 }
683 }
684 }
685
686 Err(last_err
687 .expect("at least one attempt was made")
688 .with_context(|| {
689 format!("failed to finalize offchain transaction after {MAX_RETRIES} retries")
690 }))
691 }
692
693 async fn fetch_pending_offchain_txs(&self) -> Result<Vec<PendingTx>, Error> {
695 const MAX_INPUTS_PER_INTENT: usize = 20;
696
697 let ark_addresses = self.get_offchain_addresses()?;
698
699 let script_pubkey_to_vtxo_map: HashMap<_, _> = ark_addresses
700 .iter()
701 .map(|(a, v)| (a.to_p2tr_script_pubkey(), v.clone()))
702 .collect();
703
704 let addresses = ark_addresses.iter().map(|(a, _)| *a);
708 let request = server::GetVtxosRequest::new_for_addresses(addresses)
709 .pending_only()
710 .map_err(Error::from)?;
711
712 let vtxos = self
713 .fetch_all_vtxos(request)
714 .await
715 .context("failed to fetch pending VTXOs")?;
716
717 tracing::debug!(num_pending_vtxos = vtxos.len(), "Fetched pending VTXOs");
718
719 if vtxos.is_empty() {
720 return Ok(vec![]);
721 }
722
723 let secp = Secp256k1::new();
724 let mut all_pending_txs = Vec::new();
725 let mut seen_ark_txids = HashSet::new();
726
727 for (batch_idx, batch) in vtxos.chunks(MAX_INPUTS_PER_INTENT).enumerate() {
729 let mut vtxo_inputs = Vec::new();
730 for virtual_tx_outpoint in batch {
731 let vtxo = match script_pubkey_to_vtxo_map.get(&virtual_tx_outpoint.script) {
732 Some(v) => v,
733 None => {
734 tracing::warn!(
735 outpoint = %virtual_tx_outpoint.outpoint,
736 script = %virtual_tx_outpoint.script,
737 "Skipping VTXO with unknown script"
738 );
739 continue;
740 }
741 };
742 let spend_info = vtxo
743 .forfeit_spend_info()
744 .context("failed to get forfeit spend info")?;
745
746 vtxo_inputs.push(intent::Input::new(
747 virtual_tx_outpoint.outpoint,
748 vtxo.exit_delay(),
749 None,
750 TxOut {
751 value: virtual_tx_outpoint.amount,
752 script_pubkey: vtxo.script_pubkey(),
753 },
754 vtxo.tapscripts(),
755 spend_info,
756 false,
757 virtual_tx_outpoint.is_swept,
758 virtual_tx_outpoint.assets.clone(),
759 ));
760 }
761
762 if vtxo_inputs.is_empty() {
763 continue;
764 }
765
766 tracing::debug!(
767 batch = batch_idx,
768 num_inputs = vtxo_inputs.len(),
769 "Querying server for pending txs"
770 );
771
772 let message = intent::IntentMessage::GetPendingTx { expire_at: 0 };
774
775 let sign_for_vtxo_fn = |input: &mut psbt::Input,
776 msg: secp256k1::Message|
777 -> Result<
778 Vec<(schnorr::Signature, XOnlyPublicKey)>,
779 ark_core::Error,
780 > {
781 match &input.witness_script {
782 None => Err(ark_core::Error::ad_hoc(
783 "Missing witness script in psbt::Input when signing get-pending-tx intent",
784 )),
785 Some(script) => {
786 let pks = extract_checksig_pubkeys(script);
787 let mut res = vec![];
788 for pk in &pks {
789 if let Ok(keypair) = self.keypair_by_pk(pk) {
790 let sig = secp.sign_schnorr_no_aux_rand(&msg, &keypair);
791 res.push((sig, keypair.x_only_public_key().0));
792 }
793 }
794 Ok(res)
795 }
796 }
797 };
798
799 let sign_for_onchain_fn =
800 |_: &mut psbt::Input,
801 _: secp256k1::Message|
802 -> Result<(schnorr::Signature, XOnlyPublicKey), ark_core::Error> {
803 Err(ark_core::Error::ad_hoc(
804 "unexpected onchain input in get-pending-tx intent",
805 ))
806 };
807
808 let get_pending_intent = intent::make_intent(
809 sign_for_vtxo_fn,
810 sign_for_onchain_fn,
811 vtxo_inputs,
812 vec![],
813 message,
814 )?;
815
816 let pending_txs = self
817 .network_client()
818 .get_pending_tx(get_pending_intent)
819 .await
820 .map_err(Error::ark_server)
821 .context("failed to get pending transactions")?;
822
823 tracing::debug!(
824 batch = batch_idx,
825 num_pending_txs = pending_txs.len(),
826 "Server response for batch"
827 );
828
829 for tx in pending_txs {
830 if seen_ark_txids.insert(tx.ark_txid) {
831 tracing::info!(
832 ark_txid = %tx.ark_txid,
833 "Found pending transaction"
834 );
835 all_pending_txs.push(tx);
836 }
837 }
838 }
839
840 tracing::info!(
841 num_pending_txs = all_pending_txs.len(),
842 "Total pending transactions found"
843 );
844
845 Ok(all_pending_txs)
846 }
847}