Skip to main content

bark/
chain.rs

1
2
3use std::borrow::Borrow;
4use std::collections::{HashMap, HashSet};
5use std::str::FromStr as _;
6
7use anyhow::Context;
8use bdk_core::{BlockId, CheckPoint};
9use bdk_esplora::esplora_client;
10use bitcoin::constants::genesis_block;
11use bitcoin::{
12	Amount, Block, BlockHash, FeeRate, Network, OutPoint, Transaction, Txid, Weight,
13};
14use log::{debug, info, warn};
15use tokio::sync::RwLock;
16
17use bitcoin_ext::{BlockHeight, BlockRef, FeeRateExt, TxStatus};
18use bitcoin_ext::rpc;
19#[cfg(feature = "bitcoind-rpc")]
20use bitcoin_ext::rpc::{
21	BitcoinRpcClient, RPC_INVALID_ADDRESS_OR_KEY, RPC_VERIFY_ALREADY_IN_UTXO_SET,
22};
23#[cfg(feature = "bitcoind-rpc")]
24use bitcoind_async_client::Client as BitcoindClient;
25#[cfg(feature = "bitcoind-rpc")]
26use bitcoind_async_client::error::ClientError as BitcoindClientError;
27#[cfg(feature = "bitcoind-rpc")]
28use bitcoind_async_client::traits::{Broadcaster, Reader};
29
/// Confirmation target (in blocks) requested from the backend for the `fast` fee tier.
const FEE_RATE_TARGET_CONF_FAST: u16 = 1;
/// Confirmation target (in blocks) requested from the backend for the `regular` fee tier.
const FEE_RATE_TARGET_CONF_REGULAR: u16 = 3;
/// Confirmation target (in blocks) requested from the backend for the `slow` fee tier.
const FEE_RATE_TARGET_CONF_SLOW: u16 = 6;

/// Minimum accepted Bitcoin Core version, compared against the `version` field returned by
/// `getnetworkinfo` in [ChainSource::require_version] (290000 corresponds to 29.0).
#[cfg(feature = "bitcoind-rpc")]
const MIN_BITCOIND_VERSION: usize = 290000;
36
/// Configuration for the onchain data source.
///
/// [ChainSourceSpec] selects which backend to use for blockchain data and transaction
/// broadcasting:
/// - Bitcoind: uses a Bitcoin Core node via JSON-RPC
/// - Esplora: uses the HTTP API endpoint of [esplora-electrs](https://github.com/Blockstream/electrs)
///
/// Typical usage is to construct a [ChainSourceSpec] from configuration and pass it to
/// [ChainSource::new] along with the expected [Network].
///
/// Notes:
/// - For [ChainSourceSpec::Bitcoind], authentication must be provided (cookie file or user/pass).
#[derive(Clone, Debug)]
pub enum ChainSourceSpec {
	/// Use a Bitcoin Core node as the backend.
	Bitcoind {
		/// RPC URL of the Bitcoin Core node (e.g. <http://127.0.0.1:8332>).
		url: String,
		/// Authentication method for JSON-RPC (cookie file or user/pass).
		auth: rpc::Auth,
	},
	/// Use an Esplora HTTP API as the backend.
	Esplora {
		/// Base URL of the esplora-electrs instance (e.g. <https://esplora.signet.2nd.dev>).
		url: String,
	},
}
61
62impl ChainSourceSpec {
63	pub(crate) fn url(&self) -> &String {
64		match self {
65			ChainSourceSpec::Bitcoind { url, .. } => url,
66			ChainSourceSpec::Esplora { url } => url,
67		}
68	}
69}
70
/// The concrete backend client held by a [ChainSource].
pub enum ChainSourceClient {
	/// Native bitcoind backend.
	///
	/// Carries an async client for everything the wallet does asynchronously
	/// and a sync companion for `bdk_bitcoind_rpc::Emitter`, which is sync-only
	/// upstream and runs inside `tokio::task::spawn_blocking`.
	#[cfg(feature = "bitcoind-rpc")]
	Bitcoind {
		/// Async JSON-RPC client.
		rpc: BitcoindClient,
		/// Sync JSON-RPC client used to drive the block emitter.
		sync: BitcoinRpcClient,
	},
	/// Esplora backend, using the async client from `esplora_client`.
	Esplora(esplora_client::AsyncClient),
}
84
85impl ChainSourceClient {
86	async fn check_network(&self, expected: Network) -> anyhow::Result<()> {
87		match self {
88			#[cfg(feature = "bitcoind-rpc")]
89			ChainSourceClient::Bitcoind { rpc, .. } => {
90				let network = rpc.network().await?;
91				if expected != network {
92					bail!("Network mismatch: expected {:?}, got {:?}", expected, network);
93				}
94			},
95			ChainSourceClient::Esplora(client) => {
96				let res = client.client().get(format!("{}/block-height/0", client.url()))
97					.send().await?.text().await?;
98				let genesis_hash = BlockHash::from_str(&res)
99					.context("bad response from server (not a blockhash). Esplora client possibly misconfigured")?;
100				if genesis_hash != genesis_block(expected).block_hash() {
101					bail!("Network mismatch: expected {:?}, got {:?}", expected, genesis_hash);
102				}
103			},
104		};
105
106		Ok(())
107	}
108}
109
/// Client for interacting with the configured on-chain backend.
///
/// [ChainSource] abstracts over multiple backends using [ChainSourceSpec] to provide:
/// - Chain queries (tip, block headers/blocks, transaction status and fetching)
/// - Mempool-related utilities (ancestor fee/weight, spending lookups)
/// - Broadcasting single transactions or packages (RBF/CPFP workflows)
/// - Fee estimation and caching with optional fallback values
///
/// Behavior notes:
/// - [ChainSource::update_fee_rates] refreshes internal fee estimates; if backend estimates
///   fail and a fallback fee is provided, it will be used for all tiers.
/// - [ChainSource::fee_rates] returns the last cached [FeeRates].
///
/// Examples:
///
/// ```rust
/// # async fn func() {
/// use bark::chain::{ChainSource, ChainSourceSpec};
/// use bdk_bitcoind_rpc::bitcoincore_rpc::Auth;
/// use bitcoin::{FeeRate, Network};
///
/// let spec = ChainSourceSpec::Bitcoind {
///     url: "http://localhost:8332".into(),
///     auth: Auth::UserPass("user".into(), "password".into()),
/// };
/// let network = Network::Bitcoin;
/// let fallback_fee = FeeRate::from_sat_per_vb(5);
///
/// // `ChainSource::new` only takes the proxy argument when the `socks5-proxy` feature is
/// // enabled, so the call has to be feature-gated as well.
/// #[cfg(feature = "socks5-proxy")]
/// let instance = ChainSource::new(
///     spec, network, fallback_fee, Some("socks5h://127.0.0.1:9050"),
/// ).await.unwrap();
/// #[cfg(not(feature = "socks5-proxy"))]
/// let instance = ChainSource::new(spec, network, fallback_fee).await.unwrap();
/// # }
/// ```
pub struct ChainSource {
	/// The backend client all queries are dispatched to.
	inner: ChainSourceClient,
	/// The network the backend was checked against in [ChainSource::new].
	network: Network,
	/// Last stored fee rates; written by [ChainSource::update_fee_rates].
	fee_rates: RwLock<FeeRates>,
}
148
149impl ChainSource {
150	/// Checks that the version of the chain source is compatible with Bark.
151	///
152	/// For bitcoind, it checks if the version is at least 29.0
153	/// This is the first version for which 0 fee-anchors are considered standard
154	pub async fn require_version(&self) -> anyhow::Result<()> {
155		#[cfg(feature = "bitcoind-rpc")]
156		if let ChainSourceClient::Bitcoind { rpc, .. } = self.inner() {
157			#[derive(Debug, serde::Deserialize)]
158			struct NetworkInfo { version: usize }
159			let info: NetworkInfo = rpc.call_raw("getnetworkinfo", &[]).await?;
160			if info.version < MIN_BITCOIND_VERSION {
161				bail!("Bitcoin Core version is too old, you can participate in rounds but won't be able to unilaterally exit. Please upgrade to 29.0 or higher.");
162			}
163		}
164
165		Ok(())
166	}
167
	/// Gives crate-internal code direct access to the underlying backend client.
	pub(crate) fn inner(&self) -> &ChainSourceClient {
		&self.inner
	}
171
172	/// Gets a cached copy of the calculated network [FeeRates]
173	pub async fn fee_rates(&self) -> FeeRates {
174		self.fee_rates.read().await.clone()
175	}
176
	/// Gets the network that the [ChainSource] was validated against.
	///
	/// This is the value stored by [ChainSource::new] once its network check has succeeded.
	pub fn network(&self) -> Network {
		self.network
	}
181
182	/// Creates a new instance of the object with the specified chain source, network, and optional
183	/// fallback fee rate.
184	///
185	/// This function initializes the internal chain source client based on the provided `chain_source`:
186	/// - If `chain_source` is of type [ChainSourceSpec::Bitcoind], it creates a Bitcoin Core RPC client
187	///   using the provided URL and authentication parameters.
188	/// - If `chain_source` is of type [ChainSourceSpec::Esplora], it creates an Esplora client with the
189	///   given URL.
190	///
191	/// Both clients are initialized asynchronously, and any errors encountered during their
192	/// creation will be returned as part of the [anyhow::Result].
193	///
194	/// Additionally, the function performs a network consistency check to ensure the specified
195	/// network (e.g., `mainnet` or `signet`) matches the network configuration of the initialized
196	/// chain source client.
197	///
198	/// The `fallback_fee` parameter is optional. If provided, it is used as the default fee rate
199	/// for transactions. If not specified, the `FeeRate::BROADCAST_MIN` is used as the default fee
200	/// rate.
201	///
202	/// # Arguments
203	///
204	/// * `chain_source` - Specifies the backend to use for blockchain data.
205	/// * `network` - The Bitcoin network to operate on (e.g., `mainnet`, `testnet`, `regtest`).
206	/// * `fallback_fee` - An optional fallback fee rate to use for transaction fee estimation. If
207	///   not provided, a default fee rate of [FeeRate::BROADCAST_MIN] will be used.
208	///
209	/// # Returns
210	///
211	/// * `Ok(Self)` - If the object is successfully created with all necessary configurations.
212	/// * `Err(anyhow::Error)` - If there is an error in initializing the chain source client or
213	///   verifying the network.
214	pub async fn new(
215		spec: ChainSourceSpec,
216		network: Network,
217		fallback_fee: Option<FeeRate>,
218		#[cfg(feature = "socks5-proxy")] proxy: Option<&str>,
219	) -> anyhow::Result<Self> {
220		let inner = match spec {
221			#[cfg(feature = "bitcoind-rpc")]
222			ChainSourceSpec::Bitcoind { url, auth } => {
223				// `bdk_bitcoind_rpc::Emitter` is sync-only upstream, so we keep
224				// a sync companion to drive it inside `spawn_blocking`. The async
225				// client is used everywhere else. `BitcoinRpcClient` (rather
226				// than the bare `bitcoincore_rpc::Client`) is required so the
227				// `spawn_blocking` closure can take an owned, `Clone` value.
228				//
229				// The sync companion currently does not honour `socks5-proxy`;
230				// SOCKS5 is supported on the Esplora backend, where it is the
231				// realistic Tor-via-bitcoind use case.
232				let sync = BitcoinRpcClient::new(&url, auth.clone())
233					.context("failed to create sync bitcoind rpc client")?;
234				let async_auth = match auth {
235					rpc::Auth::None => bail!(
236						"bitcoind RPC auth is required (cookie file or user/pass)",
237					),
238					rpc::Auth::UserPass(u, p) => bitcoind_async_client::Auth::UserPass(u, p),
239					rpc::Auth::CookieFile(p) => bitcoind_async_client::Auth::CookieFile(p),
240				};
241				let rpc = BitcoindClient::new(url, async_auth, None, None, None)
242					.context("failed to create async bitcoind rpc client")?;
243				ChainSourceClient::Bitcoind { rpc, sync }
244			},
245			#[cfg(not(feature = "bitcoind-rpc"))]
246			ChainSourceSpec::Bitcoind { .. } => bail!(
247				"bitcoind RPC backend is not available: this build was compiled without \
248				 the `bitcoind-rpc` feature (notably the wasm-web build)",
249			),
250			ChainSourceSpec::Esplora { url } => ChainSourceClient::Esplora({
251				// the esplora client doesn't deal well with trailing slash in url
252				let url = url.strip_suffix("/").unwrap_or(&url);
253				let mut builder = esplora_client::Builder::new(url);
254				#[cfg(feature = "socks5-proxy")]
255				if let Some(proxy) = proxy {
256					builder = builder.proxy(proxy);
257				}
258				builder.build_async()
259					.with_context(|| format!("failed to create esplora client for url {}", url))?
260			}),
261		};
262
263		inner.check_network(network).await?;
264
265		let fee = fallback_fee.unwrap_or(FeeRate::BROADCAST_MIN);
266		let fee_rates = RwLock::new(FeeRates { fast: fee, regular: fee, slow: fee });
267
268		Ok(Self { inner, network, fee_rates })
269	}
270
271	async fn fetch_fee_rates(&self) -> anyhow::Result<FeeRates> {
272		match self.inner() {
273			#[cfg(feature = "bitcoind-rpc")]
274			ChainSourceClient::Bitcoind { rpc, .. } => {
275				let get_fee_rate = async |target: u16| -> anyhow::Result<FeeRate> {
276					let fee: rpc::json::EstimateSmartFeeResult = rpc.call_raw(
277						"estimatesmartfee",
278						&[
279							target.into(),
280							serde_json::to_value(rpc::json::EstimateMode::Economical)
281								.expect("serializable"),
282						],
283					).await?;
284					if let Some(fee_rate) = fee.fee_rate {
285						Ok(FeeRate::from_amount_per_kvb_ceil(fee_rate))
286					} else {
287						Err(anyhow!("No rate returned from estimate_smart_fee for a {} confirmation target", target))
288					}
289				};
290				Ok(FeeRates {
291					fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST).await?,
292					regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR).await.expect("should exist"),
293					slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW).await.expect("should exist"),
294				})
295			},
296			ChainSourceClient::Esplora(client) => {
297				// The API should return rates for targets 1-25, 144 and 1008
298				let estimates = client.get_fee_estimates().await?;
299				let get_fee_rate = |target| {
300					let fee = estimates.get(&target).with_context(||
301						format!("No rate returned from get_fee_estimates for a {} confirmation target", target)
302					)?;
303					FeeRate::from_sat_per_vb_decimal_checked_ceil(*fee).with_context(||
304						format!("Invalid rate returned from get_fee_estimates {} for a {} confirmation target", fee, target)
305					)
306				};
307				Ok(FeeRates {
308					fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST)?,
309					regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR)?,
310					slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW)?,
311				})
312			}
313		}
314	}
315
316	pub async fn tip(&self) -> anyhow::Result<BlockHeight> {
317		match self.inner() {
318			#[cfg(feature = "bitcoind-rpc")]
319			ChainSourceClient::Bitcoind { rpc, .. } => {
320				let count = rpc.get_block_count().await?;
321				Ok(count as BlockHeight)
322			},
323			ChainSourceClient::Esplora(client) => {
324				Ok(client.get_height().await?)
325			},
326		}
327	}
328
329	pub async fn tip_ref(&self) -> anyhow::Result<BlockRef> {
330		self.block_ref(self.tip().await?).await
331	}
332
333	pub async fn block_ref(&self, height: BlockHeight) -> anyhow::Result<BlockRef> {
334		match self.inner() {
335			#[cfg(feature = "bitcoind-rpc")]
336			ChainSourceClient::Bitcoind { rpc, .. } => {
337				let hash = rpc.get_block_hash(height as u64).await?;
338				Ok(BlockRef { height, hash })
339			},
340			ChainSourceClient::Esplora(client) => {
341				let hash = client.get_block_hash(height).await?;
342				Ok(BlockRef { height, hash })
343			},
344		}
345	}
346
347	pub async fn block(&self, hash: BlockHash) -> anyhow::Result<Option<Block>> {
348		match self.inner() {
349			#[cfg(feature = "bitcoind-rpc")]
350			ChainSourceClient::Bitcoind { rpc, .. } => {
351				match rpc.get_block(&hash).await {
352					Ok(block) => Ok(Some(block)),
353					Err(e) if is_not_found(&e) => Ok(None),
354					Err(e) => Err(e.into()),
355				}
356			},
357			ChainSourceClient::Esplora(client) => {
358				Ok(client.get_block_by_hash(&hash).await?)
359			},
360		}
361	}
362
363	/// Retrieves basic CPFP ancestry information of the given transaction. Confirmed transactions
364	/// are ignored as they are not relevant to CPFP.
365	pub async fn mempool_ancestor_info(&self, txid: Txid) -> anyhow::Result<MempoolAncestorInfo> {
366		let mut result = MempoolAncestorInfo::new(txid);
367
368		// TODO: Determine if any line of descendant transactions increase the effective fee rate
369		//		 of the target txid.
370		match self.inner() {
371			#[cfg(feature = "bitcoind-rpc")]
372			ChainSourceClient::Bitcoind { rpc, .. } => {
373				let entry: rpc::json::GetMempoolEntryResult = rpc.call_raw(
374					"getmempoolentry", &[serde_json::to_value(txid).expect("serializable")],
375				).await?;
376				let err = || anyhow!("missing weight parameter from getmempoolentry");
377
378				result.total_fee = entry.fees.ancestor;
379				result.total_weight = Weight::from_wu(entry.weight.ok_or_else(err)?) +
380					Weight::from_vb(entry.ancestor_size).ok_or_else(err)?;
381			},
382			ChainSourceClient::Esplora(client) => {
383				// We should first verify the transaction is in the mempool to maintain the same
384				// behavior as Bitcoin Core
385				let status = self.tx_status(txid).await?;
386				if !matches!(status, TxStatus::Mempool) {
387					return Err(anyhow!("{} is not in the mempool, status is {:?}", txid, status));
388				}
389
390				let mut info_map: HashMap<Txid, esplora_client::Tx> = HashMap::new();
391				let mut set = HashSet::from([txid]);
392				while !set.is_empty() {
393					// Start requests asynchronously
394					let requests = set.iter().filter_map(|txid| if info_map.contains_key(txid) {
395						None
396					} else {
397						Some((txid, client.get_tx_info(&txid)))
398					}).collect::<Vec<_>>();
399
400					// Collect txids to be added to the set
401					let mut next_set = HashSet::new();
402
403					// Process each request, ignoring parents of confirmed transactions
404					for (txid, request) in requests {
405						let info = request.await?
406							.ok_or_else(|| anyhow!("unable to retrieve tx info for {}", txid))?;
407						if !info.status.confirmed {
408							for vin in info.vin.iter() {
409								next_set.insert(vin.txid);
410							}
411						}
412						info_map.insert(*txid, info);
413					}
414					set = next_set;
415				}
416				// Calculate the total weight and fee of the unconfirmed ancestry
417				for info in info_map.into_values().filter(|info| !info.status.confirmed) {
418					result.total_fee += info.fee();
419					result.total_weight += info.weight();
420				}
421			},
422		}
423		// Now calculate the effective fee rate of the package
424		Ok(result)
425	}
426
427	/// For each provided outpoint, fetches the ID of any confirmed or unconfirmed in which the
428	/// outpoint is spent.
429	pub async fn txs_spending_inputs<T: IntoIterator<Item = OutPoint>>(
430		&self,
431		outpoints: T,
432		#[cfg_attr(not(feature = "bitcoind-rpc"), allow(unused_variables))]
433		block_scan_start: BlockHeight,
434	) -> anyhow::Result<TxsSpendingInputsResult> {
435		let mut res = TxsSpendingInputsResult::new();
436		match self.inner() {
437			#[cfg(feature = "bitcoind-rpc")]
438			ChainSourceClient::Bitcoind { sync, .. } => {
439				// We must offset the height to account for the fact we iterate using next_block()
440				let start = block_scan_start.saturating_sub(1);
441				let block_ref = self.block_ref(start).await?;
442				let cp = CheckPoint::new(BlockId {
443					height: block_ref.height,
444					hash: block_ref.hash,
445				});
446
447				debug!("Scanning blocks for spent outpoints with bitcoind, starting at block height {}...", block_scan_start);
448				let outpoint_set = outpoints.into_iter().collect::<HashSet<_>>();
449
450				// `bdk_bitcoind_rpc::Emitter` is sync-only upstream, so the
451				// scan loop runs inside `spawn_blocking` with the sync companion.
452				let sync_client = sync.clone();
453				let cp_for_blocking = cp.clone();
454				res = tokio::task::spawn_blocking(move || -> anyhow::Result<TxsSpendingInputsResult> {
455					let mut res = res;
456					let mut emitter = bdk_bitcoind_rpc::Emitter::new(
457						&sync_client,
458						cp_for_blocking.clone(),
459						cp_for_blocking.height(),
460						bdk_bitcoind_rpc::NO_EXPECTED_MEMPOOL_TXS,
461					);
462					while let Some(em) = emitter.next_block()? {
463						if em.block_height() % 1000 == 0 {
464							info!("Scanned for spent outpoints until block height {}", em.block_height());
465						}
466						for tx in &em.block.txdata {
467							for txin in tx.input.iter() {
468								if outpoint_set.contains(&txin.previous_output) {
469									res.add(
470										txin.previous_output.clone(),
471										tx.compute_txid(),
472										TxStatus::Confirmed(BlockRef {
473											height: em.block_height(),
474											hash: em.block.block_hash().clone(),
475										}),
476									);
477									if res.map.len() == outpoint_set.len() {
478										return Ok(res);
479									}
480								}
481							}
482						}
483					}
484
485					debug!("Finished scanning blocks for spent outpoints, now checking the mempool...");
486					let mempool = emitter.mempool()?;
487					for (tx, _last_seen) in &mempool.update {
488						for txin in tx.input.iter() {
489							if outpoint_set.contains(&txin.previous_output) {
490								res.add(
491									txin.previous_output.clone(),
492									tx.compute_txid(),
493									TxStatus::Mempool,
494								);
495								if res.map.len() == outpoint_set.len() {
496									return Ok(res);
497								}
498							}
499						}
500					}
501					debug!("Finished checking the mempool for spent outpoints");
502					Ok(res)
503				}).await.context("Emitter scan task panicked")??;
504			},
505			ChainSourceClient::Esplora(client) => {
506				for outpoint in outpoints {
507					let output_status = client.get_output_status(&outpoint.txid, outpoint.vout.into()).await?;
508
509					if let Some(output_status) = output_status {
510						if output_status.spent {
511							let tx_status = {
512								let status = output_status.status.expect("Status should be valid if an outpoint is spent");
513								if status.confirmed {
514									TxStatus::Confirmed(BlockRef {
515										height: status.block_height.expect("Confirmed transaction missing block_height"),
516										hash: status.block_hash.expect("Confirmed transaction missing block_hash"),
517									})
518								} else {
519									TxStatus::Mempool
520								}
521							};
522							let txid = output_status.txid.expect("Txid should be valid if an outpoint is spent");
523							res.add(outpoint, txid, tx_status);
524						}
525					}
526				}
527			},
528		}
529
530		Ok(res)
531	}
532
533	pub async fn broadcast_tx(&self, tx: &Transaction) -> anyhow::Result<()> {
534		match self.inner() {
535			#[cfg(feature = "bitcoind-rpc")]
536			ChainSourceClient::Bitcoind { rpc, .. } => {
537				match rpc.send_raw_transaction(tx).await {
538					Ok(_) => Ok(()),
539					Err(e) if is_in_utxo_set(&e) => Ok(()),
540					Err(e) => Err(e.into()),
541				}
542			},
543			ChainSourceClient::Esplora(client) => {
544				client.broadcast(tx).await?;
545				Ok(())
546			},
547		}
548	}
549
550	pub async fn broadcast_package(&self, txs: &[impl Borrow<Transaction>]) -> Result<(), BroadcastError> {
551		match self.inner() {
552			#[cfg(feature = "bitcoind-rpc")]
553			ChainSourceClient::Bitcoind { rpc, .. } => {
554				let hexes: Vec<String> = txs.iter()
555					.map(|t| bitcoin::consensus::encode::serialize_hex(t.borrow()))
556					.collect();
557				let res: rpc::SubmitPackageResult = rpc.call_raw("submitpackage", &[hexes.into()])
558					.await
559					.map_err(|e| BroadcastError::Other(e.to_string()))?;
560				if res.package_msg != "success" {
561					return Err(classify_submit_package_errors(
562						&res.package_msg,
563						res.tx_results.values().map(|t| (t.txid, t.error.as_deref())),
564					));
565				}
566				Ok(())
567			},
568			ChainSourceClient::Esplora(client) => {
569				let txs = txs.iter().map(|t| t.borrow().clone()).collect::<Vec<_>>();
570				let res = client.submit_package(&txs, None, None)
571					.await
572					.map_err(|e| BroadcastError::Other(e.to_string()))?;
573				if res.package_msg != "success" {
574					return Err(classify_submit_package_errors(
575						&res.package_msg,
576						res.tx_results.values().map(|t| (t.txid, t.error.as_deref())),
577					));
578				}
579
580				Ok(())
581			},
582		}
583	}
584
585	pub async fn get_tx(&self, txid: &Txid) -> anyhow::Result<Option<Transaction>> {
586		match self.inner() {
587			#[cfg(feature = "bitcoind-rpc")]
588			ChainSourceClient::Bitcoind { rpc, .. } => {
589				match rpc.get_raw_transaction_verbosity_zero(txid).await {
590					Ok(tx) => Ok(Some(tx.0)),
591					Err(e) if is_not_found(&e) => Ok(None),
592					Err(e) => Err(e.into()),
593				}
594			},
595			ChainSourceClient::Esplora(client) => {
596				Ok(client.get_tx(txid).await?)
597			},
598		}
599	}
600
601	/// Returns the block height the tx is confirmed in, if any.
602	pub async fn tx_confirmed(&self, txid: Txid) -> anyhow::Result<Option<BlockHeight>> {
603		Ok(self.tx_status(txid).await?.confirmed_height())
604	}
605
606	/// Returns the status of the given transaction, including the block height if it is confirmed
607	pub async fn tx_status(&self, txid: Txid) -> anyhow::Result<TxStatus> {
608		match self.inner() {
609			#[cfg(feature = "bitcoind-rpc")]
610			ChainSourceClient::Bitcoind { rpc, .. } => Ok(bitcoind_tx_status(rpc, txid).await?),
611			ChainSourceClient::Esplora(esplora) => {
612				match esplora.get_tx_info(&txid).await? {
613					Some(info) => match (info.status.block_height, info.status.block_hash) {
614						(Some(block_height), Some(block_hash)) => Ok(TxStatus::Confirmed(BlockRef {
615							height: block_height,
616							hash: block_hash,
617						} )),
618						_ => Ok(TxStatus::Mempool),
619					},
620					None => Ok(TxStatus::NotFound),
621				}
622			},
623		}
624	}
625
626	#[allow(unused)]
627	pub async fn txout_value(&self, outpoint: &OutPoint) -> anyhow::Result<Amount> {
628		let tx = match self.inner() {
629			#[cfg(feature = "bitcoind-rpc")]
630			ChainSourceClient::Bitcoind { rpc, .. } => {
631				rpc.get_raw_transaction_verbosity_zero(&outpoint.txid).await
632					.with_context(|| format!("tx {} unknown", outpoint.txid))?
633					.0
634			},
635			ChainSourceClient::Esplora(client) => {
636				client.get_tx(&outpoint.txid).await?
637					.with_context(|| format!("tx {} unknown", outpoint.txid))?
638			},
639		};
640		Ok(tx.output.get(outpoint.vout as usize).context("outpoint vout out of range")?.value)
641	}
642
643	/// Gets the current fee rates from the chain source, falling back to user-specified values if
644	/// necessary
645	pub async fn update_fee_rates(&self, fallback_fee: Option<FeeRate>) -> anyhow::Result<()> {
646		let fee_rates = match (self.fetch_fee_rates().await, fallback_fee) {
647			(Ok(fee_rates), _) => Ok(fee_rates),
648			(Err(e), None) => Err(e),
649			(Err(e), Some(fallback)) => {
650				warn!("Error getting fee rates, falling back to {} sat/kvB: {}",
651					fallback.to_btc_per_kvb(), e,
652				);
653				Ok(FeeRates { fast: fallback, regular: fallback, slow: fallback })
654			}
655		}?;
656
657		*self.fee_rates.write().await = fee_rates;
658		Ok(())
659	}
660}
661
662// ----- bitcoind-rpc feature-gated helpers ---------------------------------
663
/// Tells whether a `bitcoind-async-client` error is a server-side JSON-RPC error carrying the
/// `RPC_INVALID_ADDRESS_OR_KEY` code, which this file treats as "not found". Mirrors the
/// sync-side `BitcoinRpcErrorExt` in `bitcoin_ext::rpc`.
#[cfg(feature = "bitcoind-rpc")]
fn is_not_found(e: &BitcoindClientError) -> bool {
	match e {
		BitcoindClientError::Server(code, _) => *code == RPC_INVALID_ADDRESS_OR_KEY,
		_ => false,
	}
}
671
/// Tells whether a `bitcoind-async-client` error is a server-side JSON-RPC error carrying the
/// "already in utxo set" code (`RPC_VERIFY_ALREADY_IN_UTXO_SET`).
#[cfg(feature = "bitcoind-rpc")]
fn is_in_utxo_set(e: &BitcoindClientError) -> bool {
	match e {
		BitcoindClientError::Server(code, _) => *code == RPC_VERIFY_ALREADY_IN_UTXO_SET,
		_ => false,
	}
}
677
/// Determines whether a txid is confirmed, in the mempool, or unknown to the node.
///
/// Step one is a verbose `getrawtransaction`: a "not found" error maps to
/// [TxStatus::NotFound] and a result without block hash to [TxStatus::Mempool]. Step two is a
/// `getblockheader` for that block hash: the transaction only counts as confirmed when the
/// header reports more than zero confirmations, otherwise it is reported as a mempool
/// transaction.
#[cfg(feature = "bitcoind-rpc")]
async fn bitcoind_tx_status(
	rpc: &BitcoindClient, txid: Txid,
) -> Result<TxStatus, BitcoindClientError> {
	let lookup: Result<rpc::GetRawTransactionResult, _> = rpc.call_raw(
		"getrawtransaction",
		&[serde_json::to_value(txid).expect("serializable"), true.into()],
	).await;
	let blockhash = match lookup {
		Err(e) if is_not_found(&e) => return Ok(TxStatus::NotFound),
		Err(e) => return Err(e),
		Ok(info) => info.blockhash,
	};
	let hash = match blockhash {
		Some(hash) => hash,
		None => return Ok(TxStatus::Mempool),
	};

	let header: rpc::json::GetBlockHeaderResult = rpc.call_raw(
		"getblockheader",
		&[serde_json::to_value(hash).expect("serializable"), true.into()],
	).await?;
	let status = if header.confirmations > 0 {
		TxStatus::Confirmed(BlockRef {
			height: header.height as BlockHeight,
			hash: header.hash,
		})
	} else {
		TxStatus::Mempool
	};
	Ok(status)
}
709
/// The [FeeRates] struct represents the fee rates for transactions categorized by speed or urgency.
///
/// [ChainSource] fills the tiers from backend estimates for confirmation targets of 1 (`fast`),
/// 3 (`regular`) and 6 (`slow`) blocks, or sets all three to a single fallback rate.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct FeeRates {
	/// The fee for fast transactions (higher cost, lower time delay).
	pub fast: FeeRate,
	/// The fee for standard-priority transactions.
	pub regular: FeeRate,
	/// The fee for slower transactions (lower cost, higher time delay).
	pub slow: FeeRate,
}
720
/// Contains the fee information for an unconfirmed transaction found in the mempool.
///
/// Produced by [ChainSource::mempool_ancestor_info].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MempoolAncestorInfo {
	/// The ID of the transaction that was queried.
	pub txid: Txid,
	/// The total fee of this transaction and all of its unconfirmed ancestors. If the transaction
	/// is to be replaced, the total fees of the published package MUST exceed this.
	pub total_fee: Amount,
	/// The total weight of this transaction and all of its unconfirmed ancestors.
	pub total_weight: Weight,
}
732
733impl MempoolAncestorInfo {
734	pub fn new(txid: Txid) -> Self {
735		Self {
736			txid,
737			total_fee: Amount::ZERO,
738			total_weight: Weight::ZERO,
739		}
740	}
741
742	pub fn effective_fee_rate(&self) -> Option<FeeRate> {
743		FeeRate::from_amount_and_weight_ceil(self.total_fee, self.total_weight)
744	}
745}
746
747#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
748pub struct TxsSpendingInputsResult {
749	pub map: HashMap<OutPoint, (Txid, TxStatus)>,
750}
751
752impl TxsSpendingInputsResult {
753	pub fn new() -> Self {
754		Self { map: HashMap::new() }
755	}
756
757	pub fn add(&mut self, outpoint: OutPoint, txid: Txid, status: TxStatus) {
758		self.map.insert(outpoint, (txid, status));
759	}
760
761	pub fn get(&self, outpoint: &OutPoint) -> Option<&(Txid, TxStatus)> {
762		self.map.get(outpoint)
763	}
764
765	pub fn confirmed_txids(&self) -> impl Iterator<Item = (Txid, BlockRef)> + '_ {
766		self.map
767			.iter()
768			.filter_map(|(_, (txid, status))| {
769				match status {
770					TxStatus::Confirmed(block) => Some((*txid, *block)),
771					_ => None,
772				}
773			})
774	}
775
776	pub fn mempool_txids(&self) -> impl Iterator<Item = Txid> + '_ {
777		self.map
778			.iter()
779			.filter(|(_, (_, status))| matches!(status, TxStatus::Mempool))
780			.map(|(_, (txid, _))| *txid)
781	}
782}
783
/// Classified failure modes when broadcasting a transaction package.
///
/// The reject reasons covered by the typed variants are stable Bitcoin Core mempool policy
/// constants (`txn-already-known`, `bad-txns-inputs-missingorspent`, `insufficient fee, rejecting
/// replacement`). Esplora forwards bitcoind's reject reasons verbatim, so the same matching works
/// for both backends.
#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
pub enum BroadcastError {
	/// The transaction is already in the mempool. Treated as success for retry-safety.
	///
	/// NOTE(review): [ChainSource::broadcast_package] still returns this variant as an `Err`;
	/// treating it as success appears to be left to the caller — confirm.
	#[error("transaction already known to the mempool")]
	AlreadyKnown,
	/// Inputs are missing or already spent — typically a conflicting replacement is in the mempool.
	#[error("transaction inputs are missing or already spent")]
	MissingOrSpentInputs,
	/// The replacement fee is insufficient under RBF policy.
	#[error("insufficient fee, rejecting replacement")]
	InsufficientReplacementFee,
	/// Any other failure (unrecognized reject reason, RPC/transport error, etc.). Carries the
	/// textual description of the failure.
	#[error("{0}")]
	Other(String),
}
805
806impl BroadcastError {
807	/// True if the error means the transaction (or an equivalent one) is already known to the
808	/// network — i.e., not a sign that our transaction is invalid.
809	pub fn is_mempool_conflict(&self) -> bool {
810		matches!(
811			self,
812			BroadcastError::AlreadyKnown
813				| BroadcastError::MissingOrSpentInputs
814				| BroadcastError::InsufficientReplacementFee,
815		)
816	}
817}
818
819fn classify_submit_package_errors<'a>(
820	package_msg: &str,
821	tx_results: impl Iterator<Item = (Txid, Option<&'a str>)>,
822) -> BroadcastError {
823	let errors: Vec<String> = tx_results
824		.map(|(txid, err)| format!("tx {}: {}", txid, err.unwrap_or("(no error)")))
825		.collect();
826	let combined = errors.join(", ");
827	if combined.contains("txn-already-known") {
828		BroadcastError::AlreadyKnown
829	} else if combined.contains("bad-txns-inputs-missingorspent") {
830		BroadcastError::MissingOrSpentInputs
831	} else if combined.contains("insufficient fee, rejecting replacement") {
832		BroadcastError::InsufficientReplacementFee
833	} else {
834		BroadcastError::Other(format!("msg: '{}', errors: [{}]", package_msg, combined))
835	}
836}