1
2
3use std::borrow::Borrow;
4use std::collections::{HashMap, HashSet};
5use std::str::FromStr as _;
6
7use anyhow::Context;
8use bdk_core::{BlockId, CheckPoint};
9use bdk_esplora::esplora_client;
10use bitcoin::constants::genesis_block;
11use bitcoin::{
12 Amount, Block, BlockHash, FeeRate, Network, OutPoint, Transaction, Txid, Weight,
13};
14use log::{debug, info, warn};
15use tokio::sync::RwLock;
16
17use bitcoin_ext::{BlockHeight, BlockRef, FeeRateExt, TxStatus};
18use bitcoin_ext::rpc::{self, BitcoinRpcExt, BitcoinRpcErrorExt, RpcApi};
19
/// Confirmation target requested from the fee estimator for the "fast" fee rate.
const FEE_RATE_TARGET_CONF_FAST: u16 = 1;
/// Confirmation target requested from the fee estimator for the "regular" fee rate.
const FEE_RATE_TARGET_CONF_REGULAR: u16 = 3;
/// Confirmation target requested from the fee estimator for the "slow" fee rate.
const FEE_RATE_TARGET_CONF_SLOW: u16 = 6;

/// JSON-RPC error code that `ChainSource::broadcast_tx` treats as a successful
/// broadcast when bitcoind returns it.
const TX_ALREADY_IN_CHAIN_ERROR: i32 = -27;
/// Lowest bitcoind `version()` value accepted by `ChainSource::require_version`
/// (the error message there refers to it as 29.0).
const MIN_BITCOIND_VERSION: usize = 290000;
26
/// Describes which backend to use as chain source and how to reach it.
#[derive(Clone, Debug)]
pub enum ChainSourceSpec {
    /// Use a bitcoind node over RPC.
    Bitcoind {
        /// URL handed to `rpc::create_client`.
        url: String,
        /// Authentication handed to `rpc::create_client`.
        auth: rpc::Auth,
    },
    /// Use an Esplora server.
    Esplora {
        /// Base URL of the server; one trailing `/` is stripped in `ChainSource::new`.
        url: String,
    },
}
51
52impl ChainSourceSpec {
53 pub(crate) fn url(&self) -> &String {
54 match self {
55 ChainSourceSpec::Bitcoind { url, .. } => url,
56 ChainSourceSpec::Esplora { url } => url,
57 }
58 }
59}
60
/// The concrete client used to talk to the chain backend.
pub enum ChainSourceClient {
    /// RPC client for a bitcoind node.
    Bitcoind(rpc::Client),
    /// Asynchronous Esplora HTTP client.
    Esplora(esplora_client::AsyncClient),
}
65
66impl ChainSourceClient {
67 async fn check_network(&self, expected: Network) -> anyhow::Result<()> {
68 match self {
69 ChainSourceClient::Bitcoind(bitcoind) => {
70 let network = bitcoind.get_blockchain_info()?;
71 if expected != network.chain {
72 bail!("Network mismatch: expected {:?}, got {:?}", expected, network.chain);
73 }
74 },
75 ChainSourceClient::Esplora(client) => {
76 let res = client.client().get(format!("{}/block-height/0", client.url()))
77 .send().await?.text().await?;
78 let genesis_hash = BlockHash::from_str(&res)
79 .context("bad response from server (not a blockhash). Esplora client possibly misconfigured")?;
80 if genesis_hash != genesis_block(expected).block_hash() {
81 bail!("Network mismatch: expected {:?}, got {:?}", expected, genesis_hash);
82 }
83 },
84 };
85
86 Ok(())
87 }
88}
89
/// Handle to the configured chain backend, the network it was checked against
/// and the most recently stored fee rates.
pub struct ChainSource {
    /// Backend client; either bitcoind RPC or Esplora.
    inner: ChainSourceClient,
    /// Network passed to `new`, verified against the backend during construction.
    network: Network,
    /// Fee rates handed out by `fee_rates()`; seeded in `new` and overwritten by
    /// `update_fee_rates`.
    fee_rates: RwLock<FeeRates>,
}
128
129impl ChainSource {
130 pub fn require_version(&self) -> anyhow::Result<()> {
135 if let ChainSourceClient::Bitcoind(bitcoind) = self.inner() {
136 if bitcoind.version()? < MIN_BITCOIND_VERSION {
137 bail!("Bitcoin Core version is too old, you can participate in rounds but won't be able to unilaterally exit. Please upgrade to 29.0 or higher.");
138 }
139 }
140
141 Ok(())
142 }
143
144 pub(crate) fn inner(&self) -> &ChainSourceClient {
145 &self.inner
146 }
147
148 pub async fn fee_rates(&self) -> FeeRates {
150 self.fee_rates.read().await.clone()
151 }
152
153 pub fn network(&self) -> Network {
155 self.network
156 }
157
158 pub async fn new(
191 spec: ChainSourceSpec,
192 network: Network,
193 fallback_fee: Option<FeeRate>,
194 #[cfg(feature = "socks5-proxy")] proxy: Option<&str>,
195 ) -> anyhow::Result<Self> {
196 let inner = match spec {
197 ChainSourceSpec::Bitcoind { url, auth } => ChainSourceClient::Bitcoind(
198 rpc::create_client(
199 &url,
200 auth,
201 #[cfg(feature = "socks5-proxy")] proxy,
202 ).context("failed to create bitcoind rpc client")?
203 ),
204 ChainSourceSpec::Esplora { url } => ChainSourceClient::Esplora({
205 let url = url.strip_suffix("/").unwrap_or(&url);
207 let mut builder = esplora_client::Builder::new(url);
208 #[cfg(feature = "socks5-proxy")]
209 if let Some(proxy) = proxy {
210 builder = builder.proxy(proxy);
211 }
212 builder.build_async()
213 .with_context(|| format!("failed to create esplora client for url {}", url))?
214 }),
215 };
216
217 inner.check_network(network).await?;
218
219 let fee = fallback_fee.unwrap_or(FeeRate::BROADCAST_MIN);
220 let fee_rates = RwLock::new(FeeRates { fast: fee, regular: fee, slow: fee });
221
222 Ok(Self { inner, network, fee_rates })
223 }
224
225 async fn fetch_fee_rates(&self) -> anyhow::Result<FeeRates> {
226 match self.inner() {
227 ChainSourceClient::Bitcoind(bitcoind) => {
228 let get_fee_rate = |target| {
229 let fee = bitcoind.estimate_smart_fee(
230 target, Some(rpc::json::EstimateMode::Economical),
231 )?;
232 if let Some(fee_rate) = fee.fee_rate {
233 Ok(FeeRate::from_amount_per_kvb_ceil(fee_rate))
234 } else {
235 Err(anyhow!("No rate returned from estimate_smart_fee for a {} confirmation target", target))
236 }
237 };
238 Ok(FeeRates {
239 fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST)?,
240 regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR).expect("should exist"),
241 slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW).expect("should exist"),
242 })
243 },
244 ChainSourceClient::Esplora(client) => {
245 let estimates = client.get_fee_estimates().await?;
247 let get_fee_rate = |target| {
248 let fee = estimates.get(&target).with_context(||
249 format!("No rate returned from get_fee_estimates for a {} confirmation target", target)
250 )?;
251 FeeRate::from_sat_per_vb_decimal_checked_ceil(*fee).with_context(||
252 format!("Invalid rate returned from get_fee_estimates {} for a {} confirmation target", fee, target)
253 )
254 };
255 Ok(FeeRates {
256 fast: get_fee_rate(FEE_RATE_TARGET_CONF_FAST)?,
257 regular: get_fee_rate(FEE_RATE_TARGET_CONF_REGULAR)?,
258 slow: get_fee_rate(FEE_RATE_TARGET_CONF_SLOW)?,
259 })
260 }
261 }
262 }
263
264 pub async fn tip(&self) -> anyhow::Result<BlockHeight> {
265 match self.inner() {
266 ChainSourceClient::Bitcoind(bitcoind) => {
267 Ok(bitcoind.get_block_count()? as BlockHeight)
268 },
269 ChainSourceClient::Esplora(client) => {
270 Ok(client.get_height().await?)
271 },
272 }
273 }
274
275 pub async fn tip_ref(&self) -> anyhow::Result<BlockRef> {
276 self.block_ref(self.tip().await?).await
277 }
278
279 pub async fn block_ref(&self, height: BlockHeight) -> anyhow::Result<BlockRef> {
280 match self.inner() {
281 ChainSourceClient::Bitcoind(bitcoind) => {
282 let hash = bitcoind.get_block_hash(height as u64)?;
283 Ok(BlockRef { height, hash })
284 },
285 ChainSourceClient::Esplora(client) => {
286 let hash = client.get_block_hash(height).await?;
287 Ok(BlockRef { height, hash })
288 },
289 }
290 }
291
292 pub async fn block(&self, hash: BlockHash) -> anyhow::Result<Option<Block>> {
293 match self.inner() {
294 ChainSourceClient::Bitcoind(bitcoind) => {
295 match bitcoind.get_block(&hash) {
296 Ok(b) => Ok(Some(b)),
297 Err(e) if e.is_not_found() => Ok(None),
298 Err(e) => Err(e.into()),
299 }
300 },
301 ChainSourceClient::Esplora(client) => {
302 Ok(client.get_block_by_hash(&hash).await?)
303 },
304 }
305 }
306
307 pub async fn mempool_ancestor_info(&self, txid: Txid) -> anyhow::Result<MempoolAncestorInfo> {
310 let mut result = MempoolAncestorInfo::new(txid);
311
312 match self.inner() {
315 ChainSourceClient::Bitcoind(bitcoind) => {
316 let entry = bitcoind.get_mempool_entry(&txid)?;
317 let err = || anyhow!("missing weight parameter from getmempoolentry");
318
319 result.total_fee = entry.fees.ancestor;
320 result.total_weight = Weight::from_wu(entry.weight.ok_or_else(err)?) +
321 Weight::from_vb(entry.ancestor_size).ok_or_else(err)?;
322 },
323 ChainSourceClient::Esplora(client) => {
324 let status = self.tx_status(txid).await?;
327 if !matches!(status, TxStatus::Mempool) {
328 return Err(anyhow!("{} is not in the mempool, status is {:?}", txid, status));
329 }
330
331 let mut info_map: HashMap<Txid, esplora_client::Tx> = HashMap::new();
332 let mut set = HashSet::from([txid]);
333 while !set.is_empty() {
334 let requests = set.iter().filter_map(|txid| if info_map.contains_key(txid) {
336 None
337 } else {
338 Some((txid, client.get_tx_info(&txid)))
339 }).collect::<Vec<_>>();
340
341 let mut next_set = HashSet::new();
343
344 for (txid, request) in requests {
346 let info = request.await?
347 .ok_or_else(|| anyhow!("unable to retrieve tx info for {}", txid))?;
348 if !info.status.confirmed {
349 for vin in info.vin.iter() {
350 next_set.insert(vin.txid);
351 }
352 }
353 info_map.insert(*txid, info);
354 }
355 set = next_set;
356 }
357 for info in info_map.into_values().filter(|info| !info.status.confirmed) {
359 result.total_fee += info.fee();
360 result.total_weight += info.weight();
361 }
362 },
363 }
364 Ok(result)
366 }
367
368 pub async fn txs_spending_inputs<T: IntoIterator<Item = OutPoint>>(
371 &self,
372 outpoints: T,
373 block_scan_start: BlockHeight,
374 ) -> anyhow::Result<TxsSpendingInputsResult> {
375 let mut res = TxsSpendingInputsResult::new();
376 match self.inner() {
377 ChainSourceClient::Bitcoind(bitcoind) => {
378 let start = block_scan_start.saturating_sub(1);
380 let block_ref = self.block_ref(start).await?;
381 let cp = CheckPoint::new(BlockId {
382 height: block_ref.height,
383 hash: block_ref.hash,
384 });
385
386 let mut emitter = bdk_bitcoind_rpc::Emitter::new(
387 bitcoind, cp.clone(), cp.height(), bdk_bitcoind_rpc::NO_EXPECTED_MEMPOOL_TXS,
388 );
389
390 debug!("Scanning blocks for spent outpoints with bitcoind, starting at block height {}...", block_scan_start);
391 let outpoint_set = outpoints.into_iter().collect::<HashSet<_>>();
392 while let Some(em) = emitter.next_block()? {
393 if em.block_height() % 1000 == 0 {
395 info!("Scanned for spent outpoints until block height {}", em.block_height());
396 }
397 for tx in &em.block.txdata {
398 for txin in tx.input.iter() {
399 if outpoint_set.contains(&txin.previous_output) {
400 res.add(
401 txin.previous_output.clone(),
402 tx.compute_txid(),
403 TxStatus::Confirmed(BlockRef {
404 height: em.block_height(), hash: em.block.block_hash().clone()
405 })
406 );
407 if res.map.len() == outpoint_set.len() {
409 return Ok(res);
410 }
411 }
412 }
413 }
414 }
415
416 debug!("Finished scanning blocks for spent outpoints, now checking the mempool...");
417 let mempool = emitter.mempool()?;
418 for (tx, _last_seen) in &mempool.update {
419 for txin in tx.input.iter() {
420 if outpoint_set.contains(&txin.previous_output) {
421 res.add(
422 txin.previous_output.clone(),
423 tx.compute_txid(),
424 TxStatus::Mempool,
425 );
426
427 if res.map.len() == outpoint_set.len() {
429 return Ok(res);
430 }
431 }
432 }
433 }
434 debug!("Finished checking the mempool for spent outpoints");
435 },
436 ChainSourceClient::Esplora(client) => {
437 for outpoint in outpoints {
438 let output_status = client.get_output_status(&outpoint.txid, outpoint.vout.into()).await?;
439
440 if let Some(output_status) = output_status {
441 if output_status.spent {
442 let tx_status = {
443 let status = output_status.status.expect("Status should be valid if an outpoint is spent");
444 if status.confirmed {
445 TxStatus::Confirmed(BlockRef {
446 height: status.block_height.expect("Confirmed transaction missing block_height"),
447 hash: status.block_hash.expect("Confirmed transaction missing block_hash"),
448 })
449 } else {
450 TxStatus::Mempool
451 }
452 };
453 let txid = output_status.txid.expect("Txid should be valid if an outpoint is spent");
454 res.add(outpoint, txid, tx_status);
455 }
456 }
457 }
458 },
459 }
460
461 Ok(res)
462 }
463
464 pub async fn broadcast_tx(&self, tx: &Transaction) -> anyhow::Result<()> {
465 match self.inner() {
466 ChainSourceClient::Bitcoind(bitcoind) => {
467 match bitcoind.send_raw_transaction(tx) {
468 Ok(_) => Ok(()),
469 Err(rpc::Error::JsonRpc(
470 rpc::jsonrpc::Error::Rpc(e))
471 ) if e.code == TX_ALREADY_IN_CHAIN_ERROR => Ok(()),
472 Err(e) => Err(e.into()),
473 }
474 },
475 ChainSourceClient::Esplora(client) => {
476 client.broadcast(tx).await?;
477 Ok(())
478 },
479 }
480 }
481
482 pub async fn broadcast_package(&self, txs: &[impl Borrow<Transaction>]) -> anyhow::Result<()> {
483 match self.inner() {
484 ChainSourceClient::Bitcoind(bitcoind) => {
485 let res = bitcoind.submit_package(txs)?;
486 if res.package_msg != "success" {
487 let errors = res.tx_results.values()
488 .map(|t| format!("tx {}: {}",
489 t.txid, t.error.as_ref().map(|s| s.as_str()).unwrap_or("(no error)"),
490 ))
491 .collect::<Vec<_>>();
492 bail!("msg: '{}', errors: {:?}", res.package_msg, errors);
493 }
494 Ok(())
495 },
496 ChainSourceClient::Esplora(client) => {
497 let txs = txs.iter().map(|t| t.borrow().clone()).collect::<Vec<_>>();
498 let res = client.submit_package(&txs, None, None).await?;
499 if res.package_msg != "success" {
500 let errors = res.tx_results.values()
501 .map(|t| format!("tx {}: {}",
502 t.txid, t.error.as_ref().map(|s| s.as_str()).unwrap_or("(no error)"),
503 ))
504 .collect::<Vec<_>>();
505 bail!("msg: '{}', errors: {:?}", res.package_msg, errors);
506 }
507
508 Ok(())
509 },
510 }
511 }
512
513 pub async fn get_tx(&self, txid: &Txid) -> anyhow::Result<Option<Transaction>> {
514 match self.inner() {
515 ChainSourceClient::Bitcoind(bitcoind) => {
516 match bitcoind.get_raw_transaction(txid, None) {
517 Ok(tx) => Ok(Some(tx)),
518 Err(e) if e.is_not_found() => Ok(None),
519 Err(e) => Err(e.into()),
520 }
521 },
522 ChainSourceClient::Esplora(client) => {
523 Ok(client.get_tx(txid).await?)
524 },
525 }
526 }
527
528 pub async fn tx_confirmed(&self, txid: Txid) -> anyhow::Result<Option<BlockHeight>> {
530 Ok(self.tx_status(txid).await?.confirmed_height())
531 }
532
533 pub async fn tx_status(&self, txid: Txid) -> anyhow::Result<TxStatus> {
535 match self.inner() {
536 ChainSourceClient::Bitcoind(bitcoind) => Ok(bitcoind.tx_status(txid)?),
537 ChainSourceClient::Esplora(esplora) => {
538 match esplora.get_tx_info(&txid).await? {
539 Some(info) => match (info.status.block_height, info.status.block_hash) {
540 (Some(block_height), Some(block_hash)) => Ok(TxStatus::Confirmed(BlockRef {
541 height: block_height,
542 hash: block_hash,
543 } )),
544 _ => Ok(TxStatus::Mempool),
545 },
546 None => Ok(TxStatus::NotFound),
547 }
548 },
549 }
550 }
551
552 #[allow(unused)]
553 pub async fn txout_value(&self, outpoint: &OutPoint) -> anyhow::Result<Amount> {
554 let tx = match self.inner() {
555 ChainSourceClient::Bitcoind(bitcoind) => {
556 bitcoind.get_raw_transaction(&outpoint.txid, None)
557 .with_context(|| format!("tx {} unknown", outpoint.txid))?
558 },
559 ChainSourceClient::Esplora(client) => {
560 client.get_tx(&outpoint.txid).await?
561 .with_context(|| format!("tx {} unknown", outpoint.txid))?
562 },
563 };
564 Ok(tx.output.get(outpoint.vout as usize).context("outpoint vout out of range")?.value)
565 }
566
567 pub async fn update_fee_rates(&self, fallback_fee: Option<FeeRate>) -> anyhow::Result<()> {
570 let fee_rates = match (self.fetch_fee_rates().await, fallback_fee) {
571 (Ok(fee_rates), _) => Ok(fee_rates),
572 (Err(e), None) => Err(e),
573 (Err(e), Some(fallback)) => {
574 warn!("Error getting fee rates, falling back to {} sat/kvB: {}",
575 fallback.to_btc_per_kvb(), e,
576 );
577 Ok(FeeRates { fast: fallback, regular: fallback, slow: fallback })
578 }
579 }?;
580
581 *self.fee_rates.write().await = fee_rates;
582 Ok(())
583 }
584}
585
/// Fee rates for three urgency tiers, as produced by `ChainSource::fetch_fee_rates`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct FeeRates {
    /// Rate for the `FEE_RATE_TARGET_CONF_FAST` confirmation target.
    pub fast: FeeRate,
    /// Rate for the `FEE_RATE_TARGET_CONF_REGULAR` confirmation target.
    pub regular: FeeRate,
    /// Rate for the `FEE_RATE_TARGET_CONF_SLOW` confirmation target.
    pub slow: FeeRate,
}
596
/// Fee and weight totals for a mempool transaction, as filled in by
/// `ChainSource::mempool_ancestor_info`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MempoolAncestorInfo {
    /// The transaction the totals were computed for.
    pub txid: Txid,
    /// Accumulated fee; starts at zero in `new`.
    pub total_fee: Amount,
    /// Accumulated weight; starts at zero in `new`.
    pub total_weight: Weight,
}
608
609impl MempoolAncestorInfo {
610 pub fn new(txid: Txid) -> Self {
611 Self {
612 txid,
613 total_fee: Amount::ZERO,
614 total_weight: Weight::ZERO,
615 }
616 }
617
618 pub fn effective_fee_rate(&self) -> Option<FeeRate> {
619 FeeRate::from_amount_and_weight_ceil(self.total_fee, self.total_weight)
620 }
621}
622
623#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
624pub struct TxsSpendingInputsResult {
625 pub map: HashMap<OutPoint, (Txid, TxStatus)>,
626}
627
628impl TxsSpendingInputsResult {
629 pub fn new() -> Self {
630 Self { map: HashMap::new() }
631 }
632
633 pub fn add(&mut self, outpoint: OutPoint, txid: Txid, status: TxStatus) {
634 self.map.insert(outpoint, (txid, status));
635 }
636
637 pub fn get(&self, outpoint: &OutPoint) -> Option<&(Txid, TxStatus)> {
638 self.map.get(outpoint)
639 }
640
641 pub fn confirmed_txids(&self) -> impl Iterator<Item = (Txid, BlockRef)> + '_ {
642 self.map
643 .iter()
644 .filter_map(|(_, (txid, status))| {
645 match status {
646 TxStatus::Confirmed(block) => Some((*txid, *block)),
647 _ => None,
648 }
649 })
650 }
651
652 pub fn mempool_txids(&self) -> impl Iterator<Item = Txid> + '_ {
653 self.map
654 .iter()
655 .filter(|(_, (_, status))| matches!(status, TxStatus::Mempool))
656 .map(|(_, (txid, _))| *txid)
657 }
658}