1use crate::error::ErrorContext;
2use crate::key_provider::KeypairIndex;
3use crate::utils::sleep;
4use crate::utils::timeout_op;
5use crate::utils::unix_now;
6use crate::wallet::BoardingWallet;
7use crate::wallet::OnchainWallet;
8use ark_core::asset::AssetId;
9use ark_core::build_anchor_tx;
10use ark_core::history;
11use ark_core::history::generate_incoming_vtxo_transaction_history;
12use ark_core::history::generate_outgoing_vtxo_transaction_history;
13use ark_core::history::sort_transactions_by_created_at;
14use ark_core::history::OutgoingTransaction;
15use ark_core::server;
16use ark_core::server::GetVtxosRequest;
17use ark_core::server::SubscriptionResponse;
18use ark_core::server::VirtualTxOutPoint;
19use ark_core::ArkAddress;
20use ark_core::BoardingOutput;
21use ark_core::ExplorerUtxo;
22use ark_core::UtxoCoinSelection;
23use ark_core::Vtxo;
24use ark_core::VtxoList;
25use ark_core::DEFAULT_DERIVATION_PATH;
26use ark_grpc::VtxoChainResponse;
27use bitcoin::bip32::DerivationPath;
28use bitcoin::bip32::Xpriv;
29use bitcoin::key::Keypair;
30use bitcoin::key::Secp256k1;
31use bitcoin::secp256k1::All;
32use bitcoin::Address;
33use bitcoin::Amount;
34use bitcoin::OutPoint;
35use bitcoin::ScriptBuf;
36use bitcoin::Transaction;
37use bitcoin::Txid;
38use bitcoin::XOnlyPublicKey;
39use futures::Future;
40use futures::Stream;
41use std::collections::HashMap;
42use std::collections::HashSet;
43use std::str::FromStr;
44use std::sync::Arc;
45use std::sync::RwLock;
46use std::time::Duration;
47
48pub mod error;
49pub mod key_provider;
50pub mod swap_storage;
51pub mod vtxo_watcher;
52pub mod wallet;
53
54mod asset;
55mod batch;
56mod boltz;
57mod coin_select;
58mod fee_estimation;
59mod migration;
60mod send_vtxo;
61mod unilateral_exit;
62mod utils;
63
64pub use ark_core::server::DeprecatedSignerStatus;
65pub use asset::IssueAssetResult;
66pub use boltz::ChainSwapAmount;
67pub use boltz::ChainSwapData;
68pub use boltz::ChainSwapDirection;
69pub use boltz::ChainSwapResult;
70pub use boltz::PendingVhtlcSpendTx;
71pub use boltz::PendingVhtlcSpendType;
72pub use boltz::ReverseSwapData;
73pub use boltz::SubmarineSwapData;
74pub use boltz::SwapAmount;
75pub use boltz::SwapStatus;
76pub use boltz::SwapStatusInfo;
77pub use boltz::SwapType;
78pub use boltz::TimeoutBlockHeights;
79pub use error::Error;
80pub use key_provider::Bip32KeyProvider;
81pub use key_provider::KeyProvider;
82pub use key_provider::StaticKeyProvider;
83pub use lightning_invoice;
84pub use migration::DeprecatedSignerMigrationReport;
85pub use migration::DeprecatedSignerReport;
86pub use migration::MigrationLegReport;
87pub use migration::MigrationSkipReason;
88pub use migration::MigrationVtxoRef;
89pub use migration::MAX_VTXOS_PER_SETTLEMENT;
90pub use swap_storage::InMemorySwapStorage;
91#[cfg(feature = "sqlite")]
92pub use swap_storage::SqliteSwapStorage;
93pub use swap_storage::SwapStorage;
94
95pub const DEFAULT_GAP_LIMIT: u32 = 20;
100
101pub const DEFAULT_BOLTZ_REFERRAL_ID: &str = "arkade-rs-SDK";
104
105#[derive(Clone)]
305pub struct OfflineClient<B, W, S, K> {
306 network_client: ark_grpc::Client,
308 pub name: String,
309 key_provider: Arc<K>,
310 blockchain: Arc<B>,
311 secp: Secp256k1<All>,
312 wallet: Arc<W>,
313 swap_storage: Arc<S>,
314 boltz_url: String,
315 boltz_referral_id: Option<String>,
316 timeout: Duration,
317 delegator_pk: Option<XOnlyPublicKey>,
318 historical_delegator_pks: Vec<XOnlyPublicKey>,
319}
320
321pub struct Client<B, W, S, K> {
325 inner: OfflineClient<B, W, S, K>,
326 state: Arc<RwLock<ServerState>>,
327}
328
329struct ServerState {
330 server_info: server::Info,
331 fee_estimator: ark_fees::Estimator,
332}
333
334#[derive(Clone, Copy, Debug)]
335pub struct TxStatus {
336 pub confirmed_at: Option<i64>,
337}
338
339#[derive(Clone, Copy, Debug)]
340pub struct SpendStatus {
341 pub spend_txid: Option<Txid>,
342}
343
344pub struct AddressVtxos {
345 pub unspent: Vec<VirtualTxOutPoint>,
346 pub spent: Vec<VirtualTxOutPoint>,
347}
348
349#[derive(Clone, Debug, Default)]
350pub struct OffChainBalance {
351 pre_confirmed: Amount,
352 confirmed: Amount,
353 recoverable: Amount,
354 pending_recovery: Amount,
358 asset_balances: HashMap<AssetId, u64>,
359}
360
361impl OffChainBalance {
362 pub fn pre_confirmed(&self) -> Amount {
363 self.pre_confirmed
364 }
365
366 pub fn confirmed(&self) -> Amount {
367 self.confirmed
368 }
369
370 pub fn recoverable(&self) -> Amount {
372 self.recoverable
373 }
374
375 pub fn pending_recovery(&self) -> Amount {
378 self.pending_recovery
379 }
380
381 pub fn total(&self) -> Amount {
382 self.pre_confirmed + self.confirmed + self.recoverable + self.pending_recovery
383 }
384
385 pub fn asset_balances(&self) -> &HashMap<AssetId, u64> {
387 &self.asset_balances
388 }
389}
390
391pub trait Blockchain {
392 fn find_outpoints(
393 &self,
394 address: &Address,
395 ) -> impl Future<Output = Result<Vec<ExplorerUtxo>, Error>> + Send;
396
397 fn find_tx(
398 &self,
399 txid: &Txid,
400 ) -> impl Future<Output = Result<Option<Transaction>, Error>> + Send;
401
402 fn get_tx_status(&self, txid: &Txid) -> impl Future<Output = Result<TxStatus, Error>> + Send;
403
404 fn get_output_status(
405 &self,
406 txid: &Txid,
407 vout: u32,
408 ) -> impl Future<Output = Result<SpendStatus, Error>> + Send;
409
410 fn broadcast(&self, tx: &Transaction) -> impl Future<Output = Result<(), Error>> + Send;
411
412 fn get_fee_rate(&self) -> impl Future<Output = Result<f64, Error>> + Send;
413
414 fn broadcast_package(
415 &self,
416 txs: &[&Transaction],
417 ) -> impl Future<Output = Result<(), Error>> + Send;
418}
419
420impl<B, W, S, K> OfflineClient<B, W, S, K>
421where
422 B: Blockchain,
423 W: BoardingWallet + OnchainWallet,
424 S: SwapStorage + 'static,
425 K: KeyProvider,
426{
427 #[allow(clippy::too_many_arguments)]
445 pub fn new(
446 name: String,
447 key_provider: Arc<K>,
448 blockchain: Arc<B>,
449 wallet: Arc<W>,
450 ark_server_url: String,
451 swap_storage: Arc<S>,
452 boltz_url: String,
453 boltz_referral_id: Option<String>,
454 timeout: Duration,
455 delegator_pk: Option<XOnlyPublicKey>,
456 historical_delegator_pks: Vec<XOnlyPublicKey>,
457 ) -> Self {
458 let secp = Secp256k1::new();
459
460 let network_client = ark_grpc::Client::new(ark_server_url);
461
462 let mut seen = HashSet::new();
465 let mut historical_delegator_pks: Vec<_> = historical_delegator_pks
466 .into_iter()
467 .filter(|pk| seen.insert(*pk))
468 .collect();
469
470 if let Some(pk) = delegator_pk {
471 historical_delegator_pks.retain(|k| *k != pk);
472 historical_delegator_pks.insert(0, pk);
473 }
474
475 let boltz_referral_id =
476 boltz_referral_id.or_else(|| Some(DEFAULT_BOLTZ_REFERRAL_ID.to_string()));
477
478 Self {
479 network_client,
480 name,
481 key_provider,
482 blockchain,
483 secp,
484 wallet,
485 swap_storage,
486 boltz_url,
487 boltz_referral_id,
488 timeout,
489 delegator_pk,
490 historical_delegator_pks,
491 }
492 }
493
494 pub fn with_boltz_referral_id(mut self, boltz_referral_id: Option<String>) -> Self {
499 self.boltz_referral_id = boltz_referral_id;
500 self
501 }
502
503 #[allow(clippy::too_many_arguments)]
518 pub fn new_with_keypair(
519 name: String,
520 kp: Keypair,
521 blockchain: Arc<B>,
522 wallet: Arc<W>,
523 ark_server_url: String,
524 swap_storage: Arc<S>,
525 boltz_url: String,
526 boltz_referral_id: Option<String>,
527 timeout: Duration,
528 delegator_pk: Option<XOnlyPublicKey>,
529 historical_delegator_pks: Vec<XOnlyPublicKey>,
530 ) -> OfflineClient<B, W, S, StaticKeyProvider> {
531 let key_provider = Arc::new(StaticKeyProvider::new(kp));
532
533 OfflineClient::new(
534 name,
535 key_provider,
536 blockchain,
537 wallet,
538 ark_server_url,
539 swap_storage,
540 boltz_url,
541 boltz_referral_id,
542 timeout,
543 delegator_pk,
544 historical_delegator_pks,
545 )
546 }
547
548 #[allow(clippy::too_many_arguments)]
561 pub fn new_with_bip32(
562 name: String,
563 xpriv: Xpriv,
564 path: Option<DerivationPath>,
565 blockchain: Arc<B>,
566 wallet: Arc<W>,
567 ark_server_url: String,
568 swap_storage: Arc<S>,
569 boltz_url: String,
570 boltz_referral_id: Option<String>,
571 timeout: Duration,
572 delegator_pk: Option<XOnlyPublicKey>,
573 historical_delegator_pks: Vec<XOnlyPublicKey>,
574 ) -> OfflineClient<B, W, S, Bip32KeyProvider> {
575 let path = path.unwrap_or(
576 DerivationPath::from_str(DEFAULT_DERIVATION_PATH).expect("valid derivation path"),
577 );
578 let key_provider = Arc::new(Bip32KeyProvider::new(xpriv, path));
579
580 OfflineClient::new(
581 name,
582 key_provider,
583 blockchain,
584 wallet,
585 ark_server_url,
586 swap_storage,
587 boltz_url,
588 boltz_referral_id,
589 timeout,
590 delegator_pk,
591 historical_delegator_pks,
592 )
593 }
594
595 pub fn delegator_pk(&self) -> Option<XOnlyPublicKey> {
597 self.delegator_pk
598 }
599
600 pub fn boltz_referral_id(&self) -> Option<&str> {
602 self.boltz_referral_id.as_deref()
603 }
604
605 pub async fn connect(mut self) -> Result<Client<B, W, S, K>, Error> {
611 timeout_op(self.timeout, self.network_client.connect())
612 .await
613 .context("Failed to connect to Ark server")??;
614
615 self.finish_connect().await
616 }
617
618 pub async fn connect_with_retries(
626 mut self,
627 max_retries: usize,
628 ) -> Result<Client<B, W, S, K>, Error> {
629 let mut n_retries = 0;
630 while n_retries < max_retries {
631 let res = timeout_op(self.timeout, self.network_client.connect())
632 .await
633 .context("Failed to connect to Ark server")?;
634
635 match res {
636 Ok(()) => break,
637 Err(error) => {
638 tracing::warn!(?error, "Failed to connect to Ark server, retrying");
639
640 sleep(Duration::from_secs(2)).await;
641
642 n_retries += 1;
643
644 continue;
645 }
646 };
647 }
648
649 self.finish_connect().await
650 }
651
652 async fn finish_connect(mut self) -> Result<Client<B, W, S, K>, Error> {
653 let server_info = timeout_op(self.timeout, self.network_client.get_info())
654 .await
655 .context("Failed to get Ark server info")??;
656
657 tracing::debug!(
658 name = self.name,
659 ark_server_url = ?self.network_client,
660 "Connected to Ark server"
661 );
662
663 let fee_estimator = build_fee_estimator(&server_info)?;
664 let state = Arc::new(RwLock::new(ServerState {
665 server_info,
666 fee_estimator,
667 }));
668 let hook_state = state.clone();
669 self.network_client
670 .set_info_refresh_hook(move |server_info| {
671 update_server_state(&hook_state, server_info)
672 .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
673 });
674
675 let client = Client { inner: self, state };
676
677 if let Err(error) = client.discover_keys(DEFAULT_GAP_LIMIT).await {
678 tracing::warn!(?error, "Failed during key discovery");
679 };
680
681 match client.server_info() {
686 Ok(server_info) => {
687 if let Err(error) = client.persist_watch_boarding_outputs(&server_info) {
688 tracing::warn!(?error, "Failed to persist boarding outputs at connect");
689 }
690 }
691 Err(error) => {
692 tracing::warn!(
693 ?error,
694 "Failed to read server info for boarding persistence"
695 );
696 }
697 }
698
699 Ok(client)
700 }
701}
702
703fn build_fee_estimator(server_info: &server::Info) -> Result<ark_fees::Estimator, Error> {
704 let fee_estimator_config = server_info
705 .fees
706 .clone()
707 .map(|fees| ark_fees::Config {
708 intent_offchain_input_program: fees.intent_fee.offchain_input.unwrap_or_default(),
709 intent_onchain_input_program: fees.intent_fee.onchain_input.unwrap_or_default(),
710 intent_offchain_output_program: fees.intent_fee.offchain_output.unwrap_or_default(),
711 intent_onchain_output_program: fees.intent_fee.onchain_output.unwrap_or_default(),
712 })
713 .unwrap_or_default();
714
715 ark_fees::Estimator::new(fee_estimator_config).map_err(Error::ark_server)
716}
717
718fn update_server_state(
719 state: &Arc<RwLock<ServerState>>,
720 server_info: server::Info,
721) -> Result<(), Error> {
722 let fee_estimator = build_fee_estimator(&server_info)?;
723 let mut state = state
724 .write()
725 .map_err(|_| Error::ad_hoc("client server state lock poisoned"))?;
726 state.server_info = server_info;
727 state.fee_estimator = fee_estimator;
728 Ok(())
729}
730
731impl<B, W, S, K> Client<B, W, S, K>
732where
733 B: Blockchain,
734 W: BoardingWallet + OnchainWallet,
735 S: SwapStorage + 'static,
736 K: KeyProvider,
737{
738 pub fn server_info(&self) -> Result<server::Info, Error> {
740 self.state
741 .read()
742 .map(|state| state.server_info.clone())
743 .map_err(|_| Error::ad_hoc("client server state lock poisoned"))
744 }
745
746 fn with_server_state<T>(&self, f: impl FnOnce(&ServerState) -> T) -> Result<T, Error> {
747 self.state
748 .read()
749 .map(|state| f(&state))
750 .map_err(|_| Error::ad_hoc("client server state lock poisoned"))
751 }
752
753 fn eval_onchain_output_fee(&self, output: ark_fees::Output) -> Result<Amount, Error> {
754 self.with_server_state(|state| state.fee_estimator.eval_onchain_output(output))?
755 .map(|fee| Amount::from_sat(fee.to_satoshis()))
756 .map_err(Error::ad_hoc)
757 }
758
759 pub async fn refresh_server_info(&self) -> Result<(), Error> {
769 let server_info = timeout_op(self.inner.timeout, self.network_client().get_info())
770 .await
771 .context("Failed to refresh Ark server info")??;
772
773 update_server_state(&self.state, server_info)?;
774
775 Ok(())
776 }
777
778 pub fn delegator_pk(&self) -> Option<XOnlyPublicKey> {
780 self.inner.delegator_pk()
781 }
782
783 pub fn boltz_referral_id(&self) -> Option<&str> {
785 self.inner.boltz_referral_id()
786 }
787
788 pub fn get_offchain_address(&self) -> Result<(ArkAddress, Vtxo), Error> {
796 let server_info = &self.server_info()?;
797
798 let server_signer = server_info.signer_pk.into();
799 let owner = self
800 .next_keypair(KeypairIndex::LastUnused)?
801 .public_key()
802 .into();
803
804 let vtxo = self.make_vtxo(server_signer, owner)?;
805
806 let ark_address = vtxo.to_ark_address();
807
808 Ok((ark_address, vtxo))
809 }
810
811 pub fn get_offchain_addresses(&self) -> Result<Vec<(ArkAddress, Vtxo)>, Error> {
818 let server_info = &self.server_info()?;
819 let pks = self.inner.key_provider.get_cached_pks()?;
820
821 let all_server_keys: Vec<XOnlyPublicKey> = server_info.all_server_keys().collect();
824 let candidate_delays = ark_core::candidate_exit_delays(
830 server_info.unilateral_exit_delay,
831 server_info.network,
832 )?;
833
834 let mut results = Vec::new();
835
836 for owner_pk in &pks {
837 for server_signer in &all_server_keys {
838 for exit_delay in &candidate_delays {
839 let default_vtxo = Vtxo::new_default(
841 self.secp(),
842 *server_signer,
843 *owner_pk,
844 *exit_delay,
845 server_info.network,
846 )?;
847 results.push((default_vtxo.to_ark_address(), default_vtxo));
848
849 let mut seen = HashSet::new();
851 for dpk in &self.inner.historical_delegator_pks {
852 if !seen.insert(dpk) {
853 continue;
854 }
855 let delegate_vtxo = Vtxo::new_with_delegator(
856 self.secp(),
857 *server_signer,
858 *owner_pk,
859 *dpk,
860 *exit_delay,
861 server_info.network,
862 )?;
863 results.push((delegate_vtxo.to_ark_address(), delegate_vtxo));
864 }
865 }
866 }
867 }
868
869 Ok(results)
870 }
871
872 fn make_vtxo(
875 &self,
876 server_signer: XOnlyPublicKey,
877 owner: XOnlyPublicKey,
878 ) -> Result<Vtxo, Error> {
879 let server_info = &self.server_info()?;
880 match self.inner.delegator_pk {
881 Some(delegator) => Vtxo::new_with_delegator(
882 self.secp(),
883 server_signer,
884 owner,
885 delegator,
886 server_info.unilateral_exit_delay,
887 server_info.network,
888 )
889 .map_err(Into::into),
890 None => Vtxo::new_default(
891 self.secp(),
892 server_signer,
893 owner,
894 server_info.unilateral_exit_delay,
895 server_info.network,
896 )
897 .map_err(Into::into),
898 }
899 }
900
901 pub async fn discover_keys(&self, gap_limit: u32) -> Result<u32, Error> {
912 if !self.inner.key_provider.supports_discovery() {
913 tracing::debug!("Key provider does not support discovery, skipping");
914 return Ok(0);
915 }
916
917 let server_info = &self.server_info()?;
918 let all_server_keys: Vec<XOnlyPublicKey> = server_info.all_server_keys().collect();
921 let candidate_delays = ark_core::candidate_exit_delays(
925 server_info.unilateral_exit_delay,
926 server_info.network,
927 )?;
928
929 let mut start_index = 0u32;
930 let mut discovered_count = 0u32;
931
932 tracing::info!(gap_limit, "Starting key discovery");
933
934 loop {
935 let mut batch: Vec<(u32, Keypair, Vec<ArkAddress>)> =
937 Vec::with_capacity(gap_limit as usize);
938
939 for i in 0..gap_limit {
940 let index = start_index
941 .checked_add(i)
942 .ok_or_else(|| Error::ad_hoc("Key discovery index overflow"))?;
943
944 let kp = match self.inner.key_provider.derive_at_discovery_index(index)? {
945 Some(kp) => kp,
946 None => break,
947 };
948
949 let owner_pk = kp.x_only_public_key().0;
950
951 let mut addresses = Vec::new();
952
953 for server_signer in &all_server_keys {
954 for exit_delay in &candidate_delays {
955 let default_vtxo = Vtxo::new_default(
957 self.secp(),
958 *server_signer,
959 owner_pk,
960 *exit_delay,
961 server_info.network,
962 )?;
963 addresses.push(default_vtxo.to_ark_address());
964
965 for dpk in &self.inner.historical_delegator_pks {
967 let delegate_vtxo = Vtxo::new_with_delegator(
968 self.secp(),
969 *server_signer,
970 owner_pk,
971 *dpk,
972 *exit_delay,
973 server_info.network,
974 )?;
975 addresses.push(delegate_vtxo.to_ark_address());
976 }
977 }
978 }
979
980 batch.push((index, kp, addresses));
981 }
982
983 if batch.is_empty() {
984 break;
985 }
986
987 let addresses = batch.iter().flat_map(|(_, _, addrs)| addrs.iter().copied());
989
990 let vtxo_list = self.list_vtxos_for_addresses(addresses).await?;
991
992 let used_scripts: HashSet<&ScriptBuf> = vtxo_list.all().map(|v| &v.script).collect();
994
995 let mut found_any = false;
997 for (index, kp, addrs) in batch {
998 let used_addr = addrs.iter().find(|addr| {
999 let script = addr.to_p2tr_script_pubkey();
1000 used_scripts.contains(&script)
1001 });
1002 if let Some(addr) = used_addr {
1003 tracing::debug!(index, addr = %addr, "Found used address");
1004 self.inner
1005 .key_provider
1006 .cache_discovered_keypair(index, kp)?;
1007 discovered_count += 1;
1008 found_any = true;
1009 }
1010 }
1011
1012 if !found_any {
1014 break;
1015 }
1016
1017 start_index = start_index
1018 .checked_add(gap_limit)
1019 .ok_or_else(|| Error::ad_hoc("Key discovery index overflow"))?;
1020 }
1021
1022 tracing::info!(discovered_count, "Key discovery completed");
1023
1024 Ok(discovered_count)
1025 }
1026
1027 pub fn get_boarding_address(&self) -> Result<Address, Error> {
1029 let server_info = &self.server_info()?;
1030
1031 let boarding_output = self.inner.wallet.new_boarding_output(
1032 server_info.signer_pk.into(),
1033 server_info.boarding_exit_delay,
1034 server_info.network,
1035 )?;
1036
1037 Ok(boarding_output.address().clone())
1038 }
1039
1040 pub fn get_onchain_address(&self) -> Result<Address, Error> {
1041 self.inner.wallet.get_onchain_address()
1042 }
1043
1044 pub fn get_boarding_addresses(&self) -> Result<Vec<Address>, Error> {
1045 let server_info = &self.server_info()?;
1046
1047 let outputs = self.persist_watch_boarding_outputs(server_info)?;
1056
1057 let mut seen = HashSet::new();
1058 let mut addresses = Vec::with_capacity(outputs.len());
1059 for output in &outputs {
1060 let address = output.address().clone();
1061 if seen.insert(address.clone()) {
1062 addresses.push(address);
1063 }
1064 }
1065
1066 Ok(addresses)
1067 }
1068
1069 fn persist_watch_boarding_outputs(
1079 &self,
1080 server_info: &server::Info,
1081 ) -> Result<Vec<BoardingOutput>, Error> {
1082 let candidate_delays =
1083 ark_core::candidate_exit_delays(server_info.boarding_exit_delay, server_info.network)?;
1084
1085 let mut outputs = Vec::new();
1086 for server_pk in server_info.all_server_keys() {
1087 for exit_delay in &candidate_delays {
1088 let boarding_output = self.inner.wallet.new_boarding_output(
1089 server_pk,
1090 *exit_delay,
1091 server_info.network,
1092 )?;
1093 outputs.push(boarding_output);
1094 }
1095 }
1096
1097 Ok(outputs)
1098 }
1099
1100 pub async fn get_virtual_tx_outpoints(
1101 &self,
1102 addresses: impl Iterator<Item = ArkAddress>,
1103 ) -> Result<Vec<VirtualTxOutPoint>, Error> {
1104 let request = GetVtxosRequest::new_for_addresses(addresses);
1105 self.fetch_all_vtxos(request).await
1106 }
1107
1108 pub async fn list_vtxos(&self) -> Result<(VtxoList, HashMap<ScriptBuf, Vtxo>), Error> {
1109 let ark_addresses = self.get_offchain_addresses()?;
1110
1111 let script_pubkey_to_vtxo_map = ark_addresses
1112 .iter()
1113 .map(|(a, v)| (a.to_p2tr_script_pubkey(), v.clone()))
1114 .collect();
1115
1116 let addresses = ark_addresses.iter().map(|(a, _)| a).copied();
1117
1118 let vtxo_list = self.list_vtxos_for_addresses(addresses).await?;
1119
1120 Ok((vtxo_list, script_pubkey_to_vtxo_map))
1121 }
1122
1123 pub async fn list_vtxos_for_addresses(
1124 &self,
1125 addresses: impl Iterator<Item = ArkAddress>,
1126 ) -> Result<VtxoList, Error> {
1127 let virtual_tx_outpoints = self
1128 .get_virtual_tx_outpoints(addresses)
1129 .await
1130 .context("failed to get VTXOs for addresses")?;
1131
1132 let vtxo_list = VtxoList::new(self.server_info()?.dust, virtual_tx_outpoints);
1133
1134 Ok(vtxo_list)
1135 }
1136
1137 pub async fn list_vtxos_for_outpoints(
1138 &self,
1139 outpoints: Vec<OutPoint>,
1140 ) -> Result<(VtxoList, HashMap<ScriptBuf, Vtxo>), Error> {
1141 let ark_addresses = self.get_offchain_addresses()?;
1142
1143 let script_pubkey_to_vtxo_map = ark_addresses
1144 .iter()
1145 .map(|(a, v)| (a.to_p2tr_script_pubkey(), v.clone()))
1146 .collect::<HashMap<_, _>>();
1147
1148 let request = GetVtxosRequest::new_for_outpoints(&outpoints);
1149 let virtual_tx_outpoints = self.fetch_all_vtxos(request).await?;
1150
1151 let virtual_tx_outpoints = virtual_tx_outpoints
1153 .into_iter()
1154 .filter(|v| match script_pubkey_to_vtxo_map.get(&v.script) {
1155 Some(_) => true,
1156 None => {
1157 tracing::debug!(outpoint = %v.outpoint, "Missing spend info for VTXO");
1158
1159 false
1160 }
1161 })
1162 .collect();
1163
1164 let vtxo_list = VtxoList::new(self.server_info()?.dust, virtual_tx_outpoints);
1165
1166 Ok((vtxo_list, script_pubkey_to_vtxo_map))
1167 }
1168
1169 pub async fn get_vtxo_chain(
1170 &self,
1171 out_point: OutPoint,
1172 size: i32,
1173 index: i32,
1174 ) -> Result<Option<VtxoChainResponse>, Error> {
1175 let vtxo_chain = timeout_op(
1176 self.inner.timeout,
1177 self.network_client()
1178 .get_vtxo_chain(Some(out_point), Some((size, index))),
1179 )
1180 .await
1181 .context("Failed to fetch VTXO chain")??;
1182
1183 Ok(Some(vtxo_chain))
1184 }
1185
1186 pub async fn offchain_balance(&self) -> Result<OffChainBalance, Error> {
1187 let (vtxo_list, script_map) = self.list_vtxos().await.context("failed to list VTXOs")?;
1188 let now = unix_now()?;
1189 let server_info = self.server_info()?;
1190
1191 let spendable_outpoints: HashSet<OutPoint> = vtxo_list
1192 .spendable_offchain_at(&server_info, now, |script| {
1193 script_map.get(script).map(|vtxo| vtxo.server_pk())
1194 })
1195 .map(|vtxo| vtxo.outpoint)
1196 .collect();
1197
1198 let pre_confirmed = vtxo_list
1199 .pre_confirmed()
1200 .filter(|v| spendable_outpoints.contains(&v.outpoint))
1201 .fold(Amount::ZERO, |acc, x| acc + x.amount);
1202
1203 let confirmed = vtxo_list
1204 .confirmed()
1205 .filter(|v| spendable_outpoints.contains(&v.outpoint))
1206 .fold(Amount::ZERO, |acc, x| acc + x.amount);
1207
1208 let recoverable = vtxo_list
1209 .recoverable()
1210 .fold(Amount::ZERO, |acc, x| acc + x.amount);
1211
1212 let pending_recovery = vtxo_list
1213 .pending_recovery_due_to_signer_at(&server_info, now, |script| {
1214 script_map.get(script).map(|vtxo| vtxo.server_pk())
1215 })
1216 .fold(Amount::ZERO, |acc, x| acc + x.amount);
1217
1218 let mut asset_balances: HashMap<AssetId, u64> = HashMap::new();
1220 for vtxo in vtxo_list.spendable_offchain_at(&server_info, now, |script| {
1221 script_map.get(script).map(|vtxo| vtxo.server_pk())
1222 }) {
1223 for asset in &vtxo.assets {
1224 let total = asset_balances
1225 .get(&asset.asset_id)
1226 .copied()
1227 .unwrap_or(0)
1228 .checked_add(asset.amount)
1229 .ok_or_else(|| Error::ad_hoc("asset balance overflow"))?;
1230 asset_balances.insert(asset.asset_id, total);
1231 }
1232 }
1233
1234 Ok(OffChainBalance {
1235 pre_confirmed,
1236 confirmed,
1237 recoverable,
1238 pending_recovery,
1239 asset_balances,
1240 })
1241 }
1242
1243 pub async fn get_asset(&self, asset_id: AssetId) -> Result<server::AssetInfo, Error> {
1245 timeout_op(
1246 self.inner.timeout,
1247 self.network_client().get_asset(asset_id),
1248 )
1249 .await
1250 .context("Failed to get asset info")?
1251 .map_err(Error::ark_server)
1252 }
1253
1254 pub async fn transaction_history(&self) -> Result<Vec<history::Transaction>, Error> {
1255 let mut boarding_transactions = Vec::new();
1256 let mut boarding_commitment_transactions = Vec::new();
1257
1258 let boarding_addresses = self.get_boarding_addresses()?;
1259 for boarding_address in boarding_addresses.iter() {
1260 let outpoints = timeout_op(
1261 self.inner.timeout,
1262 self.blockchain().find_outpoints(boarding_address),
1263 )
1264 .await
1265 .context("Failed to find outpoints")??;
1266
1267 for ExplorerUtxo {
1268 outpoint,
1269 amount,
1270 confirmation_blocktime,
1271 ..
1272 } in outpoints.iter()
1273 {
1274 let confirmed_at = confirmation_blocktime.map(|t| t as i64);
1275
1276 boarding_transactions.push(history::Transaction::Boarding {
1277 txid: outpoint.txid,
1278 amount: *amount,
1279 confirmed_at,
1280 });
1281
1282 let status = timeout_op(
1283 self.inner.timeout,
1284 self.blockchain()
1285 .get_output_status(&outpoint.txid, outpoint.vout),
1286 )
1287 .await
1288 .context("Failed to get Tx output status")??;
1289
1290 if let Some(spend_txid) = status.spend_txid {
1291 boarding_commitment_transactions.push(spend_txid);
1292 }
1293 }
1294 }
1295
1296 let (vtxo_list, _) = self.list_vtxos().await?;
1297
1298 let spent_outpoints = vtxo_list.spent().cloned().collect::<Vec<_>>();
1299 let unspent_outpoints = vtxo_list.all_unspent().cloned().collect::<Vec<_>>();
1300
1301 let incoming_transactions = generate_incoming_vtxo_transaction_history(
1302 &spent_outpoints,
1303 &unspent_outpoints,
1304 &boarding_commitment_transactions,
1305 )?;
1306
1307 let outgoing_txs =
1308 generate_outgoing_vtxo_transaction_history(&spent_outpoints, &unspent_outpoints)?;
1309
1310 let mut outgoing_transactions = vec![];
1311 for tx in outgoing_txs {
1312 let tx = match tx {
1313 OutgoingTransaction::Complete(tx) => tx,
1314 OutgoingTransaction::Incomplete(incomplete_tx) => {
1315 let first_outpoint = incomplete_tx.first_outpoint();
1316
1317 let request = GetVtxosRequest::new_for_outpoints(&[first_outpoint]);
1318 let vtxos = self.fetch_all_vtxos(request).await?;
1319
1320 match vtxos.first() {
1321 Some(virtual_tx_outpoint) => {
1322 match incomplete_tx.finish(virtual_tx_outpoint) {
1323 Ok(tx) => tx,
1324 Err(e) => {
1325 tracing::warn!(
1326 %first_outpoint,
1327 "Could not finish outgoing TX, skipping: {e}"
1328 );
1329 continue;
1330 }
1331 }
1332 }
1333 None => {
1334 tracing::warn!(
1335 %first_outpoint,
1336 "Could not find virtual TX outpoint for outgoing TX, skipping"
1337 );
1338 continue;
1339 }
1340 }
1341 }
1342 OutgoingTransaction::IncompleteOffboard(incomplete_offboard) => {
1343 let status = timeout_op(
1344 self.inner.timeout,
1345 self.blockchain()
1346 .get_tx_status(&incomplete_offboard.commitment_txid()),
1347 )
1348 .await
1349 .context("failed to get commitment TX status")??;
1350
1351 incomplete_offboard.finish(status.confirmed_at)
1352 }
1353 };
1354
1355 outgoing_transactions.push(tx);
1356 }
1357
1358 let mut txs = [
1359 boarding_transactions,
1360 incoming_transactions,
1361 outgoing_transactions,
1362 ]
1363 .concat();
1364
1365 sort_transactions_by_created_at(&mut txs);
1366
1367 Ok(txs)
1368 }
1369
1370 pub fn dust(&self) -> Result<Amount, Error> {
1372 Ok(self.server_info()?.dust)
1373 }
1374
1375 pub fn network_client(&self) -> ark_grpc::Client {
1376 self.inner.network_client.clone()
1377 }
1378
1379 async fn fetch_all_vtxos(
1381 &self,
1382 request: GetVtxosRequest,
1383 ) -> Result<Vec<VirtualTxOutPoint>, Error> {
1384 if request.reference().is_empty() {
1385 return Ok(Vec::new());
1386 }
1387
1388 let mut all_vtxos = Vec::new();
1389 let mut cursor = 0;
1390 const PAGE_SIZE: i32 = 100;
1391
1392 loop {
1393 let paged_request = request.clone().with_page(PAGE_SIZE, cursor);
1394 let response = timeout_op(
1395 self.inner.timeout,
1396 self.network_client().list_vtxos(paged_request),
1397 )
1398 .await
1399 .context("failed to fetch list of VTXOs")??;
1400
1401 all_vtxos.extend(response.vtxos);
1402
1403 match response.page {
1405 Some(page) if page.next < page.total => {
1406 cursor = page.next;
1407 }
1408 _ => break,
1409 }
1410 }
1411
1412 Ok(all_vtxos)
1413 }
1414
1415 fn next_keypair(&self, keypair_index: KeypairIndex) -> Result<Keypair, Error> {
1416 self.inner.key_provider.get_next_keypair(keypair_index)
1417 }
1418 fn keypair_by_pk(&self, pk: &XOnlyPublicKey) -> Result<Keypair, Error> {
1419 self.inner.key_provider.get_keypair_for_pk(pk)
1420 }
1421
1422 fn derivation_index_for_pk(&self, pk: &XOnlyPublicKey) -> Option<u32> {
1423 self.inner.key_provider.get_derivation_index_for_pk(pk)
1424 }
1425
1426 fn secp(&self) -> &Secp256k1<All> {
1427 &self.inner.secp
1428 }
1429
1430 fn blockchain(&self) -> &B {
1431 &self.inner.blockchain
1432 }
1433
1434 fn swap_storage(&self) -> &S {
1435 &self.inner.swap_storage
1436 }
1437
1438 pub async fn bump_tx(&self, parent: &Transaction) -> Result<Transaction, Error> {
1440 let fee_rate = timeout_op(self.inner.timeout, self.blockchain().get_fee_rate())
1441 .await
1442 .context("Failed to retrieve fee rate")??;
1443
1444 let change_address = self.inner.wallet.get_onchain_address()?;
1445
1446 let select_coins_fn =
1448 |target_amount: Amount| -> Result<UtxoCoinSelection, ark_core::Error> {
1449 self.inner.wallet.select_coins(target_amount).map_err(|e| {
1450 ark_core::Error::ad_hoc(format!("failed to select coins for anchor TX: {e}"))
1451 })
1452 };
1453
1454 let mut psbt = build_anchor_tx(parent, change_address, fee_rate, select_coins_fn)
1456 .map_err(|e| Error::ad_hoc(e.to_string()))?;
1457
1458 self.inner
1460 .wallet
1461 .sign(&mut psbt)
1462 .context("failed to sign bump TX")?;
1463
1464 let tx = psbt.extract_tx().map_err(Error::ad_hoc)?;
1466
1467 Ok(tx)
1468 }
1469
1470 pub async fn subscribe_to_scripts(
1486 &self,
1487 scripts: Vec<ArkAddress>,
1488 subscription_id: Option<String>,
1489 ) -> Result<String, Error> {
1490 self.network_client()
1491 .subscribe_to_scripts(scripts, subscription_id)
1492 .await
1493 .map_err(Into::into)
1494 }
1495
1496 pub async fn unsubscribe_from_scripts(
1506 &self,
1507 scripts: Vec<ArkAddress>,
1508 subscription_id: String,
1509 ) -> Result<(), Error> {
1510 self.network_client()
1511 .unsubscribe_from_scripts(scripts, subscription_id)
1512 .await
1513 .map_err(Into::into)
1514 }
1515
1516 pub async fn get_subscription(
1529 &self,
1530 subscription_id: String,
1531 ) -> Result<impl Stream<Item = Result<SubscriptionResponse, ark_grpc::Error>> + Unpin, Error>
1532 {
1533 self.network_client()
1534 .get_subscription(subscription_id)
1535 .await
1536 .map_err(Into::into)
1537 }
1538}
1539
1540#[cfg(test)]
1541mod digest_guard_tests {
1542 use super::*;
1543 use ark_grpc::test_utils;
1544 use bitcoin::key::Secp256k1;
1545 use bitcoin::secp256k1::SecretKey;
1546 use bitcoin::Address;
1547 use std::convert::Infallible;
1548 use std::future::Future;
1549 use std::pin::Pin;
1550 use std::sync::atomic::AtomicUsize;
1551 use std::sync::atomic::Ordering;
1552 use std::task::Context;
1553 use std::task::Poll;
1554 use tokio::net::TcpListener;
1555 use tonic::body::Body;
1556 use tonic::codegen::http;
1557 use tonic::codegen::Service;
1558 use tonic::server::NamedService;
1559 use tonic::server::UnaryService;
1560
1561 #[derive(Clone, Default)]
1562 struct MockArkServer {
1563 state: Arc<MockState>,
1564 }
1565
1566 #[derive(Default)]
1567 struct MockState {
1568 get_info_calls: AtomicUsize,
1569 list_vtxos_calls: AtomicUsize,
1570 }
1571
1572 impl Service<http::Request<Body>> for MockArkServer {
1573 type Response = http::Response<Body>;
1574 type Error = Infallible;
1575 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
1576
1577 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1578 Poll::Ready(Ok(()))
1579 }
1580
1581 fn call(&mut self, req: http::Request<Body>) -> Self::Future {
1582 match req.uri().path() {
1583 "/ark.v1.ArkService/GetInfo" => {
1584 let method = GetInfoSvc {
1585 state: self.state.clone(),
1586 };
1587 Box::pin(async move {
1588 let codec = tonic_prost::ProstCodec::default();
1589 let mut grpc = tonic::server::Grpc::new(codec);
1590 Ok(grpc.unary(method, req).await)
1591 })
1592 }
1593 "/ark.v1.IndexerService/GetVtxos" => {
1594 let method = ListVtxosSvc {
1595 state: self.state.clone(),
1596 };
1597 Box::pin(async move {
1598 let codec = tonic_prost::ProstCodec::default();
1599 let mut grpc = tonic::server::Grpc::new(codec);
1600 Ok(grpc.unary(method, req).await)
1601 })
1602 }
1603 _ => Box::pin(async move {
1604 Ok(http::Response::builder()
1605 .status(200)
1606 .header("grpc-status", "12")
1607 .header("content-type", "application/grpc")
1608 .body(Body::empty())
1609 .unwrap())
1610 }),
1611 }
1612 }
1613 }
1614
1615 impl NamedService for MockArkServer {
1616 const NAME: &'static str = "ark.v1.ArkService";
1617 }
1618
1619 #[derive(Clone)]
1620 struct MockIndexerServer(MockArkServer);
1621
1622 impl Service<http::Request<Body>> for MockIndexerServer {
1623 type Response = http::Response<Body>;
1624 type Error = Infallible;
1625 type Future = <MockArkServer as Service<http::Request<Body>>>::Future;
1626
1627 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1628 self.0.poll_ready(cx)
1629 }
1630
1631 fn call(&mut self, req: http::Request<Body>) -> Self::Future {
1632 self.0.call(req)
1633 }
1634 }
1635
1636 impl NamedService for MockIndexerServer {
1637 const NAME: &'static str = "ark.v1.IndexerService";
1638 }
1639
1640 #[derive(Clone)]
1641 struct GetInfoSvc {
1642 state: Arc<MockState>,
1643 }
1644
1645 impl UnaryService<test_utils::GetInfoRequest> for GetInfoSvc {
1646 type Response = test_utils::GetInfoResponse;
1647 type Future = Pin<
1648 Box<dyn Future<Output = Result<tonic::Response<Self::Response>, tonic::Status>> + Send>,
1649 >;
1650
1651 fn call(&mut self, _request: tonic::Request<test_utils::GetInfoRequest>) -> Self::Future {
1652 self.state.get_info_calls.fetch_add(1, Ordering::SeqCst);
1653 Box::pin(async { Ok(tonic::Response::new(info_response("fresh-digest"))) })
1654 }
1655 }
1656
1657 #[derive(Clone)]
1658 struct ListVtxosSvc {
1659 state: Arc<MockState>,
1660 }
1661
1662 impl UnaryService<test_utils::GetVtxosRequest> for ListVtxosSvc {
1663 type Response = test_utils::GetVtxosResponse;
1664 type Future = Pin<
1665 Box<dyn Future<Output = Result<tonic::Response<Self::Response>, tonic::Status>> + Send>,
1666 >;
1667
1668 fn call(&mut self, _request: tonic::Request<test_utils::GetVtxosRequest>) -> Self::Future {
1669 self.state.list_vtxos_calls.fetch_add(1, Ordering::SeqCst);
1670 Box::pin(async {
1671 Err(tonic::Status::failed_precondition(
1672 "DIGEST_MISMATCH: invalid digest header",
1673 ))
1674 })
1675 }
1676 }
1677
1678 fn info_response(digest: &str) -> test_utils::GetInfoResponse {
1679 let secp = Secp256k1::new();
1680 let secret_key = SecretKey::from_slice(&[1; 32]).unwrap();
1681 let keypair = Keypair::from_secret_key(&secp, &secret_key);
1682 let public_key = bitcoin::secp256k1::PublicKey::from_secret_key(&secp, &secret_key);
1683 let (xonly, _) = keypair.x_only_public_key();
1684 let address = Address::p2tr(&secp, xonly, None, bitcoin::Network::Regtest);
1685
1686 test_utils::GetInfoResponse {
1687 version: "0.9.9".to_string(),
1688 signer_pubkey: public_key.to_string(),
1689 forfeit_pubkey: public_key.to_string(),
1690 forfeit_address: address.to_string(),
1691 checkpoint_tapscript: String::new(),
1692 network: "regtest".to_string(),
1693 session_duration: 60,
1694 unilateral_exit_delay: 144,
1695 boarding_exit_delay: 144,
1696 utxo_min_amount: 0,
1697 utxo_max_amount: 0,
1698 vtxo_min_amount: 0,
1699 vtxo_max_amount: 0,
1700 dust: 1000,
1701 fees: None,
1702 scheduled_session: None,
1703 deprecated_signers: Vec::new(),
1704 service_status: Default::default(),
1705 digest: digest.to_string(),
1706 max_tx_weight: 0,
1707 max_op_return_outputs: 0,
1708 }
1709 }
1710
1711 #[tokio::test]
1712 async fn guarded_client_refreshes_info_and_does_not_retry_on_digest_mismatch() {
1713 let _ = rustls::crypto::ring::default_provider().install_default();
1714
1715 let mock = MockArkServer::default();
1716 let state = mock.state.clone();
1717 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1718 let addr = listener.local_addr().unwrap();
1719 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
1720
1721 let indexer_mock = MockIndexerServer(mock.clone());
1722 tokio::spawn(async move {
1723 tonic::transport::Server::builder()
1724 .add_service(mock)
1725 .add_service(indexer_mock)
1726 .serve_with_incoming(incoming)
1727 .await
1728 .unwrap();
1729 });
1730
1731 let mut inner = ark_grpc::Client::new(format!("http://{addr}"));
1732 inner.connect().await.unwrap();
1733
1734 let initial_info: server::Info = info_response("stale-digest").try_into().unwrap();
1735 let cached_state = Arc::new(RwLock::new(ServerState {
1736 server_info: initial_info,
1737 fee_estimator: build_fee_estimator(&info_response("stale-digest").try_into().unwrap())
1738 .unwrap(),
1739 }));
1740 let hook_state = cached_state.clone();
1741 inner.set_info_refresh_hook(move |server_info| {
1742 update_server_state(&hook_state, server_info)
1743 .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)
1744 });
1745
1746 let err = match inner
1747 .list_vtxos(GetVtxosRequest::new_for_outpoints(&[OutPoint::null()]))
1748 .await
1749 {
1750 Ok(_) => panic!("list_vtxos unexpectedly succeeded"),
1751 Err(err) => err,
1752 };
1753
1754 assert!(err.is_server_info_changed());
1755 assert!(Error::from(err).is_server_info_changed());
1756 assert_eq!(state.list_vtxos_calls.load(Ordering::SeqCst), 1);
1757 assert_eq!(state.get_info_calls.load(Ordering::SeqCst), 1);
1758 assert_eq!(
1759 cached_state.read().unwrap().server_info.digest,
1760 "fresh-digest"
1761 );
1762 }
1763}