Skip to main content

bark/
chain.rs

1
2
3use std::borrow::Borrow;
4use std::collections::{HashMap, HashSet};
5use std::str::FromStr as _;
6use std::time::Duration;
7
8use anyhow::Context;
9use bdk_core::{BlockId, CheckPoint};
10use bdk_esplora::esplora_client;
11use bitcoin::constants::genesis_block;
12use bitcoin::{
13	Amount, Block, BlockHash, FeeRate, Network, OutPoint, Transaction, Txid, Weight,
14};
15use log::{debug, info, warn};
16use tokio::sync::RwLock;
17
18use crate::utils::time::Instant;
19
20use bitcoin_ext::{BlockHeight, BlockRef, FeeRateExt, TxStatus};
21use bitcoin_ext::rpc;
22#[cfg(feature = "bitcoind-rpc")]
23use bitcoin_ext::rpc::{
24	BitcoinRpcClient, RPC_INVALID_ADDRESS_OR_KEY, RPC_VERIFY_ALREADY_IN_UTXO_SET,
25};
26#[cfg(feature = "bitcoind-rpc")]
27use bitcoind_async_client::Client as BitcoindClient;
28#[cfg(feature = "bitcoind-rpc")]
29use bitcoind_async_client::error::ClientError as BitcoindClientError;
30#[cfg(feature = "bitcoind-rpc")]
31use bitcoind_async_client::traits::{Broadcaster, Reader};
32
33const FEE_RATE_TARGET_CONF_FAST: u16 = 1;
34const FEE_RATE_TARGET_CONF_REGULAR: u16 = 3;
35const FEE_RATE_TARGET_CONF_SLOW: u16 = 6;
36
37/// Coalesce bursts of `tip()` calls within the same `Wallet::sync()` cycle
38/// (parallel sub-syncs each fetch tip, and the exit progress state machine
39/// fetches it twice per iteration). Short enough to be invisible to tests
40/// and UI; long enough to dedupe within a single sync burst.
41const TIP_CACHE_TTL: Duration = Duration::from_secs(1);
42
43/// Fee estimates change on the scale of minutes, so refreshing more often
44/// than this buys nothing while costing one HTTP round trip per sync tick.
45const FEE_RATES_CACHE_TTL: Duration = Duration::from_secs(30);
46
47#[cfg(feature = "bitcoind-rpc")]
48const MIN_BITCOIND_VERSION: usize = 290000;
49
50/// Configuration for the onchain data source.
51///
52/// [ChainSource] selects which backend to use for blockchain data and transaction broadcasting:
53/// - Bitcoind: uses a Bitcoin Core node via JSON-RPC
54/// - Esplora: uses the HTTP API endpoint of [esplora-electrs](https://github.com/Blockstream/electrs)
55///
56/// Typical usage is to construct a ChainSource from configuration and pass it to
57/// [ChainSource::new] along with the expected [Network].
58///
59/// Notes:
60/// - For [ChainSourceSpec::Bitcoind], authentication must be provided (cookie file or user/pass).
61#[derive(Clone, Debug)]
62pub enum ChainSourceSpec {
63	Bitcoind {
64		/// RPC URL of the Bitcoin Core node (e.g. <http://127.0.0.1:8332>).
65		url: String,
66		/// Authentication method for JSON-RPC (cookie file or user/pass).
67		auth: rpc::Auth,
68	},
69	Esplora {
70		/// Base URL of the esplora-electrs instance (e.g. <https://esplora.signet.2nd.dev>).
71		url: String,
72	},
73}
74
75impl ChainSourceSpec {
76	pub(crate) fn url(&self) -> &String {
77		match self {
78			ChainSourceSpec::Bitcoind { url, .. } => url,
79			ChainSourceSpec::Esplora { url } => url,
80		}
81	}
82}
83
84pub enum ChainSourceClient {
85	/// Native bitcoind backend.
86	///
87	/// Carries an async client for everything the wallet does asynchronously
88	/// and a sync companion for `bdk_bitcoind_rpc::Emitter`, which is sync-only
89	/// upstream and runs inside `tokio::task::spawn_blocking`.
90	#[cfg(feature = "bitcoind-rpc")]
91	Bitcoind {
92		rpc: BitcoindClient,
93		sync: BitcoinRpcClient,
94	},
95	Esplora(esplora_client::AsyncClient),
96}
97
98impl ChainSourceClient {
99	async fn check_network(&self, expected: Network) -> anyhow::Result<()> {
100		match self {
101			#[cfg(feature = "bitcoind-rpc")]
102			ChainSourceClient::Bitcoind { rpc, .. } => {
103				let network = rpc.network().await?;
104				if expected != network {
105					bail!("Network mismatch: expected {:?}, got {:?}", expected, network);
106				}
107			},
108			ChainSourceClient::Esplora(client) => {
109				let res = client.client().get(format!("{}/block-height/0", client.url()))
110					.send().await?.text().await?;
111				let genesis_hash = BlockHash::from_str(&res)
112					.context("bad response from server (not a blockhash). Esplora client possibly misconfigured")?;
113				if genesis_hash != genesis_block(expected).block_hash() {
114					bail!("Network mismatch: expected {:?}, got {:?}", expected, genesis_hash);
115				}
116			},
117		};
118
119		Ok(())
120	}
121}
122
123/// Client for interacting with the configured on-chain backend.
124///
125/// [ChainSource] abstracts over multiple backends using [ChainSourceSpec] to provide:
126/// - Chain queries (tip, block headers/blocks, transaction status and fetching)
127/// - Mempool-related utilities (ancestor fee/weight, spending lookups)
128/// - Broadcasting single transactions or packages (RBF/CPFP workflows)
129/// - Fee estimation and caching with optional fallback values
130///
131/// Behavior notes:
132/// - [ChainSource::update_fee_rates] refreshes internal fee estimates; if backend estimates
133///   fail and a fallback fee is provided, it will be used for all tiers.
134/// - [ChainSource::fee_rates] returns the last cached [FeeRates].
135///
136/// Examples:
137///
138/// ```rust
139/// # async fn func() {
140/// use bark::chain::{ChainSource, ChainSourceSpec};
141/// use bdk_bitcoind_rpc::bitcoincore_rpc::Auth;
142/// use bitcoin::{FeeRate, Network};
143///
144/// let spec = ChainSourceSpec::Bitcoind {
145///     url: "http://localhost:8332".into(),
146///     auth: Auth::UserPass("user".into(), "password".into()),
147/// };
148/// let network = Network::Bitcoin;
149/// let fallback_fee = FeeRate::from_sat_per_vb(5);
150/// #[cfg(feature = "socks5-proxy")]
151/// let socks5 = Some("socks5h://127.0.0.1:9050");
152///
153/// let instance = ChainSource::new(spec, network, fallback_fee, socks5).await.unwrap();
154/// # }
155/// ```
156pub struct ChainSource {
157	inner: ChainSourceClient,
158	network: Network,
159	fee_rates: RwLock<FeeRates>,
160	/// `None` until the first successful (or fallback) `update_fee_rates`.
161	/// `Some(t)` makes subsequent calls within `FEE_RATES_CACHE_TTL` a no-op.
162	fee_rates_fetched_at: RwLock<Option<Instant>>,
163	/// Last observed tip height with the time it was fetched, used to
164	/// short-circuit repeat `tip()` calls within `TIP_CACHE_TTL`.
165	tip_cache: RwLock<Option<(BlockHeight, Instant)>>,
166}
167
168impl ChainSource {
169	/// Checks that the version of the chain source is compatible with Bark.
170	///
171	/// For bitcoind, it checks if the version is at least 29.0
172	/// This is the first version for which 0 fee-anchors are considered standard
173	pub async fn require_version(&self) -> anyhow::Result<()> {
174		#[cfg(feature = "bitcoind-rpc")]
175		if let ChainSourceClient::Bitcoind { rpc, .. } = self.inner() {
176			#[derive(Debug, serde::Deserialize)]
177			struct NetworkInfo { version: usize }
178			let info: NetworkInfo = rpc.call_raw("getnetworkinfo", &[]).await?;
179			if info.version < MIN_BITCOIND_VERSION {
180				bail!("Bitcoin Core version is too old, you can participate in rounds but won't be able to unilaterally exit. Please upgrade to 29.0 or higher.");
181			}
182		}
183
184		Ok(())
185	}
186
187	pub(crate) fn inner(&self) -> &ChainSourceClient {
188		&self.inner
189	}
190
191	/// Gets a cached copy of the calculated network [FeeRates]
192	pub async fn fee_rates(&self) -> FeeRates {
193		self.fee_rates.read().await.clone()
194	}
195
196	/// Gets the network that the [ChainSource] was validated against.
197	pub fn network(&self) -> Network {
198		self.network
199	}
200
201	/// Creates a new instance of the object with the specified chain source, network, and optional
202	/// fallback fee rate.
203	///
204	/// This function initializes the internal chain source client based on the provided `chain_source`:
205	/// - If `chain_source` is of type [ChainSourceSpec::Bitcoind], it creates a Bitcoin Core RPC client
206	///   using the provided URL and authentication parameters.
207	/// - If `chain_source` is of type [ChainSourceSpec::Esplora], it creates an Esplora client with the
208	///   given URL.
209	///
210	/// Both clients are initialized asynchronously, and any errors encountered during their
211	/// creation will be returned as part of the [anyhow::Result].
212	///
213	/// Additionally, the function performs a network consistency check to ensure the specified
214	/// network (e.g., `mainnet` or `signet`) matches the network configuration of the initialized
215	/// chain source client.
216	///
217	/// The `fallback_fee` parameter is optional. If provided, it is used as the default fee rate
218	/// for transactions. If not specified, the `FeeRate::BROADCAST_MIN` is used as the default fee
219	/// rate.
220	///
221	/// # Arguments
222	///
223	/// * `chain_source` - Specifies the backend to use for blockchain data.
224	/// * `network` - The Bitcoin network to operate on (e.g., `mainnet`, `testnet`, `regtest`).
225	/// * `fallback_fee` - An optional fallback fee rate to use for transaction fee estimation. If
226	///   not provided, a default fee rate of [FeeRate::BROADCAST_MIN] will be used.
227	///
228	/// # Returns
229	///
230	/// * `Ok(Self)` - If the object is successfully created with all necessary configurations.
231	/// * `Err(anyhow::Error)` - If there is an error in initializing the chain source client or
232	///   verifying the network.
233	pub async fn new(
234		spec: ChainSourceSpec,
235		network: Network,
236		fallback_fee: Option<FeeRate>,
237		#[cfg(feature = "socks5-proxy")] proxy: Option<&str>,
238	) -> anyhow::Result<Self> {
239		let inner = match spec {
240			#[cfg(feature = "bitcoind-rpc")]
241			ChainSourceSpec::Bitcoind { url, auth } => {
242				// `bdk_bitcoind_rpc::Emitter` is sync-only upstream, so we keep
243				// a sync companion to drive it inside `spawn_blocking`. The async
244				// client is used everywhere else. `BitcoinRpcClient` (rather
245				// than the bare `bitcoincore_rpc::Client`) is required so the
246				// `spawn_blocking` closure can take an owned, `Clone` value.
247				//
248				// The sync companion currently does not honour `socks5-proxy`;
249				// SOCKS5 is supported on the Esplora backend, where it is the
250				// realistic Tor-via-bitcoind use case.
251				let sync = BitcoinRpcClient::new(&url, auth.clone())
252					.context("failed to create sync bitcoind rpc client")?;
253				let async_auth = match auth {
254					rpc::Auth::None => bail!(
255						"bitcoind RPC auth is required (cookie file or user/pass)",
256					),
257					rpc::Auth::UserPass(u, p) => bitcoind_async_client::Auth::UserPass(u, p),
258					rpc::Auth::CookieFile(p) => bitcoind_async_client::Auth::CookieFile(p),
259				};
260				let rpc = BitcoindClient::new(url, async_auth, None, None, None)
261					.context("failed to create async bitcoind rpc client")?;
262				ChainSourceClient::Bitcoind { rpc, sync }
263			},
264			#[cfg(not(feature = "bitcoind-rpc"))]
265			ChainSourceSpec::Bitcoind { .. } => bail!(
266				"bitcoind RPC backend is not available: this build was compiled without \
267				 the `bitcoind-rpc` feature (notably the wasm-web build)",
268			),
269			ChainSourceSpec::Esplora { url } => ChainSourceClient::Esplora({
270				let url = crate::utils::url_with_default_https_scheme(&url);
271				// the esplora client doesn't deal well with trailing slash in url
272				let url = url.strip_suffix("/").unwrap_or(&url);
273				let mut builder = esplora_client::Builder::new(url);
274				#[cfg(feature = "socks5-proxy")]
275				if let Some(proxy) = proxy {
276					builder = builder.proxy(proxy);
277				}
278				builder.build_async()
279					.with_context(|| format!("failed to create esplora client for url {}", url))?
280			}),
281		};
282
283		inner.check_network(network).await?;
284
285		let fee = fallback_fee.unwrap_or(FeeRate::BROADCAST_MIN);
286		let fee_rates = RwLock::new(FeeRates { fast: fee, regular: fee, slow: fee });
287
288		Ok(Self {
289			inner,
290			network,
291			fee_rates,
292			fee_rates_fetched_at: RwLock::new(None),
293			tip_cache: RwLock::new(None),
294		})
295	}
296
297	async fn fetch_fee_rates(&self) -> anyhow::Result<FeeRates> {
298		match self.inner() {
299			#[cfg(feature = "bitcoind-rpc")]
300			ChainSourceClient::Bitcoind { rpc, .. } => {
301				let get_fee_rate = async |target: u16| -> anyhow::Result<FeeRate> {
302					let fee: rpc::json::EstimateSmartFeeResult = rpc.call_raw(
303						"estimatesmartfee",
304						&[
305							target.into(),
306							serde_json::to_value(rpc::json::EstimateMode::Economical)
307								.expect("serializable"),
308						],
309					).await?;
310					if let Some(fee_rate) = fee.fee_rate {
311						Ok(FeeRate::from_amount_per_kvb_ceil(fee_rate))
312					} else {
313						Err(anyhow!("No rate returned from estimate_smart_fee for a {} confirmation target", target))
314					}
315				};
316				Ok(FeeRates {
317					fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST).await?,
318					regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR).await.expect("should exist"),
319					slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW).await.expect("should exist"),
320				})
321			},
322			ChainSourceClient::Esplora(client) => {
323				// The API should return rates for targets 1-25, 144 and 1008
324				let estimates = client.get_fee_estimates().await?;
325				let get_fee_rate = |target| {
326					let fee = estimates.get(&target).with_context(||
327						format!("No rate returned from get_fee_estimates for a {} confirmation target", target)
328					)?;
329					FeeRate::from_sat_per_vb_decimal_checked_ceil(*fee).with_context(||
330						format!("Invalid rate returned from get_fee_estimates {} for a {} confirmation target", fee, target)
331					)
332				};
333				Ok(FeeRates {
334					fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST)?,
335					regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR)?,
336					slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW)?,
337				})
338			}
339		}
340	}
341
342	pub async fn tip(&self) -> anyhow::Result<BlockHeight> {
343		if let Some((height, fetched_at)) = *self.tip_cache.read().await {
344			if fetched_at.elapsed() < TIP_CACHE_TTL {
345				return Ok(height);
346			}
347		}
348		let height = match self.inner() {
349			#[cfg(feature = "bitcoind-rpc")]
350			ChainSourceClient::Bitcoind { rpc, .. } => {
351				rpc.get_block_count().await? as BlockHeight
352			},
353			ChainSourceClient::Esplora(client) => {
354				client.get_height().await?
355			},
356		};
357		*self.tip_cache.write().await = Some((height, Instant::now()));
358		Ok(height)
359	}
360
361	/// Drop the cached tip and fee-rate values, forcing the next call to
362	/// `tip()` or `update_fee_rates()` to round-trip the backend. Useful
363	/// in tests that fabricate chain changes faster than the TTL so the
364	/// next observation is deterministic without sleeping.
365	pub async fn invalidate_caches(&self) {
366		*self.tip_cache.write().await = None;
367		*self.fee_rates_fetched_at.write().await = None;
368	}
369
370	pub async fn tip_ref(&self) -> anyhow::Result<BlockRef> {
371		self.block_ref(self.tip().await?).await
372	}
373
374	pub async fn block_ref(&self, height: BlockHeight) -> anyhow::Result<BlockRef> {
375		match self.inner() {
376			#[cfg(feature = "bitcoind-rpc")]
377			ChainSourceClient::Bitcoind { rpc, .. } => {
378				let hash = rpc.get_block_hash(height as u64).await?;
379				Ok(BlockRef { height, hash })
380			},
381			ChainSourceClient::Esplora(client) => {
382				let hash = client.get_block_hash(height).await?;
383				Ok(BlockRef { height, hash })
384			},
385		}
386	}
387
388	pub async fn block(&self, hash: BlockHash) -> anyhow::Result<Option<Block>> {
389		match self.inner() {
390			#[cfg(feature = "bitcoind-rpc")]
391			ChainSourceClient::Bitcoind { rpc, .. } => {
392				match rpc.get_block(&hash).await {
393					Ok(block) => Ok(Some(block)),
394					Err(e) if is_not_found(&e) => Ok(None),
395					Err(e) => Err(e.into()),
396				}
397			},
398			ChainSourceClient::Esplora(client) => {
399				Ok(client.get_block_by_hash(&hash).await?)
400			},
401		}
402	}
403
404	/// Retrieves basic CPFP ancestry information of the given transaction. Confirmed transactions
405	/// are ignored as they are not relevant to CPFP.
406	pub async fn mempool_ancestor_info(&self, txid: Txid) -> anyhow::Result<MempoolAncestorInfo> {
407		let mut result = MempoolAncestorInfo::new(txid);
408
409		// TODO: Determine if any line of descendant transactions increase the effective fee rate
410		//		 of the target txid.
411		match self.inner() {
412			#[cfg(feature = "bitcoind-rpc")]
413			ChainSourceClient::Bitcoind { rpc, .. } => {
414				let entry: rpc::json::GetMempoolEntryResult = rpc.call_raw(
415					"getmempoolentry", &[serde_json::to_value(txid).expect("serializable")],
416				).await?;
417				let err = || anyhow!("missing weight parameter from getmempoolentry");
418
419				result.total_fee = entry.fees.ancestor;
420				result.total_weight = Weight::from_wu(entry.weight.ok_or_else(err)?) +
421					Weight::from_vb(entry.ancestor_size).ok_or_else(err)?;
422			},
423			ChainSourceClient::Esplora(client) => {
424				// We should first verify the transaction is in the mempool to maintain the same
425				// behavior as Bitcoin Core
426				let status = self.tx_status(txid).await?;
427				if !matches!(status, TxStatus::Mempool) {
428					return Err(anyhow!("{} is not in the mempool, status is {:?}", txid, status));
429				}
430
431				let mut info_map: HashMap<Txid, esplora_client::Tx> = HashMap::new();
432				let mut set = HashSet::from([txid]);
433				while !set.is_empty() {
434					// Start requests asynchronously
435					let requests = set.iter().filter_map(|txid| if info_map.contains_key(txid) {
436						None
437					} else {
438						Some((txid, client.get_tx_info(&txid)))
439					}).collect::<Vec<_>>();
440
441					// Collect txids to be added to the set
442					let mut next_set = HashSet::new();
443
444					// Process each request, ignoring parents of confirmed transactions
445					for (txid, request) in requests {
446						let info = request.await?
447							.ok_or_else(|| anyhow!("unable to retrieve tx info for {}", txid))?;
448						if !info.status.confirmed {
449							for vin in info.vin.iter() {
450								next_set.insert(vin.txid);
451							}
452						}
453						info_map.insert(*txid, info);
454					}
455					set = next_set;
456				}
457				// Calculate the total weight and fee of the unconfirmed ancestry
458				for info in info_map.into_values().filter(|info| !info.status.confirmed) {
459					result.total_fee += info.fee();
460					result.total_weight += info.weight();
461				}
462			},
463		}
464		// Now calculate the effective fee rate of the package
465		Ok(result)
466	}
467
468	/// For each provided outpoint, fetches the ID of any confirmed or unconfirmed in which the
469	/// outpoint is spent.
470	pub async fn txs_spending_inputs<T: IntoIterator<Item = OutPoint>>(
471		&self,
472		outpoints: T,
473		#[cfg_attr(not(feature = "bitcoind-rpc"), allow(unused_variables))]
474		block_scan_start: BlockHeight,
475	) -> anyhow::Result<TxsSpendingInputsResult> {
476		let mut res = TxsSpendingInputsResult::new();
477		match self.inner() {
478			#[cfg(feature = "bitcoind-rpc")]
479			ChainSourceClient::Bitcoind { sync, .. } => {
480				// We must offset the height to account for the fact we iterate using next_block()
481				let start = block_scan_start.saturating_sub(1);
482				let block_ref = self.block_ref(start).await?;
483				let cp = CheckPoint::new(BlockId {
484					height: block_ref.height,
485					hash: block_ref.hash,
486				});
487
488				debug!("Scanning blocks for spent outpoints with bitcoind, starting at block height {}...", block_scan_start);
489				let outpoint_set = outpoints.into_iter().collect::<HashSet<_>>();
490
491				// `bdk_bitcoind_rpc::Emitter` is sync-only upstream, so the
492				// scan loop runs inside `spawn_blocking` with the sync companion.
493				let sync_client = sync.clone();
494				let cp_for_blocking = cp.clone();
495				res = tokio::task::spawn_blocking(move || -> anyhow::Result<TxsSpendingInputsResult> {
496					let mut res = res;
497					let mut emitter = bdk_bitcoind_rpc::Emitter::new(
498						&sync_client,
499						cp_for_blocking.clone(),
500						cp_for_blocking.height(),
501						bdk_bitcoind_rpc::NO_EXPECTED_MEMPOOL_TXS,
502					);
503					while let Some(em) = emitter.next_block()? {
504						if em.block_height() % 1000 == 0 {
505							info!("Scanned for spent outpoints until block height {}", em.block_height());
506						}
507						for tx in &em.block.txdata {
508							for txin in tx.input.iter() {
509								if outpoint_set.contains(&txin.previous_output) {
510									res.add(
511										txin.previous_output.clone(),
512										tx.compute_txid(),
513										TxStatus::Confirmed(BlockRef {
514											height: em.block_height(),
515											hash: em.block.block_hash().clone(),
516										}),
517									);
518									if res.map.len() == outpoint_set.len() {
519										return Ok(res);
520									}
521								}
522							}
523						}
524					}
525
526					debug!("Finished scanning blocks for spent outpoints, now checking the mempool...");
527					let mempool = emitter.mempool()?;
528					for (tx, _last_seen) in &mempool.update {
529						for txin in tx.input.iter() {
530							if outpoint_set.contains(&txin.previous_output) {
531								res.add(
532									txin.previous_output.clone(),
533									tx.compute_txid(),
534									TxStatus::Mempool,
535								);
536								if res.map.len() == outpoint_set.len() {
537									return Ok(res);
538								}
539							}
540						}
541					}
542					debug!("Finished checking the mempool for spent outpoints");
543					Ok(res)
544				}).await.context("Emitter scan task panicked")??;
545			},
546			ChainSourceClient::Esplora(client) => {
547				for outpoint in outpoints {
548					let output_status = client.get_output_status(&outpoint.txid, outpoint.vout.into()).await?;
549
550					if let Some(output_status) = output_status {
551						if output_status.spent {
552							let tx_status = {
553								let status = output_status.status.expect("Status should be valid if an outpoint is spent");
554								if status.confirmed {
555									TxStatus::Confirmed(BlockRef {
556										height: status.block_height.expect("Confirmed transaction missing block_height"),
557										hash: status.block_hash.expect("Confirmed transaction missing block_hash"),
558									})
559								} else {
560									TxStatus::Mempool
561								}
562							};
563							let txid = output_status.txid.expect("Txid should be valid if an outpoint is spent");
564							res.add(outpoint, txid, tx_status);
565						}
566					}
567				}
568			},
569		}
570
571		Ok(res)
572	}
573
574	pub async fn broadcast_tx(&self, tx: &Transaction) -> anyhow::Result<()> {
575		match self.inner() {
576			#[cfg(feature = "bitcoind-rpc")]
577			ChainSourceClient::Bitcoind { rpc, .. } => {
578				match rpc.send_raw_transaction(tx).await {
579					Ok(_) => Ok(()),
580					Err(e) if is_in_utxo_set(&e) => Ok(()),
581					Err(e) => Err(e.into()),
582				}
583			},
584			ChainSourceClient::Esplora(client) => {
585				client.broadcast(tx).await?;
586				Ok(())
587			},
588		}
589	}
590
591	pub async fn broadcast_package(&self, txs: &[impl Borrow<Transaction>]) -> Result<(), BroadcastError> {
592		match self.inner() {
593			#[cfg(feature = "bitcoind-rpc")]
594			ChainSourceClient::Bitcoind { rpc, .. } => {
595				let hexes: Vec<String> = txs.iter()
596					.map(|t| bitcoin::consensus::encode::serialize_hex(t.borrow()))
597					.collect();
598				let res: rpc::SubmitPackageResult = rpc.call_raw("submitpackage", &[hexes.into()])
599					.await
600					.map_err(|e| BroadcastError::Other(e.to_string()))?;
601				if res.package_msg != "success" {
602					return Err(classify_submit_package_errors(
603						&res.package_msg,
604						res.tx_results.values().map(|t| (t.txid, t.error.as_deref())),
605					));
606				}
607				Ok(())
608			},
609			ChainSourceClient::Esplora(client) => {
610				let txs = txs.iter().map(|t| t.borrow().clone()).collect::<Vec<_>>();
611				let res = client.submit_package(&txs, None, None)
612					.await
613					.map_err(|e| BroadcastError::Other(e.to_string()))?;
614				if res.package_msg != "success" {
615					return Err(classify_submit_package_errors(
616						&res.package_msg,
617						res.tx_results.values().map(|t| (t.txid, t.error.as_deref())),
618					));
619				}
620
621				Ok(())
622			},
623		}
624	}
625
626	pub async fn get_tx(&self, txid: &Txid) -> anyhow::Result<Option<Transaction>> {
627		match self.inner() {
628			#[cfg(feature = "bitcoind-rpc")]
629			ChainSourceClient::Bitcoind { rpc, .. } => {
630				match rpc.get_raw_transaction_verbosity_zero(txid).await {
631					Ok(tx) => Ok(Some(tx.0)),
632					Err(e) if is_not_found(&e) => Ok(None),
633					Err(e) => Err(e.into()),
634				}
635			},
636			ChainSourceClient::Esplora(client) => {
637				Ok(client.get_tx(txid).await?)
638			},
639		}
640	}
641
642	/// Returns the block height the tx is confirmed in, if any.
643	pub async fn tx_confirmed(&self, txid: Txid) -> anyhow::Result<Option<BlockHeight>> {
644		Ok(self.tx_status(txid).await?.confirmed_height())
645	}
646
647	/// Returns the status of the given transaction, including the block height if it is confirmed
648	pub async fn tx_status(&self, txid: Txid) -> anyhow::Result<TxStatus> {
649		match self.inner() {
650			#[cfg(feature = "bitcoind-rpc")]
651			ChainSourceClient::Bitcoind { rpc, .. } => Ok(bitcoind_tx_status(rpc, txid).await?),
652			ChainSourceClient::Esplora(esplora) => {
653				match esplora.get_tx_info(&txid).await? {
654					Some(info) => match (info.status.block_height, info.status.block_hash) {
655						(Some(block_height), Some(block_hash)) => Ok(TxStatus::Confirmed(BlockRef {
656							height: block_height,
657							hash: block_hash,
658						} )),
659						_ => Ok(TxStatus::Mempool),
660					},
661					None => Ok(TxStatus::NotFound),
662				}
663			},
664		}
665	}
666
667	#[allow(unused)]
668	pub async fn txout_value(&self, outpoint: &OutPoint) -> anyhow::Result<Amount> {
669		let tx = match self.inner() {
670			#[cfg(feature = "bitcoind-rpc")]
671			ChainSourceClient::Bitcoind { rpc, .. } => {
672				rpc.get_raw_transaction_verbosity_zero(&outpoint.txid).await
673					.with_context(|| format!("tx {} unknown", outpoint.txid))?
674					.0
675			},
676			ChainSourceClient::Esplora(client) => {
677				client.get_tx(&outpoint.txid).await?
678					.with_context(|| format!("tx {} unknown", outpoint.txid))?
679			},
680		};
681		Ok(tx.output.get(outpoint.vout as usize).context("outpoint vout out of range")?.value)
682	}
683
684	/// Gets the current fee rates from the chain source, falling back to user-specified values if
685	/// necessary.
686	///
687	/// No-ops if a previous successful call ran within `FEE_RATES_CACHE_TTL`.
688	/// The fallback path overwrites the cached rates but deliberately does
689	/// not advance the cache timestamp, so the next call retries the backend
690	/// instead of serving the fallback for another full TTL.
691	pub async fn update_fee_rates(&self, fallback_fee: Option<FeeRate>) -> anyhow::Result<()> {
692		if let Some(fetched_at) = *self.fee_rates_fetched_at.read().await {
693			if fetched_at.elapsed() < FEE_RATES_CACHE_TTL {
694				return Ok(());
695			}
696		}
697		let (fee_rates, used_fallback) = match (self.fetch_fee_rates().await, fallback_fee) {
698			(Ok(fee_rates), _) => (fee_rates, false),
699			(Err(e), None) => return Err(e),
700			(Err(e), Some(fallback)) => {
701				warn!("Error getting fee rates, falling back to {} sat/kvB: {}",
702					fallback.to_btc_per_kvb(), e,
703				);
704				(FeeRates { fast: fallback, regular: fallback, slow: fallback }, true)
705			}
706		};
707
708		*self.fee_rates.write().await = fee_rates;
709		if !used_fallback {
710			*self.fee_rates_fetched_at.write().await = Some(Instant::now());
711		}
712		Ok(())
713	}
714}
715
716// ----- bitcoind-rpc feature-gated helpers ---------------------------------
717
718/// Inspect upstream `bitcoind-async-client` JSON-RPC errors for the
719/// "transaction not found" code. Mirrors the sync-side `BitcoinRpcErrorExt`
720/// in `bitcoin_ext::rpc`.
721#[cfg(feature = "bitcoind-rpc")]
722fn is_not_found(e: &BitcoindClientError) -> bool {
723	matches!(e, BitcoindClientError::Server(c, _) if *c == RPC_INVALID_ADDRESS_OR_KEY)
724}
725
726/// Inspect upstream errors for the "already in utxo set" code.
727#[cfg(feature = "bitcoind-rpc")]
728fn is_in_utxo_set(e: &BitcoindClientError) -> bool {
729	matches!(e, BitcoindClientError::Server(c, _) if *c == RPC_VERIFY_ALREADY_IN_UTXO_SET)
730}
731
732/// Two-step `getrawtransaction` + `getblockheader` to determine whether a
733/// txid is confirmed, in the mempool, or unknown.
734#[cfg(feature = "bitcoind-rpc")]
735async fn bitcoind_tx_status(
736	rpc: &BitcoindClient, txid: Txid,
737) -> Result<TxStatus, BitcoindClientError> {
738	let res: Result<rpc::GetRawTransactionResult, _> = rpc.call_raw(
739		"getrawtransaction",
740		&[serde_json::to_value(txid).expect("serializable"), true.into()],
741	).await;
742	let info = match res {
743		Ok(info) => info,
744		Err(e) if is_not_found(&e) => return Ok(TxStatus::NotFound),
745		Err(e) => return Err(e),
746	};
747	let Some(hash) = info.blockhash else {
748		return Ok(TxStatus::Mempool);
749	};
750	let header: rpc::json::GetBlockHeaderResult = rpc.call_raw(
751		"getblockheader",
752		&[serde_json::to_value(hash).expect("serializable"), true.into()],
753	).await?;
754	if header.confirmations > 0 {
755		Ok(TxStatus::Confirmed(BlockRef {
756			height: header.height as BlockHeight,
757			hash: header.hash,
758		}))
759	} else {
760		Ok(TxStatus::Mempool)
761	}
762}
763
764/// The [FeeRates] struct represents the fee rates for transactions categorized by speed or urgency.
765#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
766pub struct FeeRates {
767	/// The fee for fast transactions (higher cost, lower time delay).
768	pub fast: FeeRate,
769	/// The fee for standard-priority transactions.
770	pub regular: FeeRate,
771	/// The fee for slower transactions (lower cost, higher time delay).
772	pub slow: FeeRate,
773}
774
775/// Contains the fee information for an unconfirmed transaction found in the mempool.
776#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
777pub struct MempoolAncestorInfo {
778	/// The ID of the transaction that was queried.
779	pub txid: Txid,
780	/// The total fee of this transaction and all of its unconfirmed ancestors. If the transaction
781	/// is to be replaced, the total fees of the published package MUST exceed this.
782	pub total_fee: Amount,
783	/// The total weight of this transaction and all of its unconfirmed ancestors.
784	pub total_weight: Weight,
785}
786
787impl MempoolAncestorInfo {
788	pub fn new(txid: Txid) -> Self {
789		Self {
790			txid,
791			total_fee: Amount::ZERO,
792			total_weight: Weight::ZERO,
793		}
794	}
795
796	pub fn effective_fee_rate(&self) -> Option<FeeRate> {
797		FeeRate::from_amount_and_weight_ceil(self.total_fee, self.total_weight)
798	}
799}
800
801#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
802pub struct TxsSpendingInputsResult {
803	pub map: HashMap<OutPoint, (Txid, TxStatus)>,
804}
805
806impl TxsSpendingInputsResult {
807	pub fn new() -> Self {
808		Self { map: HashMap::new() }
809	}
810
811	pub fn add(&mut self, outpoint: OutPoint, txid: Txid, status: TxStatus) {
812		self.map.insert(outpoint, (txid, status));
813	}
814
815	pub fn get(&self, outpoint: &OutPoint) -> Option<&(Txid, TxStatus)> {
816		self.map.get(outpoint)
817	}
818
819	pub fn confirmed_txids(&self) -> impl Iterator<Item = (Txid, BlockRef)> + '_ {
820		self.map
821			.iter()
822			.filter_map(|(_, (txid, status))| {
823				match status {
824					TxStatus::Confirmed(block) => Some((*txid, *block)),
825					_ => None,
826				}
827			})
828	}
829
830	pub fn mempool_txids(&self) -> impl Iterator<Item = Txid> + '_ {
831		self.map
832			.iter()
833			.filter(|(_, (_, status))| matches!(status, TxStatus::Mempool))
834			.map(|(_, (txid, _))| *txid)
835	}
836}
837
838/// Classified failure modes when broadcasting a transaction package.
839///
840/// The reject reasons covered by the typed variants are stable Bitcoin Core mempool policy
841/// constants (`txn-already-known`, `bad-txns-inputs-missingorspent`, `insufficient fee, rejecting
842/// replacement`). Esplora forwards bitcoind's reject reasons verbatim, so the same matching works
843/// for both backends.
844#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
845pub enum BroadcastError {
846	/// The transaction is already in the mempool. Treated as success for retry-safety.
847	#[error("transaction already known to the mempool")]
848	AlreadyKnown,
849	/// Inputs are missing or already spent — typically a conflicting replacement is in the mempool.
850	#[error("transaction inputs are missing or already spent")]
851	MissingOrSpentInputs,
852	/// The replacement fee is insufficient under RBF policy.
853	#[error("insufficient fee, rejecting replacement")]
854	InsufficientReplacementFee,
855	/// Any other failure (unrecognized reject reason, RPC/transport error, etc.).
856	#[error("{0}")]
857	Other(String),
858}
859
860impl BroadcastError {
861	/// True if the error means the transaction (or an equivalent one) is already known to the
862	/// network — i.e., not a sign that our transaction is invalid.
863	pub fn is_mempool_conflict(&self) -> bool {
864		matches!(
865			self,
866			BroadcastError::AlreadyKnown
867				| BroadcastError::MissingOrSpentInputs
868				| BroadcastError::InsufficientReplacementFee,
869		)
870	}
871}
872
873fn classify_submit_package_errors<'a>(
874	package_msg: &str,
875	tx_results: impl Iterator<Item = (Txid, Option<&'a str>)>,
876) -> BroadcastError {
877	let errors: Vec<String> = tx_results
878		.map(|(txid, err)| format!("tx {}: {}", txid, err.unwrap_or("(no error)")))
879		.collect();
880	let combined = errors.join(", ");
881	if combined.contains("txn-already-known") {
882		BroadcastError::AlreadyKnown
883	} else if combined.contains("bad-txns-inputs-missingorspent") {
884		BroadcastError::MissingOrSpentInputs
885	} else if combined.contains("insufficient fee, rejecting replacement") {
886		BroadcastError::InsufficientReplacementFee
887	} else {
888		BroadcastError::Other(format!("msg: '{}', errors: [{}]", package_msg, combined))
889	}
890}