use std::sync::Arc;
use std::time::Instant;
use bdk_bitcoind_rpc::bitcoincore_rpc::{Auth, Client, Error as BitcoinRpcError, RawTx, RpcApi};
use bdk_bitcoind_rpc::{BlockEvent, Emitter, NO_EXPECTED_MEMPOOL_TXS};
use bdk_wallet::bitcoin::{Block, Transaction};
use tokio::sync::Mutex;
use tokio::time::{interval, Duration};
use tokio_util::sync::CancellationToken;
use crate::chain::{BitcoinRpcConfig, BroadcastErrorKind, BroadcastFailure, BroadcastOutcome};
use crate::error::Error;
use crate::{CdkBdk, WalletWithDb};
pub(crate) async fn apply_and_persist_chunk(
wallet: &Arc<Mutex<WalletWithDb>>,
chunk: &mut Vec<BlockEvent<Block>>,
warn_ms: u64,
) -> Result<(), Error> {
if chunk.is_empty() {
return Ok(());
}
let start = Instant::now();
let chunk_len = chunk.len();
{
let mut w = wallet.lock().await;
for block in chunk.drain(..) {
w.wallet
.apply_block_connected_to(&block.block, block.block_height(), block.connected_to())
.map_err(|e| Error::Wallet(e.to_string()))?;
}
w.persist()?;
}
let elapsed_ms = start.elapsed().as_millis() as u64;
if elapsed_ms > warn_ms {
tracing::warn!(
held_ms = elapsed_ms,
warn_ms,
chunk_size = chunk_len,
"Wallet lock held longer than configured warning threshold during block apply"
);
}
Ok(())
}
/// Continuously follows the chain through Bitcoin Core RPC until cancelled.
///
/// On every tick of an interval of `cdk_bdk.sync_interval_secs` seconds the
/// function:
///
/// 1. reuses the cached RPC client or builds a new one,
/// 2. creates an `Emitter` starting from the wallet's latest checkpoint,
/// 3. pulls blocks from the emitter and applies them to the wallet in chunks
///    of `apply_chunk_size` via `apply_and_persist_chunk`,
/// 4. logs the new tip when at least one chunk was applied, and
/// 5. when the tick finished without error, resets the failure counter and
///    runs `cdk_bdk.run_reconciliation()`.
///
/// Errors inside a tick are logged and counted in `consecutive_failures`;
/// they never abort the loop. The only exit path in this body is the cancel
/// token, after which `Ok(())` is returned.
///
/// NOTE(review): `tokio::time::interval` panics when given a zero period —
/// confirm `sync_interval_secs` is validated to be non-zero by the caller.
pub(crate) async fn sync_bitcoin_rpc(
cdk_bdk: &CdkBdk,
config: &BitcoinRpcConfig,
cancel_token: CancellationToken,
) -> Result<(), Error> {
let mut sync_interval = interval(Duration::from_secs(cdk_bdk.sync_interval_secs));
// Clamp to at least one block per chunk so the flush condition below can trigger.
let apply_chunk_size = cdk_bdk.sync_config.apply_chunk_size.max(1);
let warn_ms = cdk_bdk.sync_config.lock_hold_warn_ms;
// Cached client; set back to `None` after a failed tick so the next tick rebuilds it.
let mut rpc_client: Option<Arc<Client>> = None;
// Number of failed ticks in a row; drives log severity and the recovery message.
let mut consecutive_failures: u32 = 0;
tracing::info!(
host = %config.host,
port = config.port,
interval_secs = cdk_bdk.sync_interval_secs,
"Starting continuous block monitoring via Bitcoin RPC"
);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
tracing::info!("Wallet sync cancelled via cancel token");
break;
}
_ = sync_interval.tick() => {
// Reuse the cached client when present, otherwise construct and cache one.
let client = match &rpc_client {
Some(c) => Arc::clone(c),
None => {
match Client::new(
&format!("http://{}:{}", config.host, config.port),
Auth::UserPass(
config.user.clone(),
config.password.clone(),
),
) {
Ok(c) => {
let arc = Arc::new(c);
rpc_client = Some(Arc::clone(&arc));
arc
}
Err(e) => {
consecutive_failures =
consecutive_failures.saturating_add(1);
tracing::warn!(
error = %e,
consecutive_failures,
"Failed to construct Bitcoin RPC client; will retry on next tick"
);
// Skip the rest of this tick and go back to waiting in the outer loop.
continue;
}
}
}
};
// Take the checkpoint in its own scope so the wallet lock is released
// before any blocks are fetched.
let checkpoint = {
let w = cdk_bdk.wallet_with_db.lock().await;
w.wallet.latest_checkpoint()
};
let start_height = checkpoint.height();
let mut emitter = Emitter::new(
client.as_ref(),
checkpoint,
start_height,
NO_EXPECTED_MEMPOOL_TXS,
);
// `any_applied` gates the "synced to new tip" log; `had_tick_error` gates
// the failure-counter reset and reconciliation at the end of the tick.
let mut any_applied = false;
let mut had_tick_error = false;
let mut chunk: Vec<bdk_bitcoind_rpc::BlockEvent<Block>> = Vec::with_capacity(apply_chunk_size);
// NOTE(review): `next_block` is called synchronously (no `.await`), so it
// presumably performs blocking RPC I/O on this async task — confirm whether
// that is acceptable for the runtime this runs on. The cancel token is also
// not observed inside this inner loop, only between ticks.
loop {
match emitter.next_block() {
Ok(Some(block)) => {
chunk.push(block);
// Flush as soon as a full chunk has been buffered.
if chunk.len() >= apply_chunk_size {
if let Err(e) = apply_and_persist_chunk(
&cdk_bdk.wallet_with_db,
&mut chunk,
warn_ms,
)
.await
{
had_tick_error = true;
consecutive_failures =
consecutive_failures.saturating_add(1);
crate::sync::log_sync_failure(
"Failed to apply block chunk",
&e,
consecutive_failures,
);
rpc_client = None;
break;
}
any_applied = true;
}
}
// The emitter has no further block to hand out for this tick.
Ok(None) => break,
Err(e) => {
had_tick_error = true;
consecutive_failures =
consecutive_failures.saturating_add(1);
// Escalate from warn to error once failures are sustained.
if consecutive_failures >= crate::sync::SUSTAINED_FAILURE_THRESHOLD {
tracing::error!(
consecutive_failures,
"Bitcoin RPC error during sync: {e}; will retry next tick"
);
} else {
tracing::warn!(
consecutive_failures,
"Bitcoin RPC error during sync: {e}; will retry next tick"
);
}
rpc_client = None;
break;
}
}
}
// Apply whatever is still buffered: a partial final chunk, or the blocks
// fetched before an RPC error interrupted the inner loop.
if !chunk.is_empty() {
if let Err(e) = apply_and_persist_chunk(
&cdk_bdk.wallet_with_db,
&mut chunk,
warn_ms,
)
.await
{
had_tick_error = true;
consecutive_failures =
consecutive_failures.saturating_add(1);
crate::sync::log_sync_failure(
"Failed to apply final block chunk",
&e,
consecutive_failures,
);
rpc_client = None;
} else {
any_applied = true;
}
}
if any_applied {
// Re-read the checkpoint under a short-lived lock purely for logging.
let tip = {
let w = cdk_bdk.wallet_with_db.lock().await;
w.wallet.latest_checkpoint().block_id()
};
tracing::info!(
"Synced to new tip {} at height {}",
tip.hash,
tip.height
);
}
// Only a fully clean tick clears the failure streak and triggers reconciliation.
if !had_tick_error {
if consecutive_failures > 0 {
tracing::info!(
recovered_after = consecutive_failures,
"Bitcoin RPC sync recovered"
);
}
consecutive_failures = 0;
cdk_bdk.run_reconciliation().await;
}
}
}
}
Ok(())
}
/// Maps the text of a Bitcoin RPC broadcast error onto a [`BroadcastErrorKind`].
///
/// Matching is a case-insensitive (ASCII) substring search, evaluated in this
/// order — the first group that matches wins:
///
/// 1. "already known" phrases yield `Unknown`; they are checked first so such
///    a message is never reported as `Rejected` or `Transient`,
/// 2. policy / consensus rejection phrases yield `Rejected`,
/// 3. connectivity phrases yield `Transient`,
/// 4. anything else yields `Unknown`.
pub(crate) fn classify_bitcoin_rpc_broadcast_message(message: &str) -> BroadcastErrorKind {
    const ALREADY_KNOWN: &[&str] = &[
        "already in block chain",
        "already in blockchain",
        "already have transaction",
        "txn-already-in-mempool",
        "transaction already in mempool",
    ];
    const REJECTED: &[&str] = &[
        "missing inputs",
        "bad-txns",
        "bad-txns-inputs",
        "txn-mempool-conflict",
        "mandatory-script-verify-flag-failed",
        "non-mandatory-script-verify-flag",
        "non-bip68-final",
        "nonstandard",
        "non-standard",
        "dust",
        "min relay",
        "minrelay",
        "insufficient fee",
        "mempool min fee",
        "fee too low",
    ];
    const TRANSIENT: &[&str] = &[
        "connection",
        "timed out",
        "timeout",
        "broken pipe",
        "refused",
        "reset",
        "temporarily unavailable",
    ];

    let lowered = message.to_ascii_lowercase();
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| lowered.contains(*needle));

    if mentions_any(ALREADY_KNOWN) {
        BroadcastErrorKind::Unknown
    } else if mentions_any(REJECTED) {
        BroadcastErrorKind::Rejected
    } else if mentions_any(TRANSIENT) {
        BroadcastErrorKind::Transient
    } else {
        BroadcastErrorKind::Unknown
    }
}
/// Reports whether an RPC error says the node already has the transaction.
///
/// The error is rendered with `to_string`, lowercased (ASCII), and searched
/// for any of the phrases that indicate the transaction is already in the
/// chain or in the mempool.
fn is_bitcoin_rpc_already_known(error: &BitcoinRpcError) -> bool {
    const ALREADY_KNOWN: &[&str] = &[
        "already in block chain",
        "already in blockchain",
        "already have transaction",
        "txn-already-in-mempool",
        "transaction already in mempool",
    ];

    let rendered = error.to_string().to_ascii_lowercase();
    ALREADY_KNOWN
        .iter()
        .any(|phrase| rendered.contains(*phrase))
}
/// Classifies a Bitcoin RPC error by its rendered message.
///
/// Thin adapter: the error is turned into text with `to_string` and handed to
/// `classify_bitcoin_rpc_broadcast_message`, whose result is returned as is.
pub(crate) fn classify_bitcoin_rpc_broadcast_error(error: &BitcoinRpcError) -> BroadcastErrorKind {
    let rendered = error.to_string();
    classify_bitcoin_rpc_broadcast_message(rendered.as_str())
}
pub(crate) async fn broadcast_bitcoin_rpc(
config: &BitcoinRpcConfig,
tx: Transaction,
) -> Result<BroadcastOutcome, BroadcastFailure> {
let rpc_client: Client = Client::new(
&format!("http://{}:{}", config.host, config.port),
Auth::UserPass(config.user.clone(), config.password.clone()),
)
.map_err(|e| BroadcastFailure::new(BroadcastErrorKind::Transient, e.to_string()))?;
tracing::info!(
"Broadcasting transaction: {} via bitcoin rpc",
tx.compute_txid()
);
match rpc_client.send_raw_transaction(tx.raw_hex()) {
Ok(_) => Ok(BroadcastOutcome::Accepted),
Err(e) if is_bitcoin_rpc_already_known(&e) => Ok(BroadcastOutcome::AlreadyKnown),
Err(e) => {
let kind = classify_bitcoin_rpc_broadcast_error(&e);
Err(BroadcastFailure::new(kind, e.to_string()))
}
}
}
pub(crate) async fn fetch_fee_rate_bitcoin_rpc(
config: &BitcoinRpcConfig,
target_blocks: u16,
) -> Result<f64, Error> {
let config = config.clone();
let host = config.host.clone();
let port = config.port;
tokio::task::spawn_blocking(move || {
let rpc_client = Client::new(
&format!("http://{}:{}", host, port),
Auth::UserPass(config.user, config.password),
)?;
let estimate = rpc_client.estimate_smart_fee(target_blocks, None)?;
if let Some(fee_rate_btc_per_kvb) = estimate.fee_rate {
let sat_per_vb = fee_rate_btc_per_kvb.to_btc() * 100_000.0;
Ok(sat_per_vb)
} else {
Err(Error::FeeEstimationUnavailable)
}
})
.await
.map_err(|e| Error::FeeEstimationFailed(e.to_string()))?
}
#[cfg(test)]
mod tests {
    use super::*;

    /// One representative message for each classification the function can return.
    #[test]
    fn classify_bitcoin_rpc_broadcast_errors() {
        let expectations = [
            ("RPC error: missing inputs", BroadcastErrorKind::Rejected),
            ("connection refused", BroadcastErrorKind::Transient),
            ("some new backend error", BroadcastErrorKind::Unknown),
        ];

        for (message, expected) in expectations {
            assert_eq!(classify_bitcoin_rpc_broadcast_message(message), expected);
        }
    }
}