#![doc = include_str!("../README.md")]
use std::fs;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use bdk_wallet::bitcoin::Network;
use bdk_wallet::keys::bip39::Mnemonic;
use bdk_wallet::keys::{DerivableKey, ExtendedKey};
use bdk_wallet::rusqlite::Connection;
use bdk_wallet::template::Bip84;
use bdk_wallet::{KeychainKind, PersistedWallet, Wallet};
use cdk_common::common::FeeReserve;
use cdk_common::database::KVStore;
use cdk_common::nuts::nut30::MeltQuoteOnchainFeeOption;
use cdk_common::payment::{
CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, MakePaymentResponse, MintPayment,
OnchainSettings, OutgoingPaymentOptions, PaymentIdentifier, PaymentQuoteResponse,
SettingsResponse, WaitPaymentResponse,
};
use cdk_common::{Amount, CurrencyUnit, MeltQuoteState};
use futures::Stream;
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
use tokio_util::sync::CancellationToken;
pub use crate::chain::{BitcoinRpcConfig, ChainSource, EsploraConfig};
pub use crate::error::Error;
pub use crate::storage::{BdkStorage, FinalizedReceiveIntentRecord, FinalizedSendIntentRecord};
pub use crate::types::{
BatchConfig, FeeEstimationConfig, PaymentMetadata, PaymentTier, SyncConfig,
DEFAULT_TARGET_BLOCK_TIME_SECS,
};
pub mod chain;
pub mod error;
pub(crate) mod fee;
pub mod receive;
pub(crate) mod recovery;
pub mod send;
pub mod storage;
pub(crate) mod sync;
pub mod types;
pub(crate) mod util;
/// A persisted BDK wallet bundled with the SQLite connection it is persisted to.
///
/// The two are kept together so that staged wallet changes can be flushed with
/// a single call to `persist` on this type.
pub(crate) struct WalletWithDb {
/// The BDK wallet; its staged changes are written through `db`.
pub(crate) wallet: PersistedWallet<Connection>,
/// SQLite connection passed to the wallet when persisting.
pub(crate) db: Connection,
}
/// Handles for the background tasks spawned by `start` and torn down by `stop`.
pub(crate) struct BackgroundTasks {
/// Token shared by both tasks; cancelling it asks them to exit.
pub(crate) cancel: CancellationToken,
/// Join handle of the supervised wallet-sync task.
pub(crate) sync: JoinHandle<()>,
/// Join handle of the supervised batch-processor task.
pub(crate) batch: JoinHandle<()>,
}
/// Stream of payment [`Event`]s handed out by `wait_payment_event`.
///
/// It forwards events from a broadcast subscription, ends when the cancel
/// future completes or the broadcast channel closes, and clears the shared
/// `is_active` flag when it ends or is dropped.
struct PaymentEventStream {
/// Broadcast subscription the events are read from.
receiver: BroadcastStream<Event>,
/// Future that completes when the stream has been asked to stop.
cancel: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
/// Flag shared with the backend; set to `false` once this stream is finished.
is_active: Arc<AtomicBool>,
}
impl Stream for PaymentEventStream {
    type Item = Event;

    /// Yields the next payment event.
    ///
    /// Cancellation is checked first on every poll. Receive errors reported by
    /// the broadcast wrapper are logged and skipped; they do not end the stream.
    /// The stream ends (and `is_active` is cleared) on cancellation or when the
    /// broadcast channel is closed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let stream = self.get_mut();

        if let Poll::Ready(()) = stream.cancel.as_mut().poll(cx) {
            stream.is_active.store(false, Ordering::SeqCst);
            return Poll::Ready(None);
        }

        loop {
            let item = match Pin::new(&mut stream.receiver).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(item) => item,
            };

            match item {
                Some(Ok(event)) => return Poll::Ready(Some(event)),
                // Keep polling: an error here only means this subscriber missed events.
                Some(Err(err)) => {
                    tracing::warn!(
                        "cdk-bdk payment event subscriber lagged or errored: {}",
                        err
                    );
                }
                None => {
                    stream.is_active.store(false, Ordering::SeqCst);
                    return Poll::Ready(None);
                }
            }
        }
    }
}
impl Drop for PaymentEventStream {
    /// Clears the shared active flag when the stream goes away, however it ended.
    fn drop(&mut self) {
        let active_flag = &self.is_active;
        active_flag.store(false, Ordering::SeqCst);
    }
}
impl WalletWithDb {
pub(crate) fn new(wallet: PersistedWallet<Connection>, db: Connection) -> Self {
Self { wallet, db }
}
pub(crate) fn persist(&mut self) -> Result<bool, bdk_wallet::rusqlite::Error> {
self.wallet.persist(&mut self.db)
}
}
/// On-chain payment backend built on a BDK wallet.
///
/// The type is `Clone`; shared state lives behind `Arc`s so clones handed to
/// background tasks operate on the same wallet, task handles and caches.
#[derive(Clone)]
pub struct CdkBdk {
/// Fee reserve settings supplied to `new`.
pub(crate) fee_reserve: FeeReserve,
/// Cancelled by `stop` and `cancel_payment_event_stream`; each payment event
/// stream watches a clone of it.
pub(crate) wait_invoice_cancel_token: CancellationToken,
/// `true` while a payment event stream created by `wait_payment_event` is live.
pub(crate) wait_invoice_is_active: Arc<AtomicBool>,
/// Broadcast sender that payment event streams subscribe to.
pub(crate) payment_sender: tokio::sync::broadcast::Sender<Event>,
/// Handles of the running background tasks; `None` while stopped.
pub(crate) tasks: Arc<Mutex<Option<BackgroundTasks>>>,
/// How long `stop` waits for the background tasks before aborting them.
pub(crate) shutdown_timeout: Duration,
/// The wallet and its database connection, guarded by one async mutex.
pub(crate) wallet_with_db: Arc<Mutex<WalletWithDb>>,
/// Chain backend configuration supplied to `new`.
pub(crate) chain_source: ChainSource,
/// Intent/record storage built over the key-value store passed to `new`.
pub(crate) storage: BdkStorage,
/// Bitcoin network; destination addresses are checked against it.
pub(crate) network: Network,
/// Batching configuration, including the list of offered fee tiers.
pub(crate) batch_config: BatchConfig,
/// Notified by `make_payment` when an `Immediate`-tier intent is enqueued.
pub(crate) batch_notify: Arc<Notify>,
/// Number of confirmations required before a transaction counts as settled.
pub(crate) num_confs: u32,
/// Receive amounts below this (in sat) are ignored.
pub(crate) min_receive_amount_sat: u64,
/// Send amounts below this (in sat) are rejected.
pub(crate) min_send_amount_sat: u64,
/// Wallet sync interval in seconds; `new` rejects zero.
pub(crate) sync_interval_secs: u64,
/// Sync configuration (defaults are used when `new` receives `None`).
pub(crate) sync_config: SyncConfig,
/// Per-tier fee-rate cache.
// NOTE(review): the tuple is presumably (sat/vB rate, timestamp) — confirm in the fee module.
pub(crate) fee_rate_cache: Arc<Mutex<std::collections::HashMap<PaymentTier, (f64, u64)>>>,
}
impl CdkBdk {
pub(crate) fn validate_send_amount_against_dust(
&self,
address: &str,
amount_sat: u64,
) -> Result<(), Error> {
let address = bdk_wallet::bitcoin::Address::from_str(address)
.map_err(|e| Error::Wallet(e.to_string()))?
.require_network(self.network)
.map_err(|e| Error::Wallet(e.to_string()))?;
let dust_limit = bdk_wallet::bitcoin::TxOut::minimal_non_dust(address.script_pubkey())
.value
.to_sat();
if amount_sat < dust_limit {
return Err(Error::DustOutput {
amount: amount_sat,
dust_limit,
});
}
Ok(())
}
pub(crate) fn validate_send_amount(&self, address: &str, amount_sat: u64) -> Result<(), Error> {
self.validate_send_amount_against_dust(address, amount_sat)?;
if amount_sat < self.min_send_amount_sat {
return Err(Error::AmountBelowMinimumSend {
amount: amount_sat,
min: self.min_send_amount_sat,
});
}
Ok(())
}
/// Returns whether a transaction anchored at `anchor_height` has reached the
/// required confirmation count given a chain tip at `tip_height`.
///
/// The anchor block itself counts as the first confirmation. An anchor above
/// the tip is never satisfied.
pub(crate) fn confirmations_satisfied(&self, tip_height: u32, anchor_height: u32) -> bool {
    match tip_height.checked_sub(anchor_height) {
        Some(blocks_on_top) => blocks_on_top + 1 >= self.num_confs,
        None => false,
    }
}
/// Returns `true` when `amount_sat` is strictly below the configured minimum
/// receive amount.
pub(crate) fn should_ignore_receive_amount(&self, amount_sat: u64) -> bool {
    let minimum = self.min_receive_amount_sat;
    minimum > amount_sat
}
/// Checks whether the transaction `txid_str` is known to `wallet` and has the
/// required number of confirmations relative to the wallet's latest checkpoint.
///
/// Returns `false` when the txid does not parse (a warning is logged with
/// `intent_kind` and `intent_id` for context), when the wallet does not know
/// the transaction, or when the transaction is unconfirmed.
pub(crate) fn txid_has_required_confirmations(
    &self,
    wallet: &PersistedWallet<Connection>,
    txid_str: &str,
    intent_kind: &str,
    intent_id: &str,
) -> bool {
    let parsed_txid = match bdk_wallet::bitcoin::Txid::from_str(txid_str) {
        Ok(txid) => txid,
        Err(_) => {
            tracing::warn!(
                intent_kind,
                intent_id,
                txid = txid_str,
                "Could not parse txid during confirmation check"
            );
            return false;
        }
    };

    let Some(tx_details) = wallet.get_tx(parsed_txid) else {
        return false;
    };

    let tip_height = wallet.latest_checkpoint().height();
    if let bdk_wallet::chain::ChainPosition::Confirmed { anchor, .. } = &tx_details.chain_position
    {
        self.confirmations_satisfied(tip_height, anchor.block_id.height)
    } else {
        false
    }
}
/// Creates the backend, loading the BIP84 wallet derived from `mnemonic` from
/// `<storage_dir_path>/bdk_wallet/bdk_wallet.sqlite` or creating it if absent.
///
/// Configuration is validated before anything is written to disk, so an
/// invalid configuration leaves no wallet directory or database behind.
///
/// `batch_config` and `sync_config` fall back to their `Default` values when
/// `None`; `shutdown_timeout_secs` falls back to 30 seconds.
///
/// # Errors
/// - `Error::InvalidConfig` when the batch poll interval or
///   `sync_interval_secs` is zero, or when `BatchConfig::validate` fails.
/// - `Error::Path` when no extended private key can be derived for `network`.
/// - `Error::Wallet` when loading or creating the wallet fails.
/// - Conversions of the underlying I/O, SQLite and key errors.
// The constructor mirrors the flat configuration surface, hence the argument count.
#[allow(clippy::too_many_arguments)]
pub fn new(
    mnemonic: Mnemonic,
    network: Network,
    chain_source: ChainSource,
    storage_dir_path: String,
    fee_reserve: FeeReserve,
    kv_store: Arc<dyn KVStore<Err = cdk_common::database::Error> + Send + Sync>,
    batch_config: Option<BatchConfig>,
    num_confs: u32,
    min_receive_amount_sat: u64,
    min_send_amount_sat: u64,
    sync_interval_secs: u64,
    shutdown_timeout_secs: Option<u64>,
    sync_config: Option<SyncConfig>,
) -> Result<Self, Error> {
    // Fail fast on bad configuration, before touching the filesystem.
    let batch_config = batch_config.unwrap_or_default();
    if batch_config.poll_interval.is_zero() {
        return Err(Error::InvalidConfig(
            "batch_config.poll_interval must be greater than zero".to_string(),
        ));
    }
    batch_config.validate().map_err(Error::InvalidConfig)?;
    if sync_interval_secs == 0 {
        return Err(Error::InvalidConfig(
            "sync_interval_secs must be greater than zero".to_string(),
        ));
    }

    let storage_dir_path = PathBuf::from(storage_dir_path).join("bdk_wallet");
    fs::create_dir_all(&storage_dir_path)?;
    let mut db = Connection::open(storage_dir_path.join("bdk_wallet.sqlite"))?;

    let xkey: ExtendedKey = mnemonic.into_extended_key()?;
    let xprv = xkey.into_xprv(network.into()).ok_or(Error::Path)?;
    let descriptor = Bip84(xprv, KeychainKind::External);
    let change_descriptor = Bip84(xprv, KeychainKind::Internal);

    // Try to load an existing wallet first; create one only when the DB is empty.
    let wallet_opt = Wallet::load()
        .descriptor(KeychainKind::External, Some(descriptor.clone()))
        .descriptor(KeychainKind::Internal, Some(change_descriptor.clone()))
        .extract_keys()
        .check_network(network)
        .load_wallet(&mut db)
        .map_err(|e| Error::Wallet(e.to_string()))?;
    let mut wallet = match wallet_opt {
        Some(wallet) => wallet,
        None => Wallet::create(descriptor, change_descriptor)
            .network(network)
            .create_wallet(&mut db)
            .map_err(|e| Error::Wallet(e.to_string()))?,
    };
    wallet.persist(&mut db)?;
    let wallet_with_db = WalletWithDb::new(wallet, db);

    // Size the event channel from the batch size, with some fixed headroom.
    let channel_capacity = batch_config.max_batch_size * 2 + 16;
    let (payment_sender, _) = tokio::sync::broadcast::channel(channel_capacity);

    Ok(Self {
        fee_reserve,
        wait_invoice_cancel_token: CancellationToken::new(),
        wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
        payment_sender,
        tasks: Arc::new(Mutex::new(None)),
        shutdown_timeout: Duration::from_secs(shutdown_timeout_secs.unwrap_or(30)),
        wallet_with_db: Arc::new(Mutex::new(wallet_with_db)),
        chain_source,
        storage: BdkStorage::new(kv_store),
        network,
        batch_config,
        batch_notify: Arc::new(Notify::new()),
        num_confs,
        min_receive_amount_sat,
        min_send_amount_sat,
        sync_interval_secs,
        sync_config: sync_config.unwrap_or_default(),
        fee_rate_cache: Arc::new(Mutex::new(std::collections::HashMap::new())),
    })
}
}
/// Runs a restartable task until it finishes cleanly or `cancel` is cancelled.
///
/// `f` is called with a clone of `cancel` to produce one run of the task. When
/// a run returns `Ok(())` the supervisor returns. When it returns `Err`, the
/// error is logged and `f` is called again after a backoff sleep. Cancellation
/// is observed both while a run is in flight and during the backoff sleep, and
/// ends the supervisor immediately.
///
/// The backoff starts at 1s, doubles after each failed run and is capped at
/// 60s. A run that lasted at least 300s before failing resets it to 1s.
/// `name` is only used in log output.
async fn supervise<F, Fut>(name: &'static str, cancel: CancellationToken, mut f: F)
where
F: FnMut(CancellationToken) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(60);
// A run at least this long is treated as healthy and resets the backoff.
const SUPERVISOR_BACKOFF_RESET: Duration = Duration::from_secs(300);
let mut backoff = INITIAL_BACKOFF;
loop {
if cancel.is_cancelled() {
break;
}
let started = Instant::now();
let child_cancel = cancel.clone();
// Race the run against cancellation; losing the race drops the in-flight run.
let result = tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("{name} supervisor: cancelled");
return;
}
r = f(child_cancel) => r,
};
match result {
Ok(()) => {
tracing::info!("{name} supervisor: task exited cleanly");
return;
}
Err(e) => {
let ran_for = started.elapsed();
// `transient` is only logged; every error is retried the same way.
let transient = e.is_transient();
tracing::error!(
task = name,
ran_for_secs = ran_for.as_secs(),
transient,
"supervised task returned error: {e}; restarting with backoff"
);
if ran_for >= SUPERVISOR_BACKOFF_RESET {
backoff = INITIAL_BACKOFF;
}
// Sleep out the backoff, but wake immediately on cancellation.
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("{name} supervisor: cancelled during backoff");
return;
}
_ = tokio::time::sleep(backoff) => {}
}
// Double for the next failure, capped at MAX_BACKOFF.
backoff = (backoff * 2).min(MAX_BACKOFF);
}
}
}
}
#[async_trait]
impl MintPayment for CdkBdk {
type Err = cdk_common::payment::Error;
/// Runs saga recovery and then spawns the supervised wallet-sync and
/// batch-processor tasks.
///
/// The task slot stays locked for the whole call, so a concurrent `start`
/// waits and then observes the tasks as already running.
///
/// # Errors
/// `Error::AlreadyStarted` (converted) when tasks are already running, or any
/// error from receive/send saga recovery; in the latter case nothing is spawned.
#[tracing::instrument(skip_all)]
async fn start(&self) -> Result<(), Self::Err> {
    let mut slot = self.tasks.lock().await;
    if slot.is_some() {
        return Err(Error::AlreadyStarted.into());
    }

    // Recovery must finish before any background task can touch the intents.
    self.recover_receive_saga().await?;
    self.recover_send_saga().await?;

    let cancel = CancellationToken::new();

    let sync = {
        let backend = self.clone();
        let token = cancel.clone();
        tokio::spawn(async move {
            supervise("wallet sync", token, move |run_cancel| {
                let backend = backend.clone();
                async move { backend.sync_wallet(run_cancel).await }
            })
            .await;
        })
    };

    let batch = {
        let backend = self.clone();
        let token = cancel.clone();
        tokio::spawn(async move {
            supervise("batch processor", token, move |run_cancel| {
                let backend = backend.clone();
                async move { backend.run_batch_processor(run_cancel).await }
            })
            .await;
        })
    };

    *slot = Some(BackgroundTasks {
        cancel,
        sync,
        batch,
    });
    Ok(())
}
async fn stop(&self) -> Result<(), Self::Err> {
self.wait_invoice_cancel_token.cancel();
let tasks_opt = {
let mut tasks_lock = self.tasks.lock().await;
tasks_lock.take()
};
if let Some(bg) = tasks_opt {
bg.cancel.cancel();
let sync_aborter = bg.sync.abort_handle();
let batch_aborter = bg.batch.abort_handle();
let joined = tokio::time::timeout(self.shutdown_timeout, async move {
let _ = bg.sync.await;
let _ = bg.batch.await;
})
.await;
if joined.is_err() {
sync_aborter.abort();
batch_aborter.abort();
tracing::error!(
"cdk-bdk background tasks did not exit within {:?}; forced abort",
self.shutdown_timeout
);
}
}
Ok(())
}
/// Reports the backend's settings: unit `"sat"`, no bolt11/bolt12 support, and
/// the on-chain confirmation count and minimum receive/send amounts.
async fn get_settings(&self) -> Result<SettingsResponse, Self::Err> {
    let onchain = OnchainSettings {
        confirmations: self.num_confs,
        min_receive_amount_sat: self.min_receive_amount_sat,
        min_send_amount_sat: self.min_send_amount_sat,
    };

    let settings = SettingsResponse {
        unit: "sat".to_string(),
        bolt11: None,
        bolt12: None,
        onchain: Some(onchain),
        custom: std::collections::HashMap::new(),
    };
    Ok(settings)
}
async fn get_payment_quote(
&self,
_unit: &CurrencyUnit,
options: OutgoingPaymentOptions,
) -> Result<PaymentQuoteResponse, Self::Err> {
let onchain_options = match options {
OutgoingPaymentOptions::Onchain(o) => o,
_ => return Err(cdk_common::payment::Error::UnsupportedPaymentOption),
};
self.validate_send_amount(
&onchain_options.address,
onchain_options.amount.clone().to_u64(),
)?;
let amount_sat = onchain_options.amount.clone().to_u64();
let mut fee_options = Vec::with_capacity(self.batch_config.fee_options.len());
for (idx, tier) in self.batch_config.fee_options.iter().enumerate() {
let fee_estimate = self
.estimate_onchain_fee_reserve(&onchain_options.address, amount_sat, *tier)
.await?;
fee_options.push(MeltQuoteOnchainFeeOption {
fee_index: idx as u32,
fee_reserve: Amount::from(fee_estimate.fee_reserve_sat),
estimated_blocks: tier.estimated_blocks(),
});
}
let cheapest = fee_options
.iter()
.min_by_key(|option| u64::from(option.fee_reserve))
.copied()
.expect("fee_options is validated as non-empty");
Ok(PaymentQuoteResponse {
request_lookup_id: Some(PaymentIdentifier::QuoteId(onchain_options.quote_id.clone())),
amount: onchain_options.amount,
fee: Amount::new(cheapest.fee_reserve.into(), CurrencyUnit::Sat),
state: MeltQuoteState::Unpaid,
extra_json: None,
estimated_blocks: Some(cheapest.estimated_blocks),
fee_options: Some(fee_options),
})
}
async fn make_payment(
&self,
_unit: &CurrencyUnit,
options: OutgoingPaymentOptions,
) -> Result<MakePaymentResponse, Self::Err> {
let onchain_options = match options {
OutgoingPaymentOptions::Onchain(o) => o,
_ => return Err(cdk_common::payment::Error::UnsupportedPaymentOption),
};
let address = onchain_options.address;
let amount = onchain_options.amount;
let quote_id = onchain_options.quote_id;
self.validate_send_amount(&address, amount.clone().to_u64())?;
let max_fee = onchain_options
.max_fee_amount
.unwrap_or(Amount::new(1000, CurrencyUnit::Sat));
let amount_sat = amount.clone().to_u64();
let max_fee_sat = max_fee.clone().to_u64();
let tier = self
.batch_config
.tier_for_fee_index(onchain_options.fee_index)
.map_err(Error::UnknownFeeIndex)?;
let metadata = PaymentMetadata::from_optional_json(onchain_options.metadata.as_deref());
let fee_estimate = self
.estimate_onchain_fee_reserve(&address, amount_sat, tier)
.await?;
if fee_estimate.raw_fee_sat > max_fee_sat {
return Err(Error::EstimatedFeeTooHigh {
estimated_fee: fee_estimate.raw_fee_sat,
max_fee: max_fee_sat,
}
.into());
}
crate::send::payment_intent::SendIntent::new(
&self.storage,
quote_id.to_string(),
address,
amount_sat,
max_fee_sat,
tier,
metadata,
)
.await?;
if tier == PaymentTier::Immediate {
self.batch_notify.notify_one();
}
Ok(MakePaymentResponse {
payment_lookup_id: PaymentIdentifier::QuoteId(quote_id),
payment_proof: None,
status: MeltQuoteState::Pending,
total_spent: Amount::new(0, CurrencyUnit::Sat),
})
}
async fn create_incoming_payment_request(
&self,
options: IncomingPaymentOptions,
) -> Result<CreateIncomingPaymentResponse, Self::Err> {
let onchain_options = match options {
IncomingPaymentOptions::Onchain(o) => o,
_ => return Err(cdk_common::payment::Error::UnsupportedPaymentOption),
};
let mut wallet_with_db = self.wallet_with_db.lock().await;
let address = wallet_with_db
.wallet
.reveal_next_address(KeychainKind::External);
let address_str = address.address.to_string();
wallet_with_db.persist().map_err(|err| {
tracing::error!("Could not persist to bdk db: {}", err);
Error::BdkPersist
})?;
let quote_id = onchain_options.quote_id;
self.storage
.track_receive_address(&address_str, "e_id.to_string())
.await?;
Ok(CreateIncomingPaymentResponse {
request_lookup_id: PaymentIdentifier::QuoteId(quote_id),
request: address_str,
expiry: None,
extra_json: None,
})
}
/// Opens a stream of payment events.
///
/// Marks the event stream as active, subscribes to the broadcast channel and
/// returns a stream that ends when the cancel token fires or the channel
/// closes.
async fn wait_payment_event(
    &self,
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
    self.wait_invoice_is_active.store(true, Ordering::SeqCst);

    let subscription = self.payment_sender.subscribe();
    let cancelled = self.wait_invoice_cancel_token.clone().cancelled_owned();

    Ok(Box::pin(PaymentEventStream {
        receiver: BroadcastStream::new(subscription),
        cancel: Box::pin(cancelled),
        is_active: Arc::clone(&self.wait_invoice_is_active),
    }))
}
async fn check_incoming_payment_status(
&self,
payment_identifier: &PaymentIdentifier,
) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
let PaymentIdentifier::QuoteId(quote_id) = payment_identifier else {
return Err(Error::UnsupportedOnchain.into());
};
let quote_id_str = quote_id.to_string();
let mut results = Vec::new();
let finalized = self
.storage
.get_finalized_receive_intents_by_quote_id("e_id_str)
.await?;
for record in finalized {
results.push(WaitPaymentResponse {
payment_identifier: payment_identifier.clone(),
payment_amount: Amount::new(record.amount_sat, CurrencyUnit::Sat),
payment_id: record.outpoint,
});
}
Ok(results)
}
async fn check_outgoing_payment(
&self,
payment_identifier: &PaymentIdentifier,
) -> Result<MakePaymentResponse, Self::Err> {
let quote_id = match payment_identifier {
PaymentIdentifier::QuoteId(id) => id.to_string(),
_ => return Err(Error::UnsupportedOnchain.into()),
};
if let Some(record) = self.storage.get_send_intent_by_quote_id("e_id).await? {
let total_spent = match &record.state {
crate::send::payment_intent::record::SendIntentState::Pending { .. }
| crate::send::payment_intent::record::SendIntentState::Batched { .. } => {
Amount::new(0, CurrencyUnit::Sat)
}
crate::send::payment_intent::record::SendIntentState::AwaitingConfirmation {
fee_contribution_sat,
..
} => Amount::new(record.amount_sat + fee_contribution_sat, CurrencyUnit::Sat),
crate::send::payment_intent::record::SendIntentState::Failed { .. } => {
Amount::new(0, CurrencyUnit::Sat)
}
};
let status = match record.state {
crate::send::payment_intent::record::SendIntentState::Pending { .. }
| crate::send::payment_intent::record::SendIntentState::Batched { .. }
| crate::send::payment_intent::record::SendIntentState::AwaitingConfirmation {
..
} => MeltQuoteState::Pending,
crate::send::payment_intent::record::SendIntentState::Failed { .. } => {
MeltQuoteState::Failed
}
};
return Ok(MakePaymentResponse {
payment_lookup_id: payment_identifier.clone(),
payment_proof: None,
status,
total_spent,
});
}
if let Some(record) = self
.storage
.get_finalized_intent_by_quote_id("e_id)
.await?
{
return Ok(MakePaymentResponse {
payment_lookup_id: payment_identifier.clone(),
payment_proof: Some(record.outpoint),
status: MeltQuoteState::Paid,
total_spent: Amount::new(record.total_spent_sat, CurrencyUnit::Sat),
});
}
Ok(MakePaymentResponse {
payment_lookup_id: payment_identifier.clone(),
payment_proof: None,
status: MeltQuoteState::Unknown,
total_spent: Amount::new(0, CurrencyUnit::Sat),
})
}
/// Returns whether a payment event stream is currently marked active.
fn is_payment_event_stream_active(&self) -> bool {
    let active_flag = &self.wait_invoice_is_active;
    active_flag.load(Ordering::SeqCst)
}
/// Cancels the token watched by payment event streams, ending them on their
/// next poll.
fn cancel_payment_event_stream(&self) {
    let token = &self.wait_invoice_cancel_token;
    token.cancel();
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use bdk_wallet::bitcoin::hashes::Hash as _;
use bdk_wallet::bitcoin::{
absolute, transaction, Network, OutPoint, Sequence, Transaction, TxIn, TxOut, Txid, Witness,
};
use bdk_wallet::keys::bip39::Mnemonic;
use cdk_common::common::FeeReserve;
use cdk_common::payment::MintPayment;
use futures::StreamExt;
use super::*;
use crate::fee::apply_quote_fee_safety;
/// Builds a test backend with default configuration, discarding the temp-dir
/// handle that backs its wallet storage.
async fn build_test_instance(shutdown_timeout_secs: u64) -> CdkBdk {
    let (backend, _tempdir) = build_test_instance_with_tempdir(shutdown_timeout_secs).await;
    backend
}
/// Builds a test backend with default batch configuration and a 60-second
/// sync interval, returning the temp dir so the caller can keep it alive.
async fn build_test_instance_with_tempdir(
    shutdown_timeout_secs: u64,
) -> (CdkBdk, tempfile::TempDir) {
    let built = build_test_instance_with_config(shutdown_timeout_secs, None, 60).await;
    built.expect("build CdkBdk test instance")
}
/// Builds a regtest backend in a fresh temp dir from a fixed mnemonic, an
/// in-memory KV store and an unreachable Esplora URL.
///
/// Uses 1 required confirmation, a minimum receive amount of 0 sat and a
/// minimum send amount of 546 sat. Construction errors are returned so tests
/// can assert on rejected configurations.
async fn build_test_instance_with_config(
    shutdown_timeout_secs: u64,
    batch_config: Option<BatchConfig>,
    sync_interval_secs: u64,
) -> Result<(CdkBdk, tempfile::TempDir), Error> {
    let tempdir = tempfile::tempdir().expect("tempdir");
    let storage_dir = tempdir.path().to_string_lossy().into_owned();

    let mnemonic = Mnemonic::from_str(
        "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
    )
    .expect("mnemonic");

    let kv_store = cdk_sqlite::mint::memory::empty()
        .await
        .expect("in-memory kv store");

    // Port 1 on loopback: every chain request is expected to fail.
    let esplora = EsploraConfig {
        url: "http://127.0.0.1:1".to_string(),
        parallel_requests: 1,
    };
    let fee_reserve = FeeReserve {
        min_fee_reserve: Amount::new(1, CurrencyUnit::Sat).into(),
        percent_fee_reserve: 0.02,
    };

    let backend = CdkBdk::new(
        mnemonic,
        Network::Regtest,
        ChainSource::Esplora(esplora),
        storage_dir,
        fee_reserve,
        Arc::new(kv_store),
        batch_config,
        1,
        0,
        546,
        sync_interval_secs,
        Some(shutdown_timeout_secs),
        None,
    )?;
    Ok((backend, tempdir))
}
/// Gives the backend wallet a spendable balance by applying a synthetic
/// unconfirmed transaction paying `amount_sat` to a freshly revealed external
/// address, then persisting the wallet.
async fn fund_backend_wallet(backend: &CdkBdk, amount_sat: u64) {
    let mut guard = backend.wallet_with_db.lock().await;

    let script_pubkey = guard
        .wallet
        .reveal_next_address(KeychainKind::External)
        .address
        .script_pubkey();

    // Single dummy input spending output 0 of the all-zero txid.
    let input = TxIn {
        previous_output: OutPoint::new(Txid::all_zeros(), 0),
        script_sig: Default::default(),
        sequence: Sequence::ENABLE_RBF_NO_LOCKTIME,
        witness: Witness::new(),
    };
    let output = TxOut {
        value: bdk_wallet::bitcoin::Amount::from_sat(amount_sat),
        script_pubkey,
    };
    let funding_tx = Transaction {
        version: transaction::Version::TWO,
        lock_time: absolute::LockTime::ZERO,
        input: vec![input],
        output: vec![output],
    };

    guard.wallet.apply_unconfirmed_txs([(funding_tx, 0)]);
    guard.persist().expect("persist funded wallet");
}
#[tokio::test]
async fn test_new_rejects_zero_sync_interval() {
match build_test_instance_with_config(5, None, 0).await {
Err(Error::InvalidConfig(message)) => {
assert!(message.contains("sync_interval_secs"));
}
Ok(_) => panic!("zero sync interval should be rejected"),
Err(err) => panic!("expected invalid config error, got {err}"),
}
}
#[tokio::test]
async fn test_new_rejects_zero_batch_poll_interval() {
let batch_config = BatchConfig {
poll_interval: Duration::ZERO,
..BatchConfig::default()
};
match build_test_instance_with_config(5, Some(batch_config), 60).await {
Err(Error::InvalidConfig(message)) => {
assert!(message.contains("poll_interval"));
}
Ok(_) => panic!("zero batch poll interval should be rejected"),
Err(err) => panic!("expected invalid config error, got {err}"),
}
}
#[tokio::test]
async fn test_new_rejects_zero_target_block_time() {
let batch_config = BatchConfig {
target_block_time: Duration::ZERO,
..BatchConfig::default()
};
match build_test_instance_with_config(5, Some(batch_config), 60).await {
Err(Error::InvalidConfig(message)) => {
assert!(message.contains("target_block_time"));
}
Ok(_) => panic!("zero target block time should be rejected"),
Err(err) => panic!("expected invalid config error, got {err}"),
}
}
#[tokio::test]
async fn test_new_rejects_invalid_fallback_fee_rate() {
let batch_config = BatchConfig {
fee_estimation: FeeEstimationConfig {
fallback_sat_per_vb: 0.0,
..FeeEstimationConfig::default()
},
..BatchConfig::default()
};
match build_test_instance_with_config(5, Some(batch_config), 60).await {
Err(Error::InvalidConfig(message)) => {
assert!(message.contains("fallback_sat_per_vb"));
}
Ok(_) => panic!("invalid fallback fee rate should be rejected"),
Err(err) => panic!("expected invalid config error, got {err}"),
}
}
/// Pins the default block time, tier deadlines and maximum intent age.
#[test]
fn test_default_batch_deadlines_match_advertised_blocks() {
    let BatchConfig {
        target_block_time,
        standard_deadline,
        economy_deadline,
        max_intent_age,
        ..
    } = BatchConfig::default();

    assert_eq!(target_block_time, Duration::from_secs(600));
    assert_eq!(standard_deadline, Duration::from_secs(3600));
    assert_eq!(economy_deadline, Duration::from_secs(86_400));
    assert_eq!(max_intent_age, Some(Duration::from_secs(86_430)));
}
/// `start` followed by `stop` must each complete within ten seconds.
#[tokio::test]
async fn test_start_then_stop_exits_promptly() {
    let backend = build_test_instance(5).await;
    let limit = Duration::from_secs(10);

    tokio::time::timeout(limit, backend.start())
        .await
        .expect("start timed out")
        .expect("start should succeed");

    tokio::time::timeout(limit, backend.stop())
        .await
        .expect("stop timed out")
        .expect("stop should succeed");
}
/// Starting an already-started backend must return an error.
#[tokio::test]
async fn test_double_start_returns_already_started() {
    let backend = build_test_instance(5).await;
    backend.start().await.expect("first start");

    let repeated = backend.start().await;
    assert!(repeated.is_err(), "second start should error");

    backend.stop().await.expect("stop");
}
/// `stop` is a harmless no-op on a backend that was never started, and again
/// when repeated.
#[tokio::test]
async fn test_stop_without_start_is_ok() {
    let backend = build_test_instance(5).await;

    let first = backend.stop().await;
    first.expect("stop on never-started is ok");

    let second = backend.stop().await;
    second.expect("double stop is ok");
}
/// A stopped backend can be started and stopped a second time.
#[tokio::test]
async fn test_restart_after_stop() {
    let backend = build_test_instance(5).await;

    // First cycle.
    backend.start().await.expect("first start");
    backend.stop().await.expect("first stop");

    // Second cycle on the same instance.
    backend.start().await.expect("second start");
    backend.stop().await.expect("second stop");
}
/// The active flag follows the stream's lifetime, and cancelling ends the
/// stream within two seconds.
#[tokio::test]
async fn test_wait_payment_event_tracks_active_state_and_cancels() {
    let backend = build_test_instance(5).await;
    assert!(!backend.is_payment_event_stream_active());

    let opened = backend.wait_payment_event().await;
    let mut events = opened.expect("payment event stream");
    assert!(backend.is_payment_event_stream_active());

    backend.cancel_payment_event_stream();
    let after_cancel = tokio::time::timeout(Duration::from_secs(2), events.next())
        .await
        .expect("stream should observe cancellation promptly");

    assert!(after_cancel.is_none());
    assert!(!backend.is_payment_event_stream_active());
}
/// With a 1.25 multiplier and a 500 sat fixed margin, a 1000 sat fee quotes
/// as 1750 sat.
#[test]
fn test_quote_fee_safety_adds_multiplier_and_fixed_margin() {
    let safety = FeeEstimationConfig {
        quote_safety_multiplier: 1.25,
        quote_fixed_safety_sat: 500,
        ..FeeEstimationConfig::default()
    };

    let quoted = apply_quote_fee_safety(1_000, &safety);
    assert_eq!(quoted, 1_750);
}
/// Direct fee-rate estimation must return an error when the Esplora endpoint
/// is unreachable.
#[tokio::test]
async fn test_fee_rate_cache_falls_back_on_error() {
    let backend = build_test_instance(5).await;

    let estimate = backend
        .estimate_fee_rate_sat_per_vb(PaymentTier::Immediate)
        .await;

    assert!(
        estimate.is_err(),
        "fee rate estimation should fail against bogus Esplora URL"
    );
}
/// Quoting must leave the BDK wallet with no staged changes.
#[tokio::test]
async fn test_get_payment_quote_does_not_stage_wallet_changes() {
    let (backend, _tmp) = build_test_instance_with_tempdir(5).await;
    fund_backend_wallet(&backend, 100_000).await;

    let (_quote_id, options) = onchain_options_for(10_000);
    let quoted = backend.get_payment_quote(&CurrencyUnit::Sat, options).await;
    quoted.expect("quote should succeed with fallback fee rate");

    let guard = backend.wallet_with_db.lock().await;
    let staged = guard.wallet.staged();
    assert!(
        staged.is_none(),
        "quote estimation must not mutate or stage BDK wallet state"
    );
}
/// With default configuration a quote carries exactly one fee option: index 0
/// with an estimate of 1 block.
#[tokio::test]
async fn test_default_fee_options_emit_immediate_only() {
    let (backend, _tmp) = build_test_instance_with_tempdir(5).await;
    fund_backend_wallet(&backend, 100_000).await;

    let (_quote_id, options) = onchain_options_for(10_000);
    let quote = backend
        .get_payment_quote(&CurrencyUnit::Sat, options)
        .await
        .expect("quote should succeed");

    let fee_options = quote.fee_options.expect("fee options");
    assert_eq!(fee_options.len(), 1);

    let only = &fee_options[0];
    assert_eq!(only.fee_index, 0);
    assert_eq!(only.estimated_blocks, 1);
}
/// With three configured tiers, the quote lists them in configuration order
/// with indexes 0, 1, 2 and block estimates 1, 6, 144.
#[tokio::test]
async fn test_configured_fee_options_emit_indexes_in_order() {
    let three_tiers = BatchConfig {
        fee_options: vec![
            PaymentTier::Immediate,
            PaymentTier::Standard,
            PaymentTier::Economy,
        ],
        ..BatchConfig::default()
    };
    let (backend, _tmp) = build_test_instance_with_config(5, Some(three_tiers), 60)
        .await
        .expect("build CdkBdk test instance");
    fund_backend_wallet(&backend, 100_000).await;

    let (_quote_id, options) = onchain_options_for(10_000);
    let quote = backend
        .get_payment_quote(&CurrencyUnit::Sat, options)
        .await
        .expect("quote should succeed");
    let fee_options = quote.fee_options.expect("fee options");

    let (indexes, estimated_blocks): (Vec<u32>, Vec<u32>) = fee_options
        .iter()
        .map(|option| (option.fee_index, option.estimated_blocks))
        .unzip();

    assert_eq!(indexes, vec![0, 1, 2]);
    assert_eq!(estimated_blocks, vec![1, 6, 144]);
}
#[tokio::test]
async fn test_configured_fee_index_resolves_by_position() {
let batch_config = BatchConfig {
fee_options: vec![PaymentTier::Immediate, PaymentTier::Economy],
..BatchConfig::default()
};
let (backend, _tmp) = build_test_instance_with_config(5, Some(batch_config), 60)
.await
.expect("build CdkBdk test instance");
fund_backend_wallet(&backend, 100_000).await;
let (quote_id, mut options) = onchain_options_for(10_000);
let OutgoingPaymentOptions::Onchain(onchain) = &mut options else {
panic!("expected onchain options");
};
onchain.fee_index = Some(1);
onchain.max_fee_amount = Some(Amount::new(10_000, CurrencyUnit::Sat));
backend
.make_payment(&CurrencyUnit::Sat, options)
.await
.expect("make_payment should enqueue the intent");
let intent = backend
.storage
.get_send_intent_by_quote_id("e_id.to_string())
.await
.expect("lookup send intent by quote id")
.expect("send intent should be persisted");
assert_eq!(intent.tier, PaymentTier::Economy);
}
#[tokio::test]
async fn test_make_payment_omitted_fee_index_defaults_to_immediate() {
let batch_config = BatchConfig {
fee_options: vec![PaymentTier::Immediate, PaymentTier::Economy],
..BatchConfig::default()
};
let (backend, _tmp) = build_test_instance_with_config(5, Some(batch_config), 60)
.await
.expect("build CdkBdk test instance");
fund_backend_wallet(&backend, 100_000).await;
let (quote_id, options) = onchain_options_for(10_000);
backend
.make_payment(&CurrencyUnit::Sat, options)
.await
.expect("make_payment should enqueue the intent");
let intent = backend
.storage
.get_send_intent_by_quote_id("e_id.to_string())
.await
.expect("lookup send intent by quote id")
.expect("send intent should be persisted");
assert_eq!(intent.tier, PaymentTier::Immediate);
}
#[tokio::test]
async fn test_new_rejects_invalid_fee_option_lists() {
for fee_options in [
Vec::new(),
vec![PaymentTier::Immediate, PaymentTier::Immediate],
vec![
PaymentTier::Immediate,
PaymentTier::Standard,
PaymentTier::Economy,
PaymentTier::Immediate,
],
] {
let batch_config = BatchConfig {
fee_options,
..BatchConfig::default()
};
match build_test_instance_with_config(5, Some(batch_config), 60).await {
Err(Error::InvalidConfig(message)) => {
assert!(message.contains("fee_options"));
}
Ok(_) => panic!("invalid fee options should be rejected"),
Err(err) => panic!("expected invalid config error, got {err}"),
}
}
}
#[tokio::test]
async fn test_get_payment_quote_rejects_empty_wallet() {
let backend = build_test_instance(5).await;
let (_quote_id, options) = onchain_options_for(10_000);
let err = backend
.get_payment_quote(&CurrencyUnit::Sat, options)
.await
.expect_err("empty wallet should not receive an onchain quote");
let cdk_common::payment::Error::Onchain(inner) = err else {
panic!("expected onchain error");
};
let backend_err = inner
.downcast_ref::<Error>()
.expect("expected cdk-bdk backend error");
assert!(matches!(backend_err, Error::NoSpendableUtxos));
}
#[tokio::test]
async fn test_make_payment_rechecks_current_fee_against_max_fee() {
let (backend, _tmp) = build_test_instance_with_tempdir(5).await;
fund_backend_wallet(&backend, 100_000).await;
let (quote_id, mut options) = onchain_options_for(10_000);
let OutgoingPaymentOptions::Onchain(onchain) = &mut options else {
panic!("expected onchain options");
};
onchain.max_fee_amount = Some(Amount::new(1, CurrencyUnit::Sat));
let err = backend
.make_payment(&CurrencyUnit::Sat, options)
.await
.expect_err("payment should be rejected when current fee exceeds max");
let cdk_common::payment::Error::Onchain(inner) = err else {
panic!("expected onchain error");
};
match inner.downcast_ref::<Error>() {
Some(Error::EstimatedFeeTooHigh { max_fee, .. }) => assert_eq!(*max_fee, 1),
other => panic!("expected EstimatedFeeTooHigh, got {other:?}"),
}
assert!(
backend
.storage
.get_send_intent_by_quote_id("e_id.to_string())
.await
.expect("lookup send intent by quote id")
.is_none(),
"fee recheck rejection must not leave a pending send intent behind"
);
}
/// The on-chain settings expose the configured minimum receive (0) and send
/// (546) amounts.
#[tokio::test]
async fn test_get_settings_reports_min_send_amount() {
    let backend = build_test_instance(5).await;

    let onchain = backend
        .get_settings()
        .await
        .expect("settings")
        .onchain
        .expect("onchain settings");

    assert_eq!(onchain.min_receive_amount_sat, 0);
    assert_eq!(onchain.min_send_amount_sat, 546);
}
use cdk_common::payment::OnchainOutgoingPaymentOptions;
use cdk_common::QuoteId;
use uuid::Uuid;
/// Builds on-chain payment options for `amount_sat` under a fresh random quote
/// id, returning the id alongside the options.
fn onchain_options_for(amount_sat: u64) -> (QuoteId, OutgoingPaymentOptions) {
    let quote_id = QuoteId::UUID(Uuid::new_v4());
    let options = onchain_options_for_quote(quote_id.clone(), amount_sat);
    (quote_id, options)
}
/// Test helper: builds onchain outgoing-payment options for an existing
/// `quote_id`.
///
/// The destination is a fixed `bcrt1…` address, the amount is `amount_sat`
/// sat, the fee cap is 1_000 sat, and `fee_index` / `metadata` are unset.
fn onchain_options_for_quote(quote_id: QuoteId, amount_sat: u64) -> OutgoingPaymentOptions {
    let address = String::from("bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080");
    let amount = Amount::new(amount_sat, CurrencyUnit::Sat);
    let max_fee_amount = Some(Amount::new(1_000, CurrencyUnit::Sat));
    let onchain = OnchainOutgoingPaymentOptions {
        address,
        amount,
        max_fee_amount,
        quote_id,
        fee_index: None,
        metadata: None,
    };
    OutgoingPaymentOptions::Onchain(Box::new(onchain))
}
/// A successfully enqueued onchain payment is answered with a `Pending`
/// status, a lookup id equal to the quote id, and a `total_spent` of 0 sat.
#[tokio::test]
async fn test_make_payment_pending_total_spent_is_zero() {
    // `_tmp` keeps the temp-dir handle alive until the test ends.
    let (backend, _tmp) = build_test_instance_with_tempdir(5).await;
    fund_backend_wallet(&backend, 100_000).await;

    let (quote_id, options) = onchain_options_for(10_000);
    let enqueued = backend
        .make_payment(&CurrencyUnit::Sat, options)
        .await
        .expect("make_payment should enqueue the intent");

    let expected_lookup_id = PaymentIdentifier::QuoteId(quote_id);
    let zero_sat = Amount::new(0, CurrencyUnit::Sat);

    assert_eq!(enqueued.status, MeltQuoteState::Pending);
    assert_eq!(enqueued.payment_lookup_id, expected_lookup_id);
    assert_eq!(
        enqueued.total_spent,
        zero_sat,
        "Pending onchain response MUST use 0 sentinel; the real \
         total_spent is only known after the batch transaction is built"
    );
}
#[tokio::test]
async fn test_get_payment_quote_rejects_dust_output() {
let backend = build_test_instance(5).await;
let (_quote_id, options) = onchain_options_for(1);
let err = backend
.get_payment_quote(&CurrencyUnit::Sat, options)
.await
.expect_err("dust output should be rejected at quote time");
let cdk_common::payment::Error::Onchain(inner) = err else {
panic!("expected onchain error");
};
let backend_err = inner
.downcast_ref::<Error>()
.expect("expected cdk-bdk backend error");
assert!(matches!(backend_err, Error::DustOutput { .. }));
}
#[tokio::test]
async fn test_make_payment_rejects_dust_output_without_persisting_intent() {
let backend = build_test_instance(5).await;
let (quote_id, options) = onchain_options_for(1);
let err = backend
.make_payment(&CurrencyUnit::Sat, options)
.await
.expect_err("dust output should be rejected before enqueue");
let cdk_common::payment::Error::Onchain(inner) = err else {
panic!("expected onchain error");
};
let backend_err = inner
.downcast_ref::<Error>()
.expect("expected cdk-bdk backend error");
assert!(matches!(backend_err, Error::DustOutput { .. }));
assert!(
backend
.storage
.get_send_intent_by_quote_id("e_id.to_string())
.await
.expect("lookup send intent by quote id")
.is_none(),
"dust rejection must not leave a pending send intent behind"
);
}
#[tokio::test]
async fn test_get_payment_quote_rejects_amount_below_minimum_send() {
let backend = build_test_instance(5).await;
let (_quote_id, options) = onchain_options_for(545);
let err = backend
.get_payment_quote(&CurrencyUnit::Sat, options)
.await
.expect_err("amount below configured minimum should be rejected at quote time");
let cdk_common::payment::Error::Onchain(inner) = err else {
panic!("expected onchain error");
};
let backend_err = inner
.downcast_ref::<Error>()
.expect("expected cdk-bdk backend error");
assert!(matches!(
backend_err,
Error::AmountBelowMinimumSend {
amount: 545,
min: 546
}
));
}
#[tokio::test]
async fn test_make_payment_rejects_amount_below_minimum_send_without_persisting_intent() {
let backend = build_test_instance(5).await;
let (quote_id, options) = onchain_options_for(545);
let err = backend
.make_payment(&CurrencyUnit::Sat, options)
.await
.expect_err("amount below configured minimum should be rejected before enqueue");
let cdk_common::payment::Error::Onchain(inner) = err else {
panic!("expected onchain error");
};
let backend_err = inner
.downcast_ref::<Error>()
.expect("expected cdk-bdk backend error");
assert!(matches!(
backend_err,
Error::AmountBelowMinimumSend {
amount: 545,
min: 546
}
));
assert!(
backend
.storage
.get_send_intent_by_quote_id("e_id.to_string())
.await
.expect("lookup send intent by quote id")
.is_none(),
"minimum-send rejection must not leave a pending send intent behind"
);
}
/// After `make_payment` has enqueued an intent, `check_outgoing_payment` for
/// the same quote id reports `Pending`, 0 sat spent, and no payment proof.
#[tokio::test]
async fn test_check_outgoing_payment_pending_intent_reports_zero_total_spent() {
    // `_tmp` keeps the temp-dir handle alive until the test ends.
    let (backend, _tmp) = build_test_instance_with_tempdir(5).await;
    fund_backend_wallet(&backend, 100_000).await;

    let (quote_id, options) = onchain_options_for(12_345);
    backend
        .make_payment(&CurrencyUnit::Sat, options)
        .await
        .expect("make_payment should enqueue the intent");

    let lookup_id = PaymentIdentifier::QuoteId(quote_id);
    let checked = backend
        .check_outgoing_payment(&lookup_id)
        .await
        .expect("check_outgoing_payment for Pending intent");

    assert_eq!(checked.status, MeltQuoteState::Pending);
    assert_eq!(checked.total_spent, Amount::new(0, CurrencyUnit::Sat));
    assert_eq!(checked.payment_proof, None);
}
/// A send intent that has been created and then assigned to a batch is still
/// reported by `check_outgoing_payment` as `Pending` with 0 sat spent.
#[tokio::test]
async fn test_check_outgoing_payment_batched_intent_reports_zero_total_spent() {
    use crate::send::payment_intent::SendIntent;
    use crate::types::{PaymentMetadata, PaymentTier};

    let backend = build_test_instance(5).await;
    let quote_id = QuoteId::UUID(Uuid::new_v4());
    let destination = "bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080".to_string();

    // Create the intent directly in storage and move it into a batch.
    SendIntent::new(
        &backend.storage,
        quote_id.to_string(),
        destination,
        20_000,
        1_000,
        PaymentTier::Standard,
        PaymentMetadata::default(),
    )
    .await
    .expect("create Pending send intent")
    .assign_to_batch(&backend.storage, Uuid::new_v4())
    .await
    .expect("transition Pending → Batched");

    let lookup_id = PaymentIdentifier::QuoteId(quote_id);
    let checked = backend
        .check_outgoing_payment(&lookup_id)
        .await
        .expect("check_outgoing_payment for Batched intent");

    assert_eq!(checked.status, MeltQuoteState::Pending);
    assert_eq!(
        checked.total_spent,
        Amount::new(0, CurrencyUnit::Sat),
        "Batched intents report total_spent = 0 until the batch \
         transaction is built and the per-intent fee is fixed"
    );
}
/// Once an intent has been marked as broadcast with a fee contribution,
/// `check_outgoing_payment` still reports `Pending` but `total_spent` equals
/// the intent amount (30_000) plus that fee contribution (512).
#[tokio::test]
async fn test_check_outgoing_payment_awaiting_confirmation_includes_fee() {
    use crate::send::payment_intent::SendIntent;
    use crate::types::{PaymentMetadata, PaymentTier};

    let backend = build_test_instance(5).await;
    let quote_id = QuoteId::UUID(Uuid::new_v4());
    let fee_contrib = 512_u64;

    // Walk the intent through creation, batching, and broadcast.
    SendIntent::new(
        &backend.storage,
        quote_id.to_string(),
        "bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080".to_string(),
        30_000,
        2_000,
        PaymentTier::Immediate,
        PaymentMetadata::default(),
    )
    .await
    .expect("create Pending send intent")
    .assign_to_batch(&backend.storage, Uuid::new_v4())
    .await
    .expect("transition Pending → Batched")
    .mark_broadcast(
        &backend.storage,
        "deadbeef".to_string(),
        "deadbeef:0".to_string(),
        fee_contrib,
    )
    .await
    .expect("transition Batched → AwaitingConfirmation");

    let lookup_id = PaymentIdentifier::QuoteId(quote_id);
    let checked = backend
        .check_outgoing_payment(&lookup_id)
        .await
        .expect("check_outgoing_payment for AwaitingConfirmation intent");

    assert_eq!(checked.status, MeltQuoteState::Pending);
    assert_eq!(
        checked.total_spent,
        Amount::new(30_000 + fee_contrib, CurrencyUnit::Sat),
        "AwaitingConfirmation intents know the per-intent fee \
         contribution and must report amount + fee"
    );
}
/// An intent that was created and then failed is reported by
/// `check_outgoing_payment` as `Failed`, with 0 sat spent and no proof.
#[tokio::test]
async fn test_check_outgoing_payment_failed_intent_reports_failed() {
    use crate::send::payment_intent::SendIntent;
    use crate::types::{PaymentMetadata, PaymentTier};

    let backend = build_test_instance(5).await;
    let quote_id = QuoteId::UUID(Uuid::new_v4());

    // Create the intent directly in storage and fail it straight away.
    SendIntent::new(
        &backend.storage,
        quote_id.to_string(),
        "bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080".to_string(),
        30_000,
        2_000,
        PaymentTier::Immediate,
        PaymentMetadata::default(),
    )
    .await
    .expect("create Pending send intent")
    .fail(&backend.storage, "fee too high".to_string())
    .await
    .expect("transition Pending to Failed");

    let lookup_id = PaymentIdentifier::QuoteId(quote_id);
    let checked = backend
        .check_outgoing_payment(&lookup_id)
        .await
        .expect("check_outgoing_payment for Failed intent");

    assert_eq!(checked.status, MeltQuoteState::Failed);
    assert_eq!(checked.total_spent, Amount::new(0, CurrencyUnit::Sat));
    assert_eq!(checked.payment_proof, None);
}
#[tokio::test]
async fn test_make_payment_can_retry_failed_intent_with_same_quote_id() {
let (backend, _tmp) = build_test_instance_with_tempdir(5).await;
fund_backend_wallet(&backend, 100_000).await;
let (quote_id, options) = onchain_options_for(30_000);
backend
.make_payment(&CurrencyUnit::Sat, options)
.await
.expect("initial make_payment should enqueue intent");
let initial = backend
.storage
.get_send_intent_by_quote_id("e_id.to_string())
.await
.expect("lookup initial intent")
.expect("initial intent exists");
backend
.storage
.update_send_intent(
&initial.intent_id,
&crate::send::payment_intent::record::SendIntentState::Failed {
reason: "pre-sign failure".to_string(),
created_at: 1_700_000_000,
failed_at: 1_700_000_100,
},
)
.await
.expect("mark failed");
let retry_options = onchain_options_for_quote(quote_id.clone(), 30_000);
let response = backend
.make_payment(&CurrencyUnit::Sat, retry_options)
.await
.expect("retry with same quote id should requeue failed intent");
assert_eq!(response.status, MeltQuoteState::Pending);
let retried = backend
.storage
.get_send_intent_by_quote_id("e_id.to_string())
.await
.expect("lookup retried intent")
.expect("retried intent exists");
assert_eq!(retried.intent_id, initial.intent_id);
assert!(matches!(
retried.state,
crate::send::payment_intent::record::SendIntentState::Pending { .. }
));
}
/// Looking up a quote id that was never submitted succeeds and reports
/// `Unknown`, 0 sat spent, and no payment proof.
#[tokio::test]
async fn test_check_outgoing_payment_unknown_quote_reports_zero() {
    let backend = build_test_instance(5).await;
    let never_submitted = PaymentIdentifier::QuoteId(QuoteId::UUID(Uuid::new_v4()));

    let checked = backend
        .check_outgoing_payment(&never_submitted)
        .await
        .expect("check_outgoing_payment for unknown quote");

    assert_eq!(checked.status, MeltQuoteState::Unknown);
    assert_eq!(checked.total_spent, Amount::new(0, CurrencyUnit::Sat));
    assert_eq!(checked.payment_proof, None);
}
/// Pins how `Error::is_transient` classifies a handful of errors:
/// Esplora errors (both the 525 and the 404 sample) and a timed-out I/O
/// error are transient; wallet errors, `VoutNotFound`, and an
/// `InvalidData` I/O error are not.
#[test]
fn test_is_transient_classifies_network_errors() {
    // Esplora HTTP failures, including a 404, count as transient.
    let http_525 = Error::Esplora(String::from(
        "HttpResponse { status: 525, message: \"error code: 525\" }",
    ));
    assert!(http_525.is_transient());
    let http_404 = Error::Esplora(String::from(
        "HttpResponse { status: 404, message: \"Block not found\" }",
    ));
    assert!(http_404.is_transient());

    // Wallet-level and lookup errors are not.
    let bad_checkpoint = Error::Wallet(String::from("invalid checkpoint"));
    assert!(!bad_checkpoint.is_transient());
    let missing_vout = Error::VoutNotFound;
    assert!(!missing_vout.is_transient());

    // I/O errors depend on their kind.
    let timed_out = std::io::Error::new(std::io::ErrorKind::TimedOut, "network timeout");
    assert!(Error::Io(timed_out).is_transient());
    let invalid_data = std::io::Error::new(std::io::ErrorKind::InvalidData, "bad data");
    assert!(!Error::Io(invalid_data).is_transient());
}
/// A supervised task that always returns an error must be started again:
/// within 2.5 s the task body has to run at least twice, and the supervisor
/// has to exit within 5 s of cancellation.
#[tokio::test]
async fn test_supervisor_restarts_failing_task_with_backoff() {
    let cancel = CancellationToken::new();
    let attempts = Arc::new(std::sync::atomic::AtomicU32::new(0));

    let supervisor = {
        let shared_attempts = Arc::clone(&attempts);
        let token = cancel.clone();
        tokio::spawn(async move {
            super::supervise("test", token, move |_c| {
                // Each run gets its own handle to the shared counter.
                let attempts = Arc::clone(&shared_attempts);
                async move {
                    attempts.fetch_add(1, Ordering::Relaxed);
                    Err::<(), Error>(Error::Esplora("boom".to_string()))
                }
            })
            .await;
        })
    };

    // Give the supervisor time to run the failing task more than once.
    tokio::time::sleep(Duration::from_millis(2_500)).await;
    cancel.cancel();

    tokio::time::timeout(Duration::from_secs(5), supervisor)
        .await
        .expect("supervisor did not exit after cancel")
        .expect("supervisor task panicked");

    let n = attempts.load(Ordering::Relaxed);
    assert!(
        n >= 2,
        "supervisor should have restarted the task at least twice, got {n}"
    );
}
/// A supervised task that returns `Ok(())` ends the supervisor: it must
/// finish within 5 s without cancellation and the task body must have run
/// exactly once.
#[tokio::test]
async fn test_supervisor_exits_on_ok() {
    // The token is never cancelled here; it is only handed to the supervisor.
    let cancel = CancellationToken::new();
    let runs = Arc::new(std::sync::atomic::AtomicU32::new(0));

    let supervisor = {
        let shared_runs = Arc::clone(&runs);
        let token = cancel.clone();
        tokio::spawn(async move {
            super::supervise("test", token, move |_c| {
                let runs = Arc::clone(&shared_runs);
                async move {
                    runs.fetch_add(1, Ordering::Relaxed);
                    Ok::<(), Error>(())
                }
            })
            .await;
        })
    };

    tokio::time::timeout(Duration::from_secs(5), supervisor)
        .await
        .expect("supervisor did not exit after Ok(())")
        .expect("supervisor task panicked");

    let observed_runs = runs.load(Ordering::Relaxed);
    assert_eq!(
        observed_runs, 1,
        "supervisor must not restart a task that returned Ok(())"
    );
}
/// Cancelling the token 200 ms after starting a supervisor whose task always
/// fails must make the supervisor exit in under 500 ms (with a 2 s hard
/// timeout on the join).
#[tokio::test]
async fn test_supervisor_cancel_during_backoff() {
    let cancel = CancellationToken::new();

    let supervisor = {
        let token = cancel.clone();
        tokio::spawn(async move {
            super::supervise("test", token, move |_c| async move {
                Err::<(), Error>(Error::Esplora("boom".to_string()))
            })
            .await;
        })
    };

    // Let the task fail at least once before cancelling.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let cancelled_at = std::time::Instant::now();
    cancel.cancel();
    tokio::time::timeout(Duration::from_secs(2), supervisor)
        .await
        .expect("supervisor did not exit promptly after cancel")
        .expect("supervisor task panicked");

    let elapsed = cancelled_at.elapsed();
    assert!(
        elapsed < Duration::from_millis(500),
        "supervisor took {elapsed:?} to exit after cancel; expected < 500ms"
    );
}
/// Half a second after `start()`, the background sync task must still be
/// running; the backend is then stopped cleanly.
///
/// NOTE(review): the test name implies the test instance's Esplora endpoint
/// is unreachable — confirm against `build_test_instance`.
#[tokio::test]
async fn test_sync_wallet_survives_unreachable_esplora() {
    let backend = build_test_instance(5).await;
    backend.start().await.expect("start");
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Read the task state in its own scope so the lock is released before `stop()`.
    let sync_finished = {
        let guard = backend.tasks.lock().await;
        let running = guard.as_ref().expect("tasks running");
        let finished = running.sync.is_finished();
        finished
    };
    assert!(
        !sync_finished,
        "sync task must not exit on transient Esplora errors"
    );

    backend.stop().await.expect("stop");
}
}