// Build-time guard: refuse to compile when the `tls-native-roots` feature is
// enabled for an Android or iOS target.
#[cfg(all(any(target_os = "android", target_os = "ios"), feature = "tls-native-roots"))]
compile_error!("feature `tls-native-roots` can't be used on Android or iOS, use `tls-webpki-roots` instead");
296
297pub extern crate ark;
298
299pub extern crate bip39;
300pub extern crate lightning_invoice;
301pub extern crate lnurl as lnurllib;
302
303#[macro_use] extern crate anyhow;
304#[macro_use] extern crate async_trait;
305#[macro_use] extern crate serde;
306
307pub mod actions;
308pub mod chain;
309pub mod exit;
310pub mod movement;
311pub mod onchain;
312pub mod payment_request;
313pub mod persist;
314pub mod round;
315pub mod subsystem;
316pub mod vtxo;
317
318pub mod lock_manager;
319
320mod arkoor;
321mod board;
322mod config;
323mod daemon;
324mod fees;
325mod lightning;
326mod mailbox;
327mod notification;
328mod offboard;
329#[cfg(feature = "socks5-proxy")]
330mod proxy;
331mod psbtext;
332mod utils;
333
334pub use self::arkoor::{ArkoorCreateResult, ArkoorAddressError};
335pub use self::config::{BarkNetwork, Config};
336pub use self::daemon::DaemonHandle;
337pub use self::fees::FeeEstimate;
338pub use self::notification::{WalletNotification, NotificationStream};
339pub use self::vtxo::WalletVtxo;
340pub use self::utils::time;
341
342use std::borrow::Cow;
343use std::collections::HashSet;
344use std::iter;
345use std::path::PathBuf;
346use std::sync::Arc;
347use std::time::Duration;
348
349use anyhow::{bail, Context};
350use bip39::Mnemonic;
351use bitcoin::{Amount, Network, OutPoint};
352use bitcoin::bip32::{self, ChildNumber, Fingerprint};
353use bitcoin::secp256k1::{self, Keypair, PublicKey};
354use futures::stream::FuturesUnordered;
355use log::{debug, error, info, trace, warn};
356use tokio_stream::StreamExt;
357
358use ark::{ArkInfo, ProtocolEncoding, Vtxo, VtxoId, VtxoPolicy, VtxoRequest};
359use ark::address::VtxoDelivery;
360use ark::fees::{validate_and_subtract_fee_min_dust, VtxoFeeInfo};
361use ark::rounds::{RoundAttempt, RoundEvent};
362use ark::vtxo::{Full, PubkeyVtxoPolicy, VtxoRef};
363use ark::vtxo::policy::signing::VtxoSigner;
364use bitcoin_ext::{BlockHeight, P2TR_DUST, TxStatus};
365use server_rpc::{protos, ServerConnection};
366use server_rpc::client::{ConnectError, CreateEndpointError};
367
368use crate::chain::{ChainSource, ChainSourceSpec};
369use crate::exit::Exit;
370use crate::lock_manager::LockManager;
371use crate::movement::{Movement, MovementId, PaymentMethod};
372use crate::movement::manager::MovementManager;
373use crate::notification::NotificationDispatch;
374use crate::onchain::{ExitUnilaterally, PreparePsbt, SignPsbt, Utxo};
375use crate::onchain::DaemonizableOnchainWallet;
376use crate::persist::BarkPersister;
377use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
378#[cfg(feature = "socks5-proxy")]
379use crate::proxy::proxy_for_url;
380use crate::round::{RoundParticipation, RoundSecretNonces, RoundStatus};
381use crate::subsystem::RoundMovement;
382use crate::utils::rejected_vtxos_from_error;
383use crate::vtxo::{FilterVtxos, RefreshStrategy, VtxoFilter, VtxoStateKind};
384
// Build-time guards: the `wasm-web` feature cannot be combined with the
// features below, so fail the build early with a clear message.
#[cfg(all(feature = "wasm-web", feature = "socks5-proxy"))]
compile_error!("`wasm-web` does not support the `socks5-proxy` feature");

#[cfg(all(feature = "wasm-web", feature = "bitcoind-rpc"))]
compile_error!("`wasm-web` does not support the `bitcoind-rpc` feature");
390
/// Hardened BIP32 child index applied to the seed's master key in
/// `WalletSeed::new_from_seed` to obtain the wallet's own master key.
const BARK_PURPOSE_INDEX: u32 = 350;
/// Hardened child index (below the wallet master key) of the key from which
/// VTXO keypairs are derived.
const VTXO_KEYS_INDEX: u32 = 0;
/// Hardened child index (below the wallet master key) of the mailbox keypair.
const MAILBOX_KEY_INDEX: u32 = 1;
/// Hardened child index (below the wallet master key) of the recovery mailbox keypair.
const RECOVERY_MAILBOX_KEY_INDEX: u32 = 2;
/// Help text explaining that no Ark server transport backend was compiled in.
// NOTE(review): the use site is not visible in this part of the file.
const MISSING_SERVER_TRANSPORT_HELP: &str =
    "This build of bark-wallet does not include an Ark server transport backend. Enable feature `bark-wallet/native` or `bark-wallet/wasm-web` to use server-backed wallet functionality.";

/// One hour, expressed as a [`Duration`].
// NOTE(review): presumably the timeout for subscription requests to the
// server; the use site is not visible here, confirm before relying on it.
const SUBSCRIBE_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 60);
404
// Shared secp256k1 context with all capabilities, declared through
// `lazy_static!` and passed as `&SECP` to the key derivation calls in this file.
lazy_static::lazy_static! {
    static ref SECP: secp256k1::Secp256k1<secp256k1::All> = secp256k1::Secp256k1::new();
}
409
/// Logs, at error level, a multi-line explanation that the Ark server's public
/// key no longer matches the one stored for this wallet.
///
/// `expected` is printed as the stored key and `got` as the key the server
/// reports now. This function only logs; the caller decides how to fail.
fn log_server_pubkey_changed_error(expected: PublicKey, got: PublicKey) {
    error!(
        "
Server public key has changed!

The Ark server's public key is different from the one stored when this
wallet was created. This typically happens when:

 - The server operator has rotated their keys
 - You are connecting to a different server
 - The server has been replaced

For safety, this wallet will not connect to the server until you
resolve this. You can recover your funds on-chain by doing an emergency exit.

This will exit your VTXOs to on-chain Bitcoin without needing the server's cooperation.

Expected: {expected}
Got: {got}")
}
435
/// Logs, at error level, a multi-line explanation that the Ark server's
/// mailbox public key no longer matches the one stored for this wallet.
///
/// `expected` is printed as the stored key and `got` as the key the server
/// reports now. This function only logs; the caller decides how to fail.
fn log_server_mailbox_pubkey_changed_error(expected: PublicKey, got: PublicKey) {
    error!(
        "
Server mailbox public key has changed!

The Ark server's mailbox public key is different from the one stored when this
wallet was created. This typically happens when:

 - The server operator has rotated their keys
 - You are connecting to a different server
 - The server has been replaced

For safety, this wallet will not connect to the server until you resolve this.

Unlike a server pubkey change, your VTXOs are not at risk - the mailbox pubkey
only affects address receive semantics. Any Ark addresses you previously
shared will stop receiving new payments; you will need to share new addresses
after reconnecting.

Expected: {expected}
Got: {got}")
}
459
/// Amounts related to incoming Lightning payments.
// NOTE(review): the producer of this type is not visible in this part of the
// file; the field descriptions below are inferred from the names only.
#[derive(Debug, Clone)]
pub struct LightningReceiveBalance {
    /// Presumably the total amount of pending Lightning receives (confirm).
    pub total: Amount,
    /// Presumably the part of `total` that can already be claimed (confirm).
    pub claimable: Amount,
}
468
/// Breakdown of the wallet's balance as assembled by `Wallet::balance`.
#[derive(Debug, Clone)]
pub struct Balance {
    /// Sum of the amounts of all VTXOs that pass the `Spendable` state filter.
    pub spendable: Amount,
    /// Sum of the amounts of the VTXOs returned by `pending_lightning_send_vtxos`.
    pub pending_lightning_send: Amount,
    /// Value returned by `claimable_lightning_receive_balance`.
    pub claimable_lightning_receive: Amount,
    /// Value returned by `pending_round_balance`.
    pub pending_in_round: Amount,
    /// Value returned by the exit manager's `try_pending_total`.
    // NOTE(review): presumably `None` when the total cannot be determined
    // right now; confirm against the exit manager.
    pub pending_exit: Option<Amount>,
    /// Sum of the amounts of the VTXOs returned by `pending_board_vtxos`.
    pub pending_board: Amount,
}
489
/// Flat description of an on-chain output, built from a [`Utxo`] via `From`.
pub struct UtxoInfo {
    /// Outpoint identifying the output.
    pub outpoint: OutPoint,
    /// Value of the output.
    pub amount: Amount,
    /// Height at which the output confirmed, when known. Always `Some` for
    /// outputs converted from `Utxo::Exit`.
    pub confirmation_height: Option<u32>,
}
495
496impl From<Utxo> for UtxoInfo {
497 fn from(value: Utxo) -> Self {
498 match value {
499 Utxo::Local(o) => UtxoInfo {
500 outpoint: o.outpoint,
501 amount: o.amount,
502 confirmation_height: o.confirmation_height,
503 },
504 Utxo::Exit(e) => UtxoInfo {
505 outpoint: e.vtxo.point(),
506 amount: e.vtxo.amount(),
507 confirmation_height: Some(e.height),
508 },
509 }
510 }
511}
512
/// Off-chain balance split into three amounts.
// NOTE(review): neither the producer nor any consumer of this type is visible
// in this part of the file; the field descriptions are inferred from names.
pub struct OffchainBalance {
    /// Presumably the amount that can be spent right away (confirm).
    pub available: Amount,
    /// Presumably the amount locked in a pending round (confirm).
    pub pending_in_round: Amount,
    /// Presumably the amount in the process of being exited (confirm).
    pub pending_exit: Amount,
}
524
/// Persistent wallet properties, written once by `Wallet::create` and read
/// back through the persister.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WalletProperties {
    /// Bitcoin network the wallet was created for.
    pub network: Network,

    /// Fingerprint of the wallet seed; `Wallet::open` refuses a seed whose
    /// fingerprint differs from this value.
    pub fingerprint: Fingerprint,

    /// Public key of the Ark server. `None` when the wallet was created while
    /// the server was unreachable; filled in on a later successful connection.
    pub server_pubkey: Option<PublicKey>,

    /// Mailbox public key of the Ark server. `None` under the same conditions
    /// as `server_pubkey`.
    pub server_mailbox_pubkey: Option<PublicKey>,
}
555
/// Private key material of the wallet, derived from a 64-byte seed.
pub struct WalletSeed {
    /// Wallet master key: the seed's BIP32 master key derived at the hardened
    /// `BARK_PURPOSE_INDEX`.
    master: bip32::Xpriv,
    /// Child of `master` at the hardened `VTXO_KEYS_INDEX`; parent of all VTXO keys.
    vtxo: bip32::Xpriv,
}
565
566impl WalletSeed {
567 pub fn new_from_seed(network: Network, seed: &[u8; 64]) -> Self {
569 let bark_path = [ChildNumber::from_hardened_idx(BARK_PURPOSE_INDEX).unwrap()];
570 let master = bip32::Xpriv::new_master(network, seed)
571 .expect("invalid seed")
572 .derive_priv(&SECP, &bark_path)
573 .expect("purpose is valid");
574
575 let vtxo_path = [ChildNumber::from_hardened_idx(VTXO_KEYS_INDEX).unwrap()];
576 let vtxo = master.derive_priv(&SECP, &vtxo_path)
577 .expect("vtxo path is valid");
578
579 Self { master, vtxo }
580 }
581
582 pub fn new_from_mnemonic(network: Network, mnemonic: &Mnemonic) -> Self {
584 Self::new_from_seed(network, &mnemonic.to_seed(""))
585 }
586
587 pub fn fingerprint(&self) -> Fingerprint {
588 self.master.fingerprint(&SECP)
589 }
590
591 fn derive_vtxo_keypair(&self, idx: u32) -> Keypair {
592 self.vtxo.derive_priv(&SECP, &[idx.into()]).unwrap().to_keypair(&SECP)
593 }
594
595 fn to_mailbox_keypair(&self) -> Keypair {
596 let mailbox_path = [ChildNumber::from_hardened_idx(MAILBOX_KEY_INDEX).unwrap()];
597 self.master.derive_priv(&SECP, &mailbox_path).unwrap().to_keypair(&SECP)
598 }
599
600 fn to_recovery_mailbox_keypair(&self) -> Keypair {
601 let mailbox_path = [ChildNumber::from_hardened_idx(RECOVERY_MAILBOX_KEY_INDEX).unwrap()];
602 self.master.derive_priv(&SECP, &mailbox_path).unwrap().to_keypair(&SECP)
603 }
604}
605
/// Options for `Wallet::open`.
pub struct OpenWalletArgs {
    /// Start the daemon once the wallet has been opened.
    pub run_daemon: bool,

    /// Data directory handed to the platform-default lock manager and
    /// persister. When a default persister is used and the directory is
    /// missing, it is created if `create_if_not_exists` is set (not on wasm32).
    pub datadir: Option<PathBuf>,

    /// Persister to use instead of the platform default.
    pub persister: Option<Arc<dyn BarkPersister>>,

    /// Lock manager to use instead of the platform default.
    pub lock_manager: Option<Box<dyn LockManager>>,

    /// On-chain wallet passed to the daemon when `run_daemon` is set.
    pub onchain: Option<Arc<tokio::sync::RwLock<dyn DaemonizableOnchainWallet>>>,

    /// Create the wallet when no wallet properties are stored yet.
    pub create_if_not_exists: bool,

    /// When creating the wallet, tolerate an unreachable Ark server and store
    /// no server keys.
    pub create_without_server: bool,
}
654
655impl Default for OpenWalletArgs {
656 fn default() -> Self {
657 Self {
658 run_daemon: true,
659 onchain: None,
660 datadir: None,
661 persister: None,
662 lock_manager: None,
663 create_if_not_exists: true,
664 create_without_server: false,
665 }
666 }
667}
668
/// Shared state behind a [`Wallet`] handle; built once in `Wallet::open`.
struct WalletInner {
    /// Chain source client built from the configured esplora or bitcoind endpoint.
    chain: Arc<ChainSource>,

    /// Exit system; loaded right after the wallet is assembled.
    exit: Exit,

    /// Movement manager, shared with the exit system.
    movements: Arc<MovementManager>,

    /// Notification dispatcher, shared with the movement manager.
    notifications: NotificationDispatch,

    /// Configuration the wallet was opened with.
    config: Config,

    /// Persistence backend.
    db: Arc<dyn BarkPersister>,

    /// Lock manager (caller-provided or platform default).
    lock_manager: Box<dyn LockManager>,

    /// Private key material.
    seed: WalletSeed,

    /// Server connection, established on first use and then reused.
    server: tokio::sync::OnceCell<ServerConnection>,

    /// Handle of the daemon; starts out as `None`.
    daemon: parking_lot::Mutex<Option<DaemonHandle>>,

    /// Starts out as `None`.
    // NOTE(review): presumably the chain tip of the last force-exit scan;
    // the code using it is not visible here.
    last_force_exit_scan_tip: tokio::sync::Mutex<Option<BlockHeight>>,

    /// Secret nonces for round participation.
    pub(crate) round_secret_nonces: RoundSecretNonces,
}
716
/// Handle to an opened wallet.
///
/// Cloning is cheap: all clones share the same reference-counted inner state.
#[derive(Clone)]
pub struct Wallet {
    /// Shared state of the wallet.
    inner: Arc<WalletInner>,
}
851
852impl Wallet {
853 pub async fn network(&self) -> anyhow::Result<Network> {
854 Ok(self.properties().await?.network)
855 }
856
857 pub fn chain(&self) -> &Arc<ChainSource> {
859 &self.inner.chain
860 }
861
862 pub fn exit_mgr(&self) -> &Exit {
864 &self.inner.exit
865 }
866
867 pub fn movements_mgr(&self) -> &MovementManager {
869 &self.inner.movements
870 }
871
872 pub async fn peek_next_keypair(&self) -> anyhow::Result<(Keypair, u32)> {
875 let last_revealed = self.inner.db.get_last_vtxo_key_index().await?;
876
877 let index = last_revealed.map(|i| i + 1).unwrap_or(u32::MIN);
878 let keypair = self.inner.seed.derive_vtxo_keypair(index);
879
880 Ok((keypair, index))
881 }
882
883 pub async fn derive_store_next_keypair(&self) -> anyhow::Result<(Keypair, u32)> {
886 let (keypair, index) = self.peek_next_keypair().await?;
887 self.inner.db.store_vtxo_key(index, keypair.public_key()).await?;
888 Ok((keypair, index))
889 }
890
891 #[deprecated(note = "use peek_keypair instead")]
892 pub async fn peak_keypair(&self, index: u32) -> anyhow::Result<Keypair> {
893 self.peek_keypair(index).await
894 }
895
896 pub async fn peek_keypair(&self, index: u32) -> anyhow::Result<Keypair> {
910 let keypair = self.inner.seed.derive_vtxo_keypair(index);
911 if self.inner.db.get_public_key_idx(&keypair.public_key()).await?.is_some() {
912 Ok(keypair)
913 } else {
914 bail!("VTXO key {} does not exist, please derive it first", index)
915 }
916 }
917
918
919 pub async fn pubkey_keypair(&self, public_key: &PublicKey) -> anyhow::Result<Option<(u32, Keypair)>> {
931 if let Some(index) = self.inner.db.get_public_key_idx(&public_key).await? {
932 Ok(Some((index, self.inner.seed.derive_vtxo_keypair(index))))
933 } else {
934 Ok(None)
935 }
936 }
937
938 pub async fn get_vtxo_key(&self, vtxo: impl VtxoRef) -> anyhow::Result<Keypair> {
949 let bare_vtxo = match vtxo.as_bare_vtxo() {
950 Some(bare) => bare,
951 None => Cow::Owned(self.get_vtxo_by_id(vtxo.vtxo_id()).await?.vtxo),
952 };
953 let pubkey = self.find_signable_clause(&bare_vtxo).await
954 .context("VTXO is not signable by wallet")?
955 .pubkey();
956 let idx = self.inner.db.get_public_key_idx(&pubkey).await?
957 .context("VTXO key not found")?;
958 Ok(self.inner.seed.derive_vtxo_keypair(idx))
959 }
960
961 #[deprecated(note = "use peek_address instead")]
962 pub async fn peak_address(&self, index: u32) -> anyhow::Result<ark::Address> {
963 self.peek_address(index).await
964 }
965
966 pub async fn peek_address(&self, index: u32) -> anyhow::Result<ark::Address> {
970 let properties = self.properties().await?;
971 let network = properties.network;
972 let keypair = self.peek_keypair(index).await?;
973 let mailbox = self.mailbox_identifier();
974
975
976 let (server_pubkey, mailbox_pubkey) =
977 if let (Some(spk), Some(mpk)) = (properties.server_pubkey, properties.server_mailbox_pubkey) {
978 (spk, mpk)
979 } else {
980 let (_, ark_info) = self.require_server().await?;
981 (ark_info.server_pubkey, ark_info.mailbox_pubkey)
982 };
983
984 Ok(ark::Address::builder()
985 .testnet(network != bitcoin::Network::Bitcoin)
986 .server_pubkey(server_pubkey)
987 .pubkey_policy(keypair.public_key())
988 .mailbox(mailbox_pubkey, mailbox, &keypair)
989 .expect("Failed to assign mailbox")
990 .into_address().unwrap())
991 }
992
993 pub async fn new_address_with_index(&self) -> anyhow::Result<(ark::Address, u32)> {
997 let (_, index) = self.derive_store_next_keypair().await?;
998 let addr = self.peek_address(index).await?;
999 Ok((addr, index))
1000 }
1001
1002 pub async fn new_address(&self) -> anyhow::Result<ark::Address> {
1004 let (addr, _) = self.new_address_with_index().await?;
1005 Ok(addr)
1006 }
1007
1008 pub async fn create(
1017 network: Network,
1018 seed: &WalletSeed,
1019 config: &Config,
1020 db: &dyn BarkPersister,
1021 lock_manager: &dyn LockManager,
1022 allow_unreachable_server: bool,
1023 ) -> anyhow::Result<()> {
1024 trace!("Config: {:?}", config);
1025
1026 let wallet_fingerprint = seed.fingerprint();
1027
1028 let create_guard = lock_manager.lock(
1033 &format!("{}.create", wallet_fingerprint),
1034 Duration::from_secs(5),
1035 ).await.context("wallet initialization already in progress")?;
1036
1037 if let Some(existing) = db.read_properties().await? {
1038 trace!("Existing config: {:?}", existing);
1039 bail!("cannot overwrite already existing config")
1040 }
1041
1042 let (server_pubkey, mailbox_pubkey) = match Self::connect_to_server(&config, network).await {
1044 Ok(conn) => {
1045 let ark_info = conn.ark_info().await;
1046 (Some(ark_info.server_pubkey), Some(ark_info.mailbox_pubkey))
1047 },
1048 Err(_) if allow_unreachable_server => (None, None),
1049 Err(err) => {
1050 bail!("Failed to connect to provided server: {:#}", err);
1051 },
1052 };
1053
1054 let properties = WalletProperties {
1055 network,
1056 fingerprint: wallet_fingerprint,
1057 server_pubkey,
1058 server_mailbox_pubkey: mailbox_pubkey,
1059 };
1060
1061 db.init_wallet(&properties).await.context("cannot init wallet in the database")?;
1063 info!("Created wallet with fingerprint: {}", wallet_fingerprint);
1064 if let Some(pk) = server_pubkey {
1065 info!("Stored server pubkey: {}", pk);
1066 }
1067
1068 drop(create_guard);
1071
1072 Ok(())
1073 }
1074
    /// Opens a wallet, creating it first when `args.create_if_not_exists` is
    /// set and no wallet properties are stored yet.
    ///
    /// Steps, in order: pick the lock manager and persister (caller-provided
    /// or platform default), load or create the wallet properties, check that
    /// the stored fingerprint matches `seed`, build the chain source from
    /// `config`, assemble the wallet, load the exit system and, when
    /// `args.run_daemon` is set, start the daemon.
    ///
    /// # Errors
    ///
    /// Fails when a default lock manager or persister cannot be created, the
    /// wallet does not exist and may not be created, the seed's fingerprint
    /// differs from the stored one, no usable chain source is configured or it
    /// fails its version check, the exit system cannot be loaded, or the
    /// daemon cannot be started.
    pub async fn open(
        network: Network,
        seed: WalletSeed,
        config: Config,
        args: OpenWalletArgs,
    ) -> anyhow::Result<Wallet> {
        let fingerprint = seed.fingerprint();
        // Caller-provided lock manager wins over the platform default.
        let lock_manager = if let Some(lm) = args.lock_manager {
            lm
        } else {
            crate::lock_manager::platform_default(args.datadir.as_ref(), Some(fingerprint))
                .context("failed to instantiate platform default lock manager")?
        };

        // Caller-provided persister wins over the platform default.
        let db = if let Some(db) = args.persister {
            db
        } else {
            if let Some(ref datadir) = args.datadir {
                // Directory creation is compiled out on wasm32.
                #[cfg(not(target_arch = "wasm32"))]
                if !datadir.exists() && args.create_if_not_exists {
                    tokio::fs::create_dir_all(datadir).await.with_context(|| format!(
                        "failed to create datadir at {}", datadir.display(),
                    ))?;
                }
            }
            crate::persist::platform_default(args.datadir.as_ref(), Some(fingerprint)).await
                .context("failed to instantiate platform default persister")?
        };

        // Load the stored properties, creating the wallet first if allowed.
        let properties = if let Some(p) = db.read_properties().await? {
            p
        } else if args.create_if_not_exists {
            Self::create(
                network, &seed, &config, &*db, &*lock_manager, args.create_without_server,
            ).await.context("error creating new wallet")?;
            db.read_properties().await?
                .context("create failed: no wallet properties after Wallet::create was called")?
        } else {
            bail!("wallet does not exist; use Wallet::create or \
                set options.create_if_not_exists to true");
        };

        // The seed must belong to the wallet stored in the database.
        if properties.fingerprint != fingerprint {
            bail!("incorrect mnemonic")
        }

        // Esplora takes precedence over bitcoind when both are configured.
        let chain_source = if let Some(ref url) = config.esplora_address {
            ChainSourceSpec::Esplora {
                url: url.clone(),
            }
        } else if let Some(ref url) = config.bitcoind_address {
            // A cookie file takes precedence over user/password credentials.
            let auth = if let Some(ref c) = config.bitcoind_cookiefile {
                bitcoin_ext::rpc::Auth::CookieFile(c.clone())
            } else {
                bitcoin_ext::rpc::Auth::UserPass(
                    config.bitcoind_user.clone().context("need bitcoind auth config")?,
                    config.bitcoind_pass.clone().context("need bitcoind auth config")?,
                )
            };
            ChainSourceSpec::Bitcoind { url: url.clone(), auth }
        } else {
            bail!("Need to either provide esplora or bitcoind info");
        };

        #[cfg(feature = "socks5-proxy")]
        let chain_proxy = proxy_for_url(&config.socks5_proxy, chain_source.url())?;
        // Note: the chain source uses the network from the stored properties,
        // not the `network` argument.
        let chain_source_client = ChainSource::new(
            chain_source, properties.network, config.fallback_fee_rate,
            #[cfg(feature = "socks5-proxy")] chain_proxy.as_deref(),
        ).await?;
        let chain = Arc::new(chain_source_client);
        chain.require_version().await
            .context("provided chain source doesn't meet version requirement")?;

        // The server connection is left empty here and filled on first use.
        let server = tokio::sync::OnceCell::new();

        let notifications = NotificationDispatch::new();
        let movements = Arc::new(MovementManager::new(db.clone(), notifications.clone()));
        let exit = Exit::new(db.clone(), chain.clone(), movements.clone()).await?;

        let ret = Wallet { inner: Arc::new(WalletInner {
            config, db, lock_manager, seed, exit, movements, notifications, server, chain,
            daemon: parking_lot::Mutex::new(None),
            last_force_exit_scan_tip: tokio::sync::Mutex::new(None),
            round_secret_nonces: RoundSecretNonces::new(),
        })};

        ret.inner.exit.load().await
            .context("error loading exit system after opening wallet")?;

        if args.run_daemon {
            ret.start_daemon(args.onchain)
                .context("failed to start daemon after opening wallet")?;
        }

        Ok(ret)
    }
1173
1174 pub fn config(&self) -> &Config {
1176 &self.inner.config
1177 }
1178
1179 pub async fn properties(&self) -> anyhow::Result<WalletProperties> {
1181 let properties = self.inner.db.read_properties().await?.context("Wallet is not initialised")?;
1182 Ok(properties)
1183 }
1184
1185 pub fn fingerprint(&self) -> Fingerprint {
1187 self.inner.seed.fingerprint()
1188 }
1189
1190 async fn connect_to_server(
1191 config: &Config,
1192 network: Network,
1193 ) -> anyhow::Result<ServerConnection> {
1194 let server_address = crate::utils::url_with_default_https_scheme(&config.server_address);
1195 let mut builder = ServerConnection::builder()
1196 .address(&server_address)
1197 .network(network);
1198
1199 #[cfg(feature = "socks5-proxy")]
1200 if let Some(proxy) = proxy_for_url(&config.socks5_proxy, &server_address)? {
1201 builder = builder.proxy(&proxy)
1202 }
1203
1204 if let Some(ref token) = config.server_access_token {
1205 builder = builder.access_token(token);
1206 }
1207
1208 if let Some(ref ua) = config.user_agent {
1209 builder = builder.user_agent(ua);
1210 }
1211
1212 builder.connect().await.map_err(wrap_server_connect_error)
1213 .context("Failed to connect to Ark server")
1214 }
1215
1216 async fn require_server(&self) -> anyhow::Result<(ServerConnection, ArkInfo)> {
1217 let conn = self.inner.server.get_or_try_init(|| async {
1221 let network = self.properties().await?.network;
1222 Self::connect_to_server(&self.inner.config, network).await
1223 .context("You should be connected to Ark server to perform this action")
1224 }).await?.clone();
1225
1226 let ark_info = conn.ark_info().await;
1227 self.check_and_store_server_keys(&ark_info).await?;
1228
1229 Ok((conn, ark_info))
1230 }
1231
1232 pub async fn refresh_server(&self) -> anyhow::Result<()> {
1233 let srv = self.inner.server.get_or_try_init(|| async {
1239 let properties = self.properties().await?;
1240 Self::connect_to_server(&self.inner.config, properties.network).await
1241 .map_err(anyhow::Error::from)
1242 }).await?;
1243
1244 srv.check_connection().await?;
1245 let ark_info = srv.ark_info().await;
1246 ark_info.fees.validate().context("invalid fee schedule")?;
1247 self.check_and_store_server_keys(&ark_info).await?;
1248
1249 Ok(())
1250 }
1251
1252 async fn check_and_store_server_keys(&self, ark_info: &ArkInfo) -> anyhow::Result<()> {
1259 let properties = self.properties().await?;
1260
1261 if let Some(stored_pubkey) = properties.server_pubkey {
1262 if stored_pubkey != ark_info.server_pubkey {
1263 log_server_pubkey_changed_error(stored_pubkey, ark_info.server_pubkey);
1264 bail!("Server public key has changed. You should exit all your VTXOs!");
1265 }
1266 } else {
1267 self.inner.db.set_server_pubkey(ark_info.server_pubkey).await?;
1268 info!("Stored server pubkey for existing wallet: {}", ark_info.server_pubkey);
1269 }
1270
1271 if let Some(stored_mailbox_pubkey) = properties.server_mailbox_pubkey {
1272 if stored_mailbox_pubkey != ark_info.mailbox_pubkey {
1273 log_server_mailbox_pubkey_changed_error(stored_mailbox_pubkey, ark_info.mailbox_pubkey);
1274 bail!("Server mailbox public key has changed.");
1275 }
1276 } else {
1277 self.inner.db.set_server_mailbox_pubkey(ark_info.mailbox_pubkey).await?;
1278 info!("Stored server mailbox pubkey for existing wallet: {}", ark_info.mailbox_pubkey);
1279 }
1280
1281 Ok(())
1282 }
1283
1284 pub async fn ark_info(&self) -> anyhow::Result<Option<ArkInfo>> {
1286 match self.inner.server.get() {
1287 Some(srv) => Ok(Some(srv.ark_info().await)),
1288 None => Ok(None),
1289 }
1290 }
1291
1292 pub async fn require_ark_info(&self) -> anyhow::Result<ArkInfo> {
1298 let (_, ark_info) = self.require_server().await?;
1299 Ok(ark_info)
1300 }
1301
1302 pub async fn balance(&self) -> anyhow::Result<Balance> {
1306 let vtxos = self.vtxos().await?;
1307
1308 let spendable = {
1309 let mut v = vtxos.iter().collect();
1310 VtxoStateKind::Spendable.filter_vtxos(&mut v).await?;
1311 v.into_iter().map(|v| v.amount()).sum::<Amount>()
1312 };
1313
1314 let pending_lightning_send = self.pending_lightning_send_vtxos().await?.iter()
1315 .map(|v| v.amount())
1316 .sum::<Amount>();
1317
1318 let claimable_lightning_receive = self.claimable_lightning_receive_balance().await?;
1319
1320 let pending_board = self.pending_board_vtxos().await?.iter()
1321 .map(|v| v.amount())
1322 .sum::<Amount>();
1323
1324 let pending_in_round = self.pending_round_balance().await?;
1325
1326 let pending_exit = self.exit_mgr().try_pending_total();
1327
1328 Ok(Balance {
1329 spendable,
1330 pending_in_round,
1331 pending_lightning_send,
1332 claimable_lightning_receive,
1333 pending_exit,
1334 pending_board,
1335 })
1336 }
1337
1338 pub async fn validate_vtxo(&self, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
1340 let tx = self.inner.chain.get_tx(&vtxo.chain_anchor().txid).await
1341 .context("could not fetch chain tx")?;
1342
1343 let tx = tx.with_context(|| {
1344 format!("vtxo chain anchor not found for vtxo: {}", vtxo.chain_anchor().txid)
1345 })?;
1346
1347 vtxo.validate(&tx)?;
1348
1349 Ok(())
1350 }
1351
1352 pub async fn import_vtxo(&self, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
1362 if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
1363 info!("VTXO {} already exists in wallet, skipping import", vtxo.id());
1364 return Ok(());
1365 }
1366
1367 self.validate_vtxo(vtxo).await.context("VTXO validation failed")?;
1368
1369 if self.find_signable_clause(vtxo).await.is_none() {
1370 bail!("VTXO {} is not owned by this wallet (no signable clause found)", vtxo.id());
1371 }
1372
1373 let current_height = self.inner.chain.tip().await?;
1374 if vtxo.expiry_height() <= current_height {
1375 bail!("Vtxo {} has expired", vtxo.id());
1376 }
1377
1378 self.store_spendable_vtxos([vtxo]).await.context("failed to store imported VTXO")?;
1379
1380 info!("Successfully imported VTXO {}", vtxo.id());
1381 Ok(())
1382 }
1383
1384 pub async fn get_vtxo_by_id(&self, vtxo_id: VtxoId) -> anyhow::Result<WalletVtxo> {
1386 let vtxo = self.inner.db.get_wallet_vtxo(vtxo_id).await
1387 .with_context(|| format!("Error when querying vtxo {} in database", vtxo_id))?
1388 .with_context(|| format!("The VTXO with id {} cannot be found", vtxo_id))?;
1389 Ok(vtxo)
1390 }
1391
1392 pub async fn get_full_vtxo(&self, vtxo_id: VtxoId) -> anyhow::Result<Vtxo<Full>> {
1400 self.inner.db.get_full_vtxo(vtxo_id).await
1401 .with_context(|| format!("Error when querying full vtxo {} in database", vtxo_id))?
1402 .with_context(|| format!("The VTXO with id {} cannot be found", vtxo_id))
1403 }
1404
1405 pub async fn get_full_vtxos<V: VtxoRef>(
1407 &self,
1408 vtxos: impl IntoIterator<Item = V>,
1409 ) -> anyhow::Result<Vec<Vtxo<Full>>> {
1410 let ids = vtxos.into_iter().map(|v| v.vtxo_id()).collect::<Vec<_>>();
1411 self.inner.db.get_full_vtxos(&ids).await
1412 .with_context(||
1413 format!("Error when querying full vtxos in database with IDs: {:?}", ids)
1414 )
1415 }
1416
1417 #[deprecated(since="0.1.0-beta.5", note = "Use Wallet::history instead")]
1419 pub async fn movements(&self) -> anyhow::Result<Vec<Movement>> {
1420 self.history().await
1421 }
1422
1423 pub async fn history(&self) -> anyhow::Result<Vec<Movement>> {
1425 Ok(self.inner.db.get_all_movements().await?)
1426 }
1427
1428 pub async fn update_history_metadata(
1448 &self,
1449 movement_id: MovementId,
1450 patch: &serde_json::Value,
1451 ) -> anyhow::Result<()> {
1452 self.inner.movements.patch_metadata(movement_id, patch).await?;
1453 Ok(())
1454 }
1455
1456 pub async fn history_by_payment_method(
1458 &self,
1459 payment_method: &PaymentMethod,
1460 ) -> anyhow::Result<Vec<Movement>> {
1461 let mut ret = self.inner.db.get_movements_by_payment_method(payment_method).await?;
1462 ret.sort_by_key(|m| m.id);
1463 Ok(ret)
1464 }
1465
1466 pub async fn all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1468 Ok(self.inner.db.get_all_vtxos().await?)
1469 }
1470
1471 pub async fn vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1473 Ok(self.inner.db.get_vtxos_by_state(&VtxoStateKind::UNSPENT_STATES).await?)
1474 }
1475
1476 pub async fn vtxos_with(&self, filter: &impl FilterVtxos) -> anyhow::Result<Vec<WalletVtxo>> {
1478 let mut vtxos = self.vtxos().await?;
1479 filter.filter_vtxos(&mut vtxos).await.context("error filtering vtxos")?;
1480 Ok(vtxos)
1481 }
1482
1483 pub async fn spendable_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1485 Ok(self.vtxos_with(&VtxoStateKind::Spendable).await?)
1486 }
1487
1488 pub async fn spendable_vtxos_with(
1490 &self,
1491 filter: &impl FilterVtxos,
1492 ) -> anyhow::Result<Vec<WalletVtxo>> {
1493 let mut vtxos = self.spendable_vtxos().await?;
1494 filter.filter_vtxos(&mut vtxos).await.context("error filtering vtxos")?;
1495 Ok(vtxos)
1496 }
1497
1498 pub async fn get_expiring_vtxos(
1500 &self,
1501 threshold: BlockHeight,
1502 ) -> anyhow::Result<Vec<WalletVtxo>> {
1503 let expiry = self.inner.chain.tip().await? + threshold;
1504 let filter = VtxoFilter::new(&self).expires_before(expiry);
1505 Ok(self.spendable_vtxos_with(&filter).await?)
1506 }
1507
1508 pub async fn maintenance(&self) -> anyhow::Result<()> {
1514 info!("Starting wallet maintenance in interactive mode");
1515 self.sync().await;
1516
1517 let rounds = self.progress_pending_rounds(None).await;
1519 if let Err(e) = rounds.as_ref() {
1520 warn!("Error progressing pending rounds: {:#}", e);
1521 }
1522
1523 let states = self.inner.db.get_pending_round_state_ids().await?;
1525 for id in states {
1526 debug!("Cancelling pending round participation {}", id);
1527 let mut state = match self.lock_wait_round_state(id).await {
1528 Ok(Some(s)) => s,
1529 Ok(None) => continue, Err(e) => {
1531 warn!("Failed to lock round state with id {}: {:#}", id, e);
1532 continue;
1533 }
1534 };
1535 if let Err(e) = state.state_mut().try_cancel(self).await {
1536 warn!("Error cancelling pending round: {:#}", e);
1537 }
1538 }
1539
1540 let refresh = self.maintenance_refresh().await;
1542 if let Err(e) = refresh.as_ref() {
1543 warn!("Error refreshing VTXOs: {:#}", e);
1544 }
1545 if rounds.is_err() || refresh.is_err() {
1546 bail!("Maintenance encountered errors.\nprogress_rounds: {:#?}\nrefresh: {:#?}", rounds, refresh);
1547 }
1548 Ok(())
1549 }
1550
1551 pub async fn maintenance_delegated(&self) -> anyhow::Result<()> {
1557 info!("Starting wallet maintenance in delegated mode");
1558 self.sync().await;
1559 let rounds = self.progress_pending_rounds(None).await;
1560 if let Err(e) = rounds.as_ref() {
1561 warn!("Error progressing pending rounds: {:#}", e);
1562 }
1563 let refresh = self.maybe_schedule_maintenance_refresh_delegated().await;
1564 if let Err(e) = refresh.as_ref() {
1565 warn!("Error refreshing VTXOs: {:#}", e);
1566 }
1567 if rounds.is_err() || refresh.is_err() {
1568 bail!("Delegated maintenance encountered errors.\nprogress_rounds: {:#?}\nrefresh: {:#?}", rounds, refresh);
1569 }
1570 Ok(())
1571 }
1572
1573 pub async fn maintenance_with_onchain<W: PreparePsbt + SignPsbt + ExitUnilaterally>(
1581 &self,
1582 onchain: &mut W,
1583 ) -> anyhow::Result<()> {
1584 info!("Starting wallet maintenance in interactive mode with onchain wallet");
1585
1586 let maintenance = self.maintenance().await;
1588
1589 let exit_sync = self.sync_exits().await;
1591 if let Err(e) = exit_sync.as_ref() {
1592 warn!("Error syncing exits: {:#}", e);
1593 }
1594 let exit_progress = self.exit_mgr().progress_exits_with_bdk(self, onchain, None).await;
1595 if let Err(e) = exit_progress.as_ref() {
1596 warn!("Error progressing exits: {:#}", e);
1597 }
1598 if maintenance.is_err() || exit_sync.is_err() || exit_progress.is_err() {
1599 bail!("Maintenance encountered errors.\nmaintenance: {:#?}\nexit_sync: {:#?}\nexit_progress: {:#?}", maintenance, exit_sync, exit_progress);
1600 }
1601 Ok(())
1602 }
1603
1604 pub async fn maintenance_with_onchain_delegated<W: PreparePsbt + SignPsbt + ExitUnilaterally>(
1611 &self,
1612 onchain: &mut W,
1613 ) -> anyhow::Result<()> {
1614 info!("Starting wallet maintenance in delegated mode with onchain wallet");
1615
1616 let maintenance = self.maintenance_delegated().await;
1618
1619 let exit_sync = self.sync_exits().await;
1621 if let Err(e) = exit_sync.as_ref() {
1622 warn!("Error syncing exits: {:#}", e);
1623 }
1624 let exit_progress = self.exit_mgr().progress_exits_with_bdk(self, onchain, None).await;
1625 if let Err(e) = exit_progress.as_ref() {
1626 warn!("Error progressing exits: {:#}", e);
1627 }
1628 if maintenance.is_err() || exit_sync.is_err() || exit_progress.is_err() {
1629 bail!("Delegated maintenance encountered errors.\nmaintenance: {:#?}\nexit_sync: {:#?}\nexit_progress: {:#?}", maintenance, exit_sync, exit_progress);
1630 }
1631 Ok(())
1632 }
1633
1634 pub(crate) async fn join_round_for_maintenance_refresh(
1649 &self,
1650 attempt: &RoundAttempt,
1651 ) -> anyhow::Result<Option<RoundStateId>> {
1652 self.maintenance_refresh_retry_loop(|part| async move {
1653 info!("Joining round {} for maintenance refresh ({} vtxos)",
1654 attempt.round_seq, part.inputs.len());
1655 Ok(Some(self.join_attempt_interactive(
1656 part, attempt, Some(RoundMovement::Refresh),
1657 ).await?.id()))
1658 }).await
1659 }
1660
1661 pub async fn maybe_schedule_maintenance_refresh_delegated(
1669 &self,
1670 ) -> anyhow::Result<Option<RoundStateId>> {
1671 self.maintenance_refresh_retry_loop(|part| async move {
1672 info!("Scheduling delegated maintenance refresh ({} vtxos)", part.inputs.len());
1673 Ok(Some(self.join_next_round_delegated(part, Some(RoundMovement::Refresh)).await?.id()))
1674 }).await
1675 }
1676
1677 async fn maintenance_refresh_retry_loop<F, Fut>(
1685 &self,
1686 attempt_refresh: F,
1687 ) -> anyhow::Result<Option<RoundStateId>>
1688 where
1689 F: Fn(RoundParticipation) -> Fut,
1690 Fut: Future<Output = anyhow::Result<Option<RoundStateId>>>,
1691 {
1692 let mut excluded = HashSet::new();
1693 for _ in 0..10 {
1694 let vtxos = self.get_vtxos_to_refresh_with_excluded(excluded.iter().copied()).await?;
1695 if vtxos.is_empty() {
1696 return Ok(None);
1697 }
1698 let part = match self.build_refresh_participation(vtxos).await? {
1699 Some(participation) => participation,
1700 None => return Ok(None),
1701 };
1702
1703 match attempt_refresh(part).await {
1704 Ok(state_id) => return Ok(state_id),
1705 Err(e) => {
1706 let rejected = rejected_vtxos_from_error(&e).into_iter()
1707 .filter(|id| !excluded.contains(id))
1708 .collect::<Vec<_>>();
1709 if rejected.is_empty() {
1710 return Err(e);
1711 }
1712 warn!("Maintenance refresh rejected {} unusable input(s) ({:?}); \
1713 retrying without them", rejected.len(), rejected);
1714 excluded.extend(rejected);
1715 },
1716 }
1717 }
1718 bail!("Maintenance refresh failed after 10 retries");
1719 }
1720
1721 pub async fn maintenance_refresh(&self) -> anyhow::Result<Option<RoundStatus>> {
1733 if self.get_vtxos_to_refresh().await?.is_empty() {
1734 return Ok(None);
1735 }
1736
1737 info!("Waiting for round to perform maintenance refresh...");
1738 let mut events = self.subscribe_round_events().await?;
1739 while let Some(event) = events.next().await {
1740 let event = event.context("error on round event stream")?;
1741 if let RoundEvent::Attempt(a) = event && a.attempt_seq == 0 {
1742 debug!("Round {} started, triggering maintenance refresh", a.round_seq);
1743 let state_id = match self.join_round_for_maintenance_refresh(&a).await? {
1744 Some(id) => id,
1745 None => return Ok(None),
1746 };
1747 let state = self.lock_wait_round_state(state_id).await?
1750 .context("maintenance refresh round state vanished after joining")?;
1751 return Ok(Some(self.drive_round_state(state, &mut events).await?));
1752 }
1753 }
1754 Ok(None)
1755 }
1756
1757 pub async fn sync(&self) {
1763 futures::join!(
1764 async {
1765 if let Err(e) = self.inner.chain.update_fee_rates(self.inner.config.fallback_fee_rate).await {
1768 warn!("Error updating fee rates: {:#}", e);
1769 }
1770 },
1771 async {
1772 if let Err(e) = self.sync_mailbox().await {
1773 warn!("Error in mailbox sync: {:#}", e);
1774 }
1775 },
1776 async {
1777 if let Err(e) = self.sync_pending_rounds().await {
1778 warn!("Error while trying to progress rounds awaiting confirmations: {:#}", e);
1779 }
1780 },
1781 async {
1782 if let Err(e) = self.sync_pending_lightning_send_vtxos().await {
1783 warn!("Error syncing pending lightning payments: {:#}", e);
1784 }
1785 },
1786 async {
1787 if let Err(e) = self.sync_pending_arkoor_sends().await {
1788 warn!("Error syncing pending arkoor sends: {:#}", e);
1789 }
1790 },
1791 async {
1792 if let Err(e) = self.try_claim_all_lightning_receives(false).await {
1793 warn!("Error claiming pending lightning receives: {:#}", e);
1794 }
1795 },
1796 async {
1797 if let Err(e) = self.sync_pending_boards().await {
1798 warn!("Error syncing pending boards: {:#}", e);
1799 }
1800 },
1801 async {
1802 if let Err(e) = self.sync_pending_offboards().await {
1803 warn!("Error syncing pending offboards: {:#}", e);
1804 }
1805 },
1806 async {
1807 if let Err(e) = self.sync_force_exited_vtxos().await {
1808 warn!("Error scanning for on-chain-exited VTXOs: {:#}", e);
1809 }
1810 }
1811 );
1812 }
1813
1814 pub async fn sync_exits(&self) -> anyhow::Result<()> {
1820 self.exit_mgr().sync(&self).await?;
1821 Ok(())
1822 }
1823
1824 pub async fn sync_force_exited_vtxos(&self) -> anyhow::Result<()> {
1837 let tip = self.inner.chain.tip().await?;
1839 let mut lock = self.inner.last_force_exit_scan_tip.lock().await;
1840 if *lock == Some(tip) {
1841 return Ok(());
1842 }
1843
1844 let exiting = self.exit_mgr().get_exit_vtxo_ids().await;
1846 let vtxos = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Spendable]).await?
1847 .into_iter()
1848 .filter(|v| !exiting.contains(&v.vtxo.id()));
1849
1850 let mut checked = FuturesUnordered::new();
1852 for wv in vtxos {
1853 let chain = self.inner.chain.clone();
1854 checked.push(async move {
1855 let txid = wv.vtxo_id().to_point().txid;
1856 let status = chain.tx_status(txid).await;
1857 (wv, status)
1858 });
1859 }
1860
1861 let mut to_exit = Vec::new();
1862 while let Some((vtxo, status)) = futures::StreamExt::next(&mut checked).await {
1863 match status {
1864 Ok(TxStatus::NotFound) => {},
1865 Ok(_) => {
1866 info!("VTXO {} was exited on-chain without us; routing it to a claimable exit",
1867 vtxo.vtxo.id(),
1868 );
1869 to_exit.push(vtxo.vtxo);
1870 },
1871 Err(e) => warn!("Could not check on-chain status of VTXO {}: {:#}",
1872 vtxo.vtxo.id(), e,
1873 ),
1874 }
1875 }
1876
1877 if !to_exit.is_empty() {
1878 self.exit_mgr().start_exit_for_vtxos(&to_exit).await
1879 .context("failed to start exit for on-chain-exited VTXOs")?;
1880
1881 *lock = Some(tip);
1882 self.sync_exits().await
1883 .context("failed to sync exits after starting new ones")?;
1884 } else {
1885 *lock = Some(tip);
1886 }
1887
1888 Ok(())
1889 }
1890
1891 pub async fn dangerous_drop_vtxo(&self, vtxo_id: VtxoId) -> anyhow::Result<()> {
1894 warn!("Drop vtxo {} from the database", vtxo_id);
1895 self.inner.db.remove_vtxo(vtxo_id).await?;
1896 Ok(())
1897 }
1898
1899 pub async fn dangerous_drop_all_vtxos(&self) -> anyhow::Result<()> {
1902 warn!("Dropping all vtxos from the db...");
1903 for vtxo in self.vtxos().await? {
1904 self.inner.db.remove_vtxo(vtxo.id()).await?;
1905 }
1906
1907 self.exit_mgr().dangerous_clear_exit().await?;
1908 Ok(())
1909 }
1910
1911 async fn has_counterparty_risk(&self, vtxo: &Vtxo<Full>) -> anyhow::Result<bool> {
1919 for past_pks in vtxo.past_arkoor_pubkeys() {
1920 let mut owns_any = false;
1921 for past_pk in past_pks {
1922 if self.inner.db.get_public_key_idx(&past_pk).await?.is_some() {
1923 owns_any = true;
1924 break;
1925 }
1926 }
1927 if !owns_any {
1928 return Ok(true);
1929 }
1930 }
1931
1932 let my_clause = self.find_signable_clause(vtxo).await;
1933 Ok(!my_clause.is_some())
1934 }
1935
1936 async fn add_should_refresh_vtxos<V: VtxoRef>(
1942 &self,
1943 participation: &mut RoundParticipation,
1944 exclude: impl IntoIterator<Item = V>,
1945 ) -> anyhow::Result<()> {
1946 let tip = self.inner.chain.tip().await?;
1949 let mut vtxos_to_refresh = self.spendable_vtxos_with(
1950 &RefreshStrategy::should_refresh(self, tip, self.inner.chain.fee_rates().await.fast),
1951 ).await?;
1952 if vtxos_to_refresh.is_empty() {
1953 return Ok(());
1954 }
1955
1956 let excluded_ids = participation.inputs.iter()
1957 .map(|v| v.vtxo_id())
1958 .chain(exclude.into_iter().map(|v| v.vtxo_id()))
1959 .collect::<HashSet<_>>();
1960 let mut total_amount = Amount::ZERO;
1961 for i in (0..vtxos_to_refresh.len()).rev() {
1962 let vtxo = &vtxos_to_refresh[i];
1963 if excluded_ids.contains(&vtxo.id()) {
1964 vtxos_to_refresh.swap_remove(i);
1965 continue;
1966 }
1967 total_amount += vtxo.amount();
1968 }
1969 if vtxos_to_refresh.is_empty() {
1970 return Ok(());
1972 }
1973
1974 let (_, ark_info) = self.require_server().await?;
1977 let fee = ark_info.fees.refresh.calculate_no_base_fee(
1978 vtxos_to_refresh.iter().map(|wv| VtxoFeeInfo::from_vtxo_and_tip(&wv.vtxo, tip)),
1979 ).context("fee overflowed")?;
1980
1981 let output_amount = match validate_and_subtract_fee_min_dust(total_amount, fee) {
1983 Ok(amount) => amount,
1984 Err(e) => {
1985 trace!("Cannot add should-refresh VTXOs: {}", e);
1986 return Ok(());
1987 },
1988 };
1989 info!(
1990 "Adding {} extra VTXOs to round participation total = {}, fee = {}, output = {}",
1991 vtxos_to_refresh.len(), total_amount, fee, output_amount,
1992 );
1993 let (user_keypair, _) = self.derive_store_next_keypair().await?;
1994 let req = VtxoRequest {
1995 policy: VtxoPolicy::new_pubkey(user_keypair.public_key()),
1996 amount: output_amount,
1997 };
1998 let extra_ids = vtxos_to_refresh.into_iter().map(|wv| wv.id()).collect::<Vec<_>>();
1999 let extra_full = self.inner.db.get_full_vtxos(&extra_ids).await
2000 .context("failed to hydrate refresh candidates")?;
2001 participation.inputs.reserve(extra_full.len());
2002 participation.inputs.extend(extra_full);
2003 participation.outputs.push(req);
2004
2005 Ok(())
2006 }
2007
2008 pub async fn build_refresh_participation<V: VtxoRef>(
2009 &self,
2010 vtxos: impl IntoIterator<Item = V>,
2011 ) -> anyhow::Result<Option<RoundParticipation>> {
2012 let (vtxos, total_amount) = {
2013 let iter = vtxos.into_iter();
2014 let size_hint = iter.size_hint();
2015 let mut vtxos = Vec::<Vtxo<Full>>::with_capacity(size_hint.1.unwrap_or(size_hint.0));
2016 let mut amount = Amount::ZERO;
2017 for vref in iter {
2018 let id = vref.vtxo_id();
2023 if vtxos.iter().any(|v| v.id() == id) {
2024 bail!("duplicate VTXO id: {}", id);
2025 }
2026 let vtxo = if let Some(vtxo) = vref.into_full_vtxo() {
2027 vtxo
2028 } else {
2029 self.inner.db.get_full_vtxo(id).await?
2032 .with_context(|| format!("vtxo with id {} not found", id))?
2033 };
2034 amount += vtxo.amount();
2035 vtxos.push(vtxo);
2036 }
2037 (vtxos, amount)
2038 };
2039
2040 if vtxos.is_empty() {
2041 info!("Skipping refresh since no VTXOs are provided.");
2042 return Ok(None);
2043 }
2044 ensure!(total_amount >= P2TR_DUST,
2045 "vtxo amount must be at least {} to participate in a round",
2046 P2TR_DUST,
2047 );
2048
2049 let (_, ark_info) = self.require_server().await?;
2051 let current_height = self.inner.chain.tip().await?;
2052 let vtxo_fee_infos = vtxos.iter()
2053 .map(|v| VtxoFeeInfo::from_vtxo_and_tip(v, current_height));
2054 let fee = ark_info.fees.refresh.calculate(vtxo_fee_infos).context("fee overflowed")?;
2055 let output_amount = validate_and_subtract_fee_min_dust(total_amount, fee)?;
2056
2057 info!("Refreshing {} VTXOs (total amount = {}, fee = {}, output = {}).",
2058 vtxos.len(), total_amount, fee, output_amount,
2059 );
2060 let (user_keypair, _) = self.derive_store_next_keypair().await?;
2061 let req = VtxoRequest {
2062 policy: VtxoPolicy::Pubkey(PubkeyVtxoPolicy { user_pubkey: user_keypair.public_key() }),
2063 amount: output_amount,
2064 };
2065
2066 Ok(Some(RoundParticipation {
2067 inputs: vtxos,
2068 outputs: vec![req],
2069 unblinded_mailbox_id: None,
2070 }))
2071 }
2072
2073 pub async fn refresh_vtxos<V: VtxoRef>(
2078 &self,
2079 vtxos: impl IntoIterator<Item = V>,
2080 ) -> anyhow::Result<Option<RoundStatus>> {
2081 let mut participation = match self.build_refresh_participation(vtxos).await? {
2082 Some(participation) => participation,
2083 None => return Ok(None),
2084 };
2085
2086 if let Err(e) = self.add_should_refresh_vtxos(
2087 &mut participation, iter::empty::<VtxoId>(),
2088 ).await {
2089 warn!("Error trying to add additional VTXOs that should be refreshed: {:#}", e);
2090 }
2091
2092 Ok(Some(self.participate_round(participation, Some(RoundMovement::Refresh)).await?))
2093 }
2094
2095 pub async fn refresh_vtxos_delegated<V: VtxoRef>(
2101 &self,
2102 vtxos: impl IntoIterator<Item = V>,
2103 ) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
2104 let mut part = match self.build_refresh_participation(vtxos).await? {
2105 Some(participation) => participation,
2106 None => return Ok(None),
2107 };
2108
2109 if let Err(e) = self.add_should_refresh_vtxos(&mut part, iter::empty::<VtxoId>()).await {
2110 warn!("Error trying to add additional VTXOs that should be refreshed: {:#}", e);
2111 }
2112
2113 Ok(Some(self.join_next_round_delegated(part, Some(RoundMovement::Refresh)).await?))
2114 }
2115
2116 pub async fn get_vtxos_to_refresh(&self) -> anyhow::Result<Vec<WalletVtxo>> {
2119 let vtxos = self.spendable_vtxos_with(&RefreshStrategy::should_refresh_if_must(
2120 self,
2121 self.inner.chain.tip().await?,
2122 self.inner.chain.fee_rates().await.fast,
2123 )).await?;
2124 Ok(vtxos)
2125 }
2126
2127 pub async fn get_vtxos_to_refresh_with_excluded<V: VtxoRef>(
2130 &self,
2131 exclude: impl IntoIterator<Item = V>,
2132 ) -> anyhow::Result<Vec<WalletVtxo>> {
2133 let mut vtxos = self.get_vtxos_to_refresh().await?;
2134 for v in exclude.into_iter() {
2135 if let Some(index) = vtxos.iter().position(|vtxo| vtxo.id() == v.vtxo_id()) {
2136 vtxos.swap_remove(index);
2137 }
2138 }
2139 Ok(vtxos)
2140 }
2141
2142 pub async fn get_first_expiring_vtxo_blockheight(
2144 &self,
2145 ) -> anyhow::Result<Option<BlockHeight>> {
2146 Ok(self.spendable_vtxos().await?.iter().map(|v| v.expiry_height()).min())
2147 }
2148
2149 pub async fn get_next_required_refresh_blockheight(
2152 &self,
2153 ) -> anyhow::Result<Option<BlockHeight>> {
2154 let first_expiry = self.get_first_expiring_vtxo_blockheight().await?;
2155 Ok(first_expiry.map(|h| h.saturating_sub(self.inner.config.vtxo_refresh_expiry_threshold)))
2156 }
2157
2158 async fn select_vtxos_to_cover(
2164 &self,
2165 amount: Amount,
2166 ) -> anyhow::Result<Vec<WalletVtxo>> {
2167 let mut vtxos = self.spendable_vtxos().await?;
2168 self.sort_vtxos_for_selection(&mut vtxos);
2169
2170 let (last, _total_amount) = self.select_vtxos_inner(amount, &vtxos)?;
2171 vtxos.truncate(last+1);
2172 Ok(vtxos)
2173 }
2174
2175 async fn select_vtxos_to_cover_with_fee<F>(
2181 &self,
2182 amount: Amount,
2183 calc_fee: F,
2184 ) -> anyhow::Result<(Vec<WalletVtxo>, Amount)>
2185 where
2186 F: for<'a> Fn(
2187 Amount, std::iter::Copied<std::slice::Iter<'a, VtxoFeeInfo>>,
2188 ) -> anyhow::Result<Amount>,
2189 {
2190 let tip = self.inner.chain.tip().await?;
2191 let mut vtxos = self.spendable_vtxos().await?;
2192 self.sort_vtxos_for_selection(&mut vtxos);
2193
2194 let fee_info = vtxos.iter()
2195 .map(|v| VtxoFeeInfo::from_vtxo_and_tip(v, tip))
2196 .collect::<Vec<_>>();
2197
2198 const MAX_ITERATIONS: usize = 100;
2201 let mut fee = Amount::ZERO;
2202 for _ in 0..MAX_ITERATIONS {
2203 let required = amount.checked_add(fee)
2204 .context("Amount + fee overflow")?;
2205
2206 let (last, vtxo_amount) = self.select_vtxos_inner(required, &vtxos)
2207 .context("Could not find enough suitable VTXOs to cover payment + fees")?;
2208 fee = calc_fee(amount, fee_info[..=last].iter().copied())?;
2209
2210 if amount + fee <= vtxo_amount {
2211 trace!("Selected vtxos to cover amount + fee: amount = {}, fee = {}, total inputs = {}",
2212 amount, fee, vtxo_amount,
2213 );
2214 vtxos.truncate(last+1);
2215 return Ok((vtxos, fee));
2216 }
2217 trace!("VTXO sum of {} did not exceed amount {} and fee {}, iterating again",
2218 vtxo_amount, amount, fee,
2219 );
2220 }
2221 bail!("Fee calculation did not converge after maximum iterations")
2222 }
2223
2224 fn sort_vtxos_for_selection(&self, vtxos: &mut Vec<WalletVtxo>) {
2226 vtxos.sort_by_key(|v| v.expiry_height());
2227 }
2228
2229 fn select_vtxos_inner(
2235 &self,
2236 amount: Amount,
2237 vtxos: &Vec<WalletVtxo>,
2238 ) -> anyhow::Result<(usize, Amount)> {
2239 let mut total_amount = Amount::ZERO;
2241 for (i, vtxo) in vtxos.iter().enumerate() {
2242 total_amount += vtxo.amount();
2243
2244 if total_amount >= amount {
2245 return Ok((i, total_amount))
2246 }
2247 }
2248
2249 bail!("Insufficient money available. Needed {} but {} is available",
2250 amount, total_amount,
2251 );
2252 }
2253
2254 pub fn start_daemon(
2260 &self,
2261 onchain: Option<Arc<tokio::sync::RwLock<dyn DaemonizableOnchainWallet>>>,
2262 ) -> anyhow::Result<()> {
2263 let mut daemon = self.inner.daemon.lock();
2264 if daemon.is_some() {
2265 warn!("Called Wallet::start_daemon while daemon was already running.");
2266 return Ok(());
2267 }
2268
2269 let handle = crate::daemon::start_daemon(self.clone(), onchain);
2270 let _ = daemon.insert(handle);
2271
2272 Ok(())
2273 }
2274
2275 #[deprecated(since = "0.1.4", note = "use start_daemon instead")]
2277 pub fn run_daemon(
2278 &self,
2279 onchain: Option<Arc<tokio::sync::RwLock<dyn DaemonizableOnchainWallet>>>,
2280 ) -> anyhow::Result<()> {
2281 self.start_daemon(onchain)
2282 }
2283
2284 pub fn stop_daemon(&self) {
2286 let mut daemon = self.inner.daemon.lock();
2287 if let Some(handle) = daemon.take() {
2288 handle.stop();
2289 }
2290 }
2291
2292 pub async fn register_vtxo_transactions_with_server(
2296 &self,
2297 vtxos: &[impl AsRef<Vtxo<Full>>],
2298 ) -> anyhow::Result<()> {
2299 if vtxos.is_empty() {
2300 return Ok(());
2301 }
2302
2303 let (mut srv, _) = self.require_server().await?;
2304 srv.client.register_vtxo_transactions(protos::RegisterVtxoTransactionsRequest {
2305 vtxos: vtxos.iter().map(|v| v.as_ref().serialize()).collect(),
2306 }).await.context("failed to register vtxo transactions")?;
2307
2308 Ok(())
2309 }
2310}
2311
2312fn wrap_server_connect_error(err: ConnectError) -> anyhow::Error {
2313 match err {
2314 ConnectError::CreateEndpoint(CreateEndpointError::NoTransportBackend) => {
2315 anyhow!(MISSING_SERVER_TRANSPORT_HELP)
2316 },
2317 other => anyhow::Error::from(other),
2318 }
2319}
2320
2321impl std::ops::Drop for WalletInner {
2322 fn drop(&mut self) {
2323 if let Some(handle) = self.daemon.lock().take() {
2324 handle.stop();
2325 }
2326 }
2327}
2328
#[cfg(test)]
mod tests {
    use server_rpc::client::CreateEndpointError;

    use super::{wrap_server_connect_error, MISSING_SERVER_TRANSPORT_HELP};

    /// A `NoTransportBackend` error must surface as the wallet-level help text, which
    /// names the features that enable a transport.
    #[test]
    fn no_transport_connect_error_is_reworded_for_wallet_users() {
        let connect_err = CreateEndpointError::NoTransportBackend.into();
        let message = wrap_server_connect_error(connect_err).to_string();

        assert!(message.contains(MISSING_SERVER_TRANSPORT_HELP));
        assert!(message.contains("feature `bark-wallet/native` or `bark-wallet/wasm-web`"));
    }
}