bitcoins_provider/
provider.rs

1use async_trait::async_trait;
2use std::time::Duration;
3use thiserror::Error;
4
5use bitcoins::{
6    enc::Address,
7    hashes::{BlockHash, TXID},
8    types::*,
9};
10use coins_core::prelude::*;
11use futures_util::lock::Mutex;
12use lru::LruCache;
13
14use crate::{
15    chain::Tips, pending::PendingTx, types::RawHeader, watcher::PollingWatcher, DEFAULT_CACHE_SIZE,
16};
17
18/// Errors thrown by providers
19#[derive(Debug, Error)]
20pub enum ProviderError {
21    /// Serde issue
22    #[cfg(any(feature = "rpc", feature = "esplora"))]
23    #[error(transparent)]
24    SerdeJsonError(#[from] serde_json::Error),
25
26    /// Bubbled up from bitcoins
27    #[error(transparent)]
28    EncoderError(#[from] coins_core::enc::bases::EncodingError),
29
30    /// Bubbled up from core
31    #[error(transparent)]
32    CoinsSerError(#[from] coins_core::ser::SerError),
33
34    /// Unsupported action. Provider should give a string describing the action and reason
35    #[error("Unsupported action: {0}")]
36    Unsupported(String),
37
38    /// RPC Error Response
39    #[cfg(feature = "rpc")]
40    #[error("RPC Error Response: {0}")]
41    RpcErrorResponse(crate::rpc::common::ErrorResponse),
42
43    /// Custom provider error. Indicates whether the request should be retried
44    #[error("Proivder error {e}")]
45    Custom {
46        /// Whether the Custom error suggests that the request be retried
47        from_parsing: bool,
48        /// The error
49        e: Box<dyn std::error::Error>,
50    },
51}
52
53impl ProviderError {
54    /// Shortcut for instantiating a custom error
55    pub fn custom(from_parsing: bool, e: Box<dyn std::error::Error>) -> Self {
56        Self::Custom { from_parsing, e }
57    }
58    /// Returns true if the request failed due to a local parsing error.
59    ///
60    /// ## Note:
61    ///
62    /// This usually indicates that a requested object was not found. It is common for Bitcoin
63    /// APIs to violate JSON RPC conventions, and return raw strings in this case.
64    #[cfg(any(feature = "rpc", feature = "esplora"))]
65    pub fn from_parsing(&self) -> bool {
66        matches!(
67            self,
68            ProviderError::Custom {
69                from_parsing: true,
70                e: _,
71            } | ProviderError::SerdeJsonError(_)
72                | ProviderError::CoinsSerError(_)
73                | ProviderError::EncoderError(_)
74        )
75    }
76
77    /// Returns true if the request failed due to a local parsing error.
78    ///
79    /// ## Note:
80    ///
81    /// This usually indicates that a requested object was not found. It is common for Bitcoin
82    /// APIs to violate JSON RPC conventions, and return raw strings in this case.
83    #[cfg(not(any(feature = "rpc", feature = "esplora")))]
84    pub fn from_parsing(&self) -> bool {
85        match self {
86            ProviderError::Custom {
87                from_parsing: true,
88                e: _,
89            } => true,
90            ProviderError::CoinsSerError(_) => true,
91            ProviderError::EncoderError(_) => true,
92            _ => false,
93        }
94    }
95}
96
97/// A Bitcoin Provider
98#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
99#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
100pub trait BtcProvider: Sync + Send {
101    /// Explicitly drop the provider, closing connections and freeing resources
102    fn close(self)
103    where
104        Self: Sized,
105    {
106    }
107
108    // -- CHAIN UTILS -- //
109
110    /// Fetch the LE digest of the chain tip
111    async fn tip_hash(&self) -> Result<BlockHash, ProviderError>;
112
113    /// Fetch the height of the chain tip
114    async fn tip_height(&self) -> Result<usize, ProviderError>;
115
116    /// Query the backend to determine if the header with `digest` is in the main chain.
117    async fn in_best_chain(&self, digest: BlockHash) -> Result<bool, ProviderError>;
118
119    /// Return `headers` blockhashes starting at height `start`. If the range is longer than the
120    /// chain, it will return as many headers as possible. If the start is above the tip height,
121    /// it will return an empty vector/
122    async fn get_digest_range(
123        &self,
124        start: usize,
125        headers: usize,
126    ) -> Result<Vec<BlockHash>, ProviderError>;
127
128    /// Return `headers` raw headers starting at height `start`. If the range is longer than the
129    /// chain, it will return as many headers as possible. If the start is above the tip height,
130    /// it will return an empty vector/
131    async fn get_raw_header_range(
132        &self,
133        start: usize,
134        headers: usize,
135    ) -> Result<Vec<RawHeader>, ProviderError>;
136
137    /// Get the header at `height` in the remote data source's best known chain. If no header is
138    /// known at that height, return `None`.
139    async fn get_header_at_height(
140        &self,
141        height: usize,
142    ) -> Result<Option<RawHeader>, ProviderError> {
143        Ok(self.get_raw_header_range(height, 1).await?.first().copied())
144    }
145
146    /// Return the raw header corresponding to a block hash. Returns `None` if the header is
147    /// unknown to the remote API
148    async fn get_raw_header(&self, digest: BlockHash) -> Result<Option<RawHeader>, ProviderError>;
149
150    /// Return the height of a header, or `None` if the header is unknown.
151    ///
152    /// ## Warning: Having a height does NOT mean that the header is part of the main chain.
153    async fn get_height_of(&self, digest: BlockHash) -> Result<Option<usize>, ProviderError>;
154
155    // -- TX UTILS -- //
156
157    /// Get confirming height of the tx. Ok(None) if unknown
158    async fn get_confirmed_height(&self, txid: TXID) -> Result<Option<usize>, ProviderError>;
159
160    /// Get the number of confs a tx has. If the TX is unconfirmed this will be `Ok(Some(0))`. If
161    /// the TX is unknown to the API, it will be `Ok(None)`.
162    async fn get_confs(&self, txid: TXID) -> Result<Option<usize>, ProviderError>;
163
164    /// Fetch a transaction from the remote API. If the tx is not found, the result will be
165    /// `Ok(None)`
166    async fn get_tx(&self, txid: TXID) -> Result<Option<BitcoinTx>, ProviderError>;
167
168    /// Broadcast a transaction to the network. Resolves to a TXID when broadcast.
169    async fn broadcast(&self, tx: BitcoinTx) -> Result<TXID, ProviderError>;
170
171    // -- SPEND UTILS -- //
172
173    /// Fetch the ID of a transaction that spends an outpoint. If no TX known to the remote source
174    /// spends that outpoint, the result will be `Ok(None)`.
175    ///
176    /// Note: some providers may not implement this functionality.
177    async fn get_outspend(&self, outpoint: BitcoinOutpoint) -> Result<Option<TXID>, ProviderError>;
178
179    /// Fetch the UTXOs belonging to an address from the remote API
180    ///
181    /// ## Note: some providers may not implement this functionality.
182    ///
183    /// ## Note: when using Bitcoin Core, this may take upwards of 40 second
184    async fn get_utxos_by_address(&self, address: &Address) -> Result<Vec<Utxo>, ProviderError>;
185
186    /// Fetch the UTXOs belonging to a script pubkey from the remote API
187    ///
188    /// Note: some providers may not implement this functionality.
189    ///
190    /// ## Note: when using Bitcoin Core, this may take upwards of 40 second
191    async fn get_utxos_by_script(&self, spk: &ScriptPubkey) -> Result<Vec<Utxo>, ProviderError> {
192        self.get_utxos_by_address(&crate::Encoder::encode_address(spk)?)
193            .await
194    }
195
196    // -- MERKLE UTILS -- //
197
198    /// Get the merkle proof for a transaction. This will be `None` if the tx is not confirmed
199    async fn get_merkle(
200        &self,
201        txid: TXID,
202    ) -> Result<Option<(usize, Vec<Hash256Digest>)>, ProviderError>;
203
204    /// TODO: make less brittle
205    async fn get_confirming_digests(
206        &self,
207        txid: TXID,
208        confs: usize,
209    ) -> Result<Vec<BlockHash>, ProviderError> {
210        let height = {
211            let height_opt = self.get_confirmed_height(txid).await?;
212            if height_opt.is_none() {
213                return Ok(vec![]);
214            }
215            height_opt.unwrap()
216        };
217        self.get_digest_range(height, confs).await
218    }
219
220    /// TODO: make less brittle
221    async fn get_confirming_headers(
222        &self,
223        txid: TXID,
224        confs: usize,
225    ) -> Result<Vec<RawHeader>, ProviderError> {
226        let height = {
227            let height_opt = self.get_confirmed_height(txid).await?;
228            if height_opt.is_none() {
229                return Ok(vec![]);
230            }
231            height_opt.unwrap()
232        };
233        self.get_raw_header_range(height, confs).await
234    }
235}
236
237/// An extension trait that adds polling watchers for a provider
238#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
239#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
240pub trait PollingBtcProvider: BtcProvider {
241    /// Return the polling duration of the provider
242    fn interval(&self) -> Duration;
243
244    /// Set the polling interval of the provider. Interval is seconds.
245    fn set_interval(&mut self, interval: usize);
246
247    /// Broadcast a transaction, get a future that resolves when the tx is confirmed. This
248    /// returns a `PendingTx` future. The tx will not be braodcast until that future is scheduled
249    /// to run.
250    fn send(&self, tx: BitcoinTx, confirmations: usize) -> PendingTx
251    where
252        Self: Sized,
253    {
254        PendingTx::new(tx, self)
255            .confirmations(confirmations)
256            .interval(self.interval())
257    }
258
259    /// Track a txid that may or may not already be in the mempool. Returns `None` if the txid is
260    /// not known to the remote node.
261    async fn track(&self, txid: TXID, confirmations: usize) -> Option<PendingTx<'_>>
262    where
263        Self: Sized,
264    {
265        let tx = self.get_tx(txid).await.ok().flatten()?;
266        Some(
267            PendingTx::new(tx, self)
268                .confirmations(confirmations)
269                .interval(self.interval()),
270        )
271    }
272
273    /// Watch the chain tip. Get notified of the new `BlockHash` every time it changes.
274    ///
275    /// Note: A new hash does not necessarily mean the chain height has increased. Reorgs may
276    /// result in the height remaining the same, or decreasing in rare cases.
277    fn tips(&self, limit: usize) -> Tips
278    where
279        Self: Sized,
280    {
281        Tips::new(limit, self).interval(self.interval())
282    }
283
284    /// Watch an outpoint, waiting for a tx to spend it. This returns a `PollingWatcher` future.
285    /// The observation will not start until that future is scheduled to run.
286    ///
287    /// Note: some providers may not implement this functionality.
288    fn watch(&self, outpoint: BitcoinOutpoint, confirmations: usize) -> PollingWatcher
289    where
290        Self: Sized,
291    {
292        PollingWatcher::new(outpoint, self)
293            .confirmations(confirmations)
294            .interval(self.interval())
295    }
296}
297
298/// A provider that caches API responses whose values will never change.
299pub struct CachingProvider<T: BtcProvider> {
300    provider: T,
301    tx_cache: Mutex<LruCache<TXID, BitcoinTx>>,
302    header_cache: Mutex<LruCache<BlockHash, RawHeader>>,
303    height_cache: Mutex<LruCache<BlockHash, usize>>,
304}
305
306impl<T: BtcProvider> From<T> for CachingProvider<T> {
307    fn from(provider: T) -> Self {
308        Self {
309            provider,
310            tx_cache: Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE)),
311            header_cache: Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE)),
312            height_cache: Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE)),
313        }
314    }
315}
316
317impl<T> Default for CachingProvider<T>
318where
319    T: BtcProvider + Default,
320{
321    fn default() -> Self {
322        T::default().into()
323    }
324}
325
326impl<T: BtcProvider> CachingProvider<T> {
327    /// Return a reference to the TX, if it's in the cache.
328    pub async fn peek_tx(&self, txid: TXID) -> Option<BitcoinTx> {
329        self.tx_cache.lock().await.peek(&txid).cloned()
330    }
331
332    /// Return true if the cache has the tx in it
333    pub async fn has_tx(&self, txid: TXID) -> bool {
334        self.tx_cache.lock().await.contains(&txid)
335    }
336
337    /// Return true if the cache has the header in it
338    pub async fn has_header(&self, digest: BlockHash) -> bool {
339        self.header_cache.lock().await.contains(&digest)
340    }
341
342    /// Return true if the cache has the height in it
343    pub async fn has_height(&self, digest: BlockHash) -> bool {
344        self.height_cache.lock().await.contains(&digest)
345    }
346}
347
348#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
349#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
350impl<T> BtcProvider for CachingProvider<T>
351where
352    T: BtcProvider,
353{
354    async fn tip_hash(&self) -> Result<BlockHash, ProviderError> {
355        self.provider.tip_hash().await
356    }
357
358    async fn tip_height(&self) -> Result<usize, ProviderError> {
359        self.provider.tip_height().await
360    }
361
362    async fn in_best_chain(&self, digest: BlockHash) -> Result<bool, ProviderError> {
363        self.provider.in_best_chain(digest).await
364    }
365
366    async fn get_digest_range(
367        &self,
368        start: usize,
369        headers: usize,
370    ) -> Result<Vec<BlockHash>, ProviderError> {
371        self.provider.get_digest_range(start, headers).await
372    }
373
374    async fn get_raw_header_range(
375        &self,
376        start: usize,
377        headers: usize,
378    ) -> Result<Vec<RawHeader>, ProviderError> {
379        self.provider.get_raw_header_range(start, headers).await
380    }
381
382    async fn get_raw_header(&self, digest: BlockHash) -> Result<Option<RawHeader>, ProviderError> {
383        if self.has_header(digest).await {
384            return Ok(self.header_cache.lock().await.get(&digest).cloned());
385        }
386
387        let header_opt = { self.provider.get_raw_header(digest).await? };
388        if header_opt.is_none() {
389            return Ok(None);
390        }
391        let header = header_opt.unwrap();
392        self.header_cache.lock().await.put(digest, header);
393        Ok(Some(header))
394    }
395
396    async fn get_height_of(&self, digest: BlockHash) -> Result<Option<usize>, ProviderError> {
397        if self.has_header(digest).await {
398            return Ok(self.height_cache.lock().await.get(&digest).cloned());
399        }
400
401        let height_opt = { self.provider.get_height_of(digest).await? };
402        if height_opt.is_none() {
403            return Ok(None);
404        }
405        let height = height_opt.unwrap();
406        self.height_cache.lock().await.put(digest, height);
407        Ok(Some(height))
408    }
409
410    async fn get_confirmed_height(&self, txid: TXID) -> Result<Option<usize>, ProviderError> {
411        self.provider.get_confirmed_height(txid).await
412    }
413
414    async fn get_confs(&self, txid: TXID) -> Result<Option<usize>, ProviderError> {
415        self.provider.get_confs(txid).await
416    }
417
418    async fn get_tx(&self, txid: TXID) -> Result<Option<BitcoinTx>, ProviderError> {
419        if self.has_tx(txid).await {
420            return Ok(self.tx_cache.lock().await.get(&txid).cloned());
421        }
422
423        let tx_opt = { self.provider.get_tx(txid).await? };
424        if tx_opt.is_none() {
425            return Ok(None);
426        }
427        let tx = tx_opt.unwrap();
428        self.tx_cache.lock().await.put(txid, tx.clone());
429        Ok(Some(tx))
430    }
431
432    async fn broadcast(&self, tx: BitcoinTx) -> Result<TXID, ProviderError> {
433        self.provider.broadcast(tx).await
434    }
435
436    async fn get_outspend(&self, outpoint: BitcoinOutpoint) -> Result<Option<TXID>, ProviderError> {
437        self.provider.get_outspend(outpoint).await
438    }
439
440    async fn get_utxos_by_address(&self, address: &Address) -> Result<Vec<Utxo>, ProviderError> {
441        self.provider.get_utxos_by_address(address).await
442    }
443
444    async fn get_merkle(
445        &self,
446        txid: TXID,
447    ) -> Result<Option<(usize, Vec<Hash256Digest>)>, ProviderError> {
448        self.provider.get_merkle(txid).await
449    }
450}
451
452#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
453#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
454impl<T> PollingBtcProvider for CachingProvider<T>
455where
456    T: PollingBtcProvider,
457{
458    fn interval(&self) -> Duration {
459        self.provider.interval()
460    }
461    fn set_interval(&mut self, interval: usize) {
462        self.provider.set_interval(interval)
463    }
464}