use async_trait::async_trait;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, Script, Txid},
collections::BTreeMap,
keychain::LocalUpdate,
BlockId, ConfirmationTimeAnchor,
};
use bdk_esplora::EsploraAsyncExt;
use bitcoin::secp256k1::rand;
use bitcoin::secp256k1::rand::prelude::SliceRandom;
use bitcoin::{BlockHeader, MerkleBlock, Transaction};
use esplora_client::{AsyncClient, BlockStatus, Error, OutputStatus, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};
use reqwest::Client;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct MultiEsploraClient {
clients: Vec<Arc<AsyncClient>>,
}
impl MultiEsploraClient {
pub fn new(clients: Vec<Arc<AsyncClient>>) -> Self {
if clients.is_empty() {
panic!("No esplora clients provided");
}
Self { clients }
}
fn get_random_client(&self) -> Arc<AsyncClient> {
let client = self.clients.choose(&mut rand::thread_rng()).unwrap();
client.clone()
}
pub(crate) fn client(&self) -> Client {
let client = self.get_random_client();
client.client().to_owned()
}
pub(crate) fn url(&self) -> String {
let client = self.get_random_client();
client.url().to_owned()
}
pub async fn broadcast(&self, transaction: &Transaction) -> Result<(), Error> {
let client = self.get_random_client();
client.broadcast(transaction).await
}
pub async fn get_block_hash(&self, block_height: u32) -> Result<BlockHash, Error> {
let client = self.get_random_client();
client.get_block_hash(block_height).await
}
pub async fn get_height(&self) -> Result<u32, Error> {
let client = self.get_random_client();
client.get_height().await
}
pub async fn get_tip_hash(&self) -> Result<BlockHash, Error> {
let client = self.get_random_client();
client.get_tip_hash().await
}
pub async fn scripthash_txs(
&self,
script: &Script,
last_seen: Option<Txid>,
) -> Result<Vec<esplora_client::Tx>, Error> {
let client = self.get_random_client();
client.scripthash_txs(script, last_seen).await
}
pub async fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
let client = self.get_random_client();
client.get_tx(txid).await
}
pub async fn get_tx_status(&self, txid: &Txid) -> Result<TxStatus, Error> {
let client = self.get_random_client();
client.get_tx_status(txid).await
}
pub async fn get_output_status(
&self,
txid: &Txid,
index: u64,
) -> Result<Option<OutputStatus>, Error> {
let client = self.get_random_client();
client.get_output_status(txid, index).await
}
pub async fn get_block_status(&self, block_hash: &BlockHash) -> Result<BlockStatus, Error> {
let client = self.get_random_client();
client.get_block_status(block_hash).await
}
pub async fn get_merkle_block(&self, tx_hash: &Txid) -> Result<Option<MerkleBlock>, Error> {
let client = self.get_random_client();
client.get_merkle_block(tx_hash).await
}
pub async fn get_header_by_hash(&self, block_hash: &BlockHash) -> Result<BlockHeader, Error> {
let client = self.get_random_client();
client.get_header_by_hash(block_hash).await
}
pub async fn get_fee_estimates(&self) -> Result<HashMap<String, f64>, Error> {
let client = self.get_random_client();
client.get_fee_estimates().await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EsploraAsyncExt for MultiEsploraClient {
async fn scan<K: Ord + Clone + Send>(
&self,
local_chain: &BTreeMap<u32, BlockHash>,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
>,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
stop_gap: usize,
parallel_requests: usize,
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
let parallel_requests = Ord::max(parallel_requests, 1);
let (mut update, tip_at_start) = loop {
let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
for (&height, &original_hash) in local_chain.iter().rev() {
let update_block_id = BlockId {
height,
hash: self.get_block_hash(height).await?,
};
let _ = update
.chain
.insert_block(update_block_id)
.expect("cannot repeat height here");
if update_block_id.hash == original_hash {
break;
}
}
let tip_at_start = BlockId {
height: self.get_height().await?,
hash: self.get_tip_hash().await?,
};
if update.chain.insert_block(tip_at_start).is_ok() {
break (update, tip_at_start);
}
};
for (keychain, spks) in keychain_spks {
let mut spks = spks.into_iter();
let mut last_active_index = None;
let mut empty_scripts = 0;
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
loop {
let futures = (0..parallel_requests)
.filter_map(|_| {
let (index, script) = spks.next()?;
let client = self.clone();
Some(async move {
let mut related_txs = client.scripthash_txs(&script, None).await?;
let n_confirmed =
related_txs.iter().filter(|tx| tx.status.confirmed).count();
if n_confirmed >= 25 {
loop {
let new_related_txs = client
.scripthash_txs(
&script,
Some(related_txs.last().unwrap().txid),
)
.await?;
let n = new_related_txs.len();
related_txs.extend(new_related_txs);
if n < 25 {
break;
}
}
}
Result::<_, Error>::Ok((index, related_txs))
})
})
.collect::<FuturesOrdered<_>>();
let n_futures = futures.len();
for (index, related_txs) in futures.try_collect::<Vec<IndexWithTxs>>().await? {
if related_txs.is_empty() {
empty_scripts += 1;
} else {
last_active_index = Some(index);
empty_scripts = 0;
}
for tx in related_txs {
let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
let _ = update.graph.insert_tx(tx.to_tx());
if let Some(anchor) = anchor {
let _ = update.graph.insert_anchor(tx.txid, anchor);
}
}
}
if n_futures == 0 || empty_scripts >= stop_gap {
break;
}
}
if let Some(last_active_index) = last_active_index {
update.keychain.insert(keychain, last_active_index);
}
}
for txid in txids.into_iter() {
if update.graph.get_tx(txid).is_none() {
match self.get_tx(&txid).await? {
Some(tx) => {
let _ = update.graph.insert_tx(tx);
}
None => continue,
}
}
match self.get_tx_status(&txid).await? {
tx_status if tx_status.confirmed => {
if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
let _ = update.graph.insert_anchor(txid, anchor);
}
}
_ => continue,
}
}
for op in outpoints.into_iter() {
let mut op_txs = Vec::with_capacity(2);
if let (
Some(tx),
tx_status @ TxStatus {
confirmed: true, ..
},
) = (
self.get_tx(&op.txid).await?,
self.get_tx_status(&op.txid).await?,
) {
op_txs.push((tx, tx_status));
if let Some(OutputStatus {
txid: Some(txid),
status: Some(spend_status),
..
}) = self.get_output_status(&op.txid, op.vout as _).await?
{
if let Some(spend_tx) = self.get_tx(&txid).await? {
op_txs.push((spend_tx, spend_status));
}
}
}
for (tx, status) in op_txs {
let txid = tx.txid();
let anchor = map_confirmation_time_anchor(&status, tip_at_start);
let _ = update.graph.insert_tx(tx);
if let Some(anchor) = anchor {
let _ = update.graph.insert_anchor(txid, anchor);
}
}
}
if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? {
let txids_found = update
.graph
.full_txs()
.map(|tx_node| tx_node.txid)
.collect::<Vec<_>>();
update.chain = EsploraAsyncExt::scan_without_keychain(
self,
local_chain,
[],
txids_found,
[],
parallel_requests,
)
.await?
.chain;
}
Ok(update)
}
}
fn map_confirmation_time_anchor(
tx_status: &TxStatus,
tip_at_start: BlockId,
) -> Option<ConfirmationTimeAnchor> {
match (tx_status.block_time, tx_status.block_height) {
(Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor {
anchor_block: tip_at_start,
confirmation_height,
confirmation_time,
}),
_ => None,
}
}