1use async_trait::async_trait;
2use std::time::Duration;
3use thiserror::Error;
4
5use bitcoins::{
6 enc::Address,
7 hashes::{BlockHash, TXID},
8 types::*,
9};
10use coins_core::prelude::*;
11use futures_util::lock::Mutex;
12use lru::LruCache;
13
14use crate::{
15 chain::Tips, pending::PendingTx, types::RawHeader, watcher::PollingWatcher, DEFAULT_CACHE_SIZE,
16};
17
18#[derive(Debug, Error)]
20pub enum ProviderError {
21 #[cfg(any(feature = "rpc", feature = "esplora"))]
23 #[error(transparent)]
24 SerdeJsonError(#[from] serde_json::Error),
25
26 #[error(transparent)]
28 EncoderError(#[from] coins_core::enc::bases::EncodingError),
29
30 #[error(transparent)]
32 CoinsSerError(#[from] coins_core::ser::SerError),
33
34 #[error("Unsupported action: {0}")]
36 Unsupported(String),
37
38 #[cfg(feature = "rpc")]
40 #[error("RPC Error Response: {0}")]
41 RpcErrorResponse(crate::rpc::common::ErrorResponse),
42
43 #[error("Proivder error {e}")]
45 Custom {
46 from_parsing: bool,
48 e: Box<dyn std::error::Error>,
50 },
51}
52
53impl ProviderError {
54 pub fn custom(from_parsing: bool, e: Box<dyn std::error::Error>) -> Self {
56 Self::Custom { from_parsing, e }
57 }
58 #[cfg(any(feature = "rpc", feature = "esplora"))]
65 pub fn from_parsing(&self) -> bool {
66 matches!(
67 self,
68 ProviderError::Custom {
69 from_parsing: true,
70 e: _,
71 } | ProviderError::SerdeJsonError(_)
72 | ProviderError::CoinsSerError(_)
73 | ProviderError::EncoderError(_)
74 )
75 }
76
77 #[cfg(not(any(feature = "rpc", feature = "esplora")))]
84 pub fn from_parsing(&self) -> bool {
85 match self {
86 ProviderError::Custom {
87 from_parsing: true,
88 e: _,
89 } => true,
90 ProviderError::CoinsSerError(_) => true,
91 ProviderError::EncoderError(_) => true,
92 _ => false,
93 }
94 }
95}
96
97#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
99#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
100pub trait BtcProvider: Sync + Send {
101 fn close(self)
103 where
104 Self: Sized,
105 {
106 }
107
108 async fn tip_hash(&self) -> Result<BlockHash, ProviderError>;
112
113 async fn tip_height(&self) -> Result<usize, ProviderError>;
115
116 async fn in_best_chain(&self, digest: BlockHash) -> Result<bool, ProviderError>;
118
119 async fn get_digest_range(
123 &self,
124 start: usize,
125 headers: usize,
126 ) -> Result<Vec<BlockHash>, ProviderError>;
127
128 async fn get_raw_header_range(
132 &self,
133 start: usize,
134 headers: usize,
135 ) -> Result<Vec<RawHeader>, ProviderError>;
136
137 async fn get_header_at_height(
140 &self,
141 height: usize,
142 ) -> Result<Option<RawHeader>, ProviderError> {
143 Ok(self.get_raw_header_range(height, 1).await?.first().copied())
144 }
145
146 async fn get_raw_header(&self, digest: BlockHash) -> Result<Option<RawHeader>, ProviderError>;
149
150 async fn get_height_of(&self, digest: BlockHash) -> Result<Option<usize>, ProviderError>;
154
155 async fn get_confirmed_height(&self, txid: TXID) -> Result<Option<usize>, ProviderError>;
159
160 async fn get_confs(&self, txid: TXID) -> Result<Option<usize>, ProviderError>;
163
164 async fn get_tx(&self, txid: TXID) -> Result<Option<BitcoinTx>, ProviderError>;
167
168 async fn broadcast(&self, tx: BitcoinTx) -> Result<TXID, ProviderError>;
170
171 async fn get_outspend(&self, outpoint: BitcoinOutpoint) -> Result<Option<TXID>, ProviderError>;
178
179 async fn get_utxos_by_address(&self, address: &Address) -> Result<Vec<Utxo>, ProviderError>;
185
186 async fn get_utxos_by_script(&self, spk: &ScriptPubkey) -> Result<Vec<Utxo>, ProviderError> {
192 self.get_utxos_by_address(&crate::Encoder::encode_address(spk)?)
193 .await
194 }
195
196 async fn get_merkle(
200 &self,
201 txid: TXID,
202 ) -> Result<Option<(usize, Vec<Hash256Digest>)>, ProviderError>;
203
204 async fn get_confirming_digests(
206 &self,
207 txid: TXID,
208 confs: usize,
209 ) -> Result<Vec<BlockHash>, ProviderError> {
210 let height = {
211 let height_opt = self.get_confirmed_height(txid).await?;
212 if height_opt.is_none() {
213 return Ok(vec![]);
214 }
215 height_opt.unwrap()
216 };
217 self.get_digest_range(height, confs).await
218 }
219
220 async fn get_confirming_headers(
222 &self,
223 txid: TXID,
224 confs: usize,
225 ) -> Result<Vec<RawHeader>, ProviderError> {
226 let height = {
227 let height_opt = self.get_confirmed_height(txid).await?;
228 if height_opt.is_none() {
229 return Ok(vec![]);
230 }
231 height_opt.unwrap()
232 };
233 self.get_raw_header_range(height, confs).await
234 }
235}
236
237#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
239#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
240pub trait PollingBtcProvider: BtcProvider {
241 fn interval(&self) -> Duration;
243
244 fn set_interval(&mut self, interval: usize);
246
247 fn send(&self, tx: BitcoinTx, confirmations: usize) -> PendingTx
251 where
252 Self: Sized,
253 {
254 PendingTx::new(tx, self)
255 .confirmations(confirmations)
256 .interval(self.interval())
257 }
258
259 async fn track(&self, txid: TXID, confirmations: usize) -> Option<PendingTx<'_>>
262 where
263 Self: Sized,
264 {
265 let tx = self.get_tx(txid).await.ok().flatten()?;
266 Some(
267 PendingTx::new(tx, self)
268 .confirmations(confirmations)
269 .interval(self.interval()),
270 )
271 }
272
273 fn tips(&self, limit: usize) -> Tips
278 where
279 Self: Sized,
280 {
281 Tips::new(limit, self).interval(self.interval())
282 }
283
284 fn watch(&self, outpoint: BitcoinOutpoint, confirmations: usize) -> PollingWatcher
289 where
290 Self: Sized,
291 {
292 PollingWatcher::new(outpoint, self)
293 .confirmations(confirmations)
294 .interval(self.interval())
295 }
296}
297
298pub struct CachingProvider<T: BtcProvider> {
300 provider: T,
301 tx_cache: Mutex<LruCache<TXID, BitcoinTx>>,
302 header_cache: Mutex<LruCache<BlockHash, RawHeader>>,
303 height_cache: Mutex<LruCache<BlockHash, usize>>,
304}
305
306impl<T: BtcProvider> From<T> for CachingProvider<T> {
307 fn from(provider: T) -> Self {
308 Self {
309 provider,
310 tx_cache: Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE)),
311 header_cache: Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE)),
312 height_cache: Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE)),
313 }
314 }
315}
316
317impl<T> Default for CachingProvider<T>
318where
319 T: BtcProvider + Default,
320{
321 fn default() -> Self {
322 T::default().into()
323 }
324}
325
326impl<T: BtcProvider> CachingProvider<T> {
327 pub async fn peek_tx(&self, txid: TXID) -> Option<BitcoinTx> {
329 self.tx_cache.lock().await.peek(&txid).cloned()
330 }
331
332 pub async fn has_tx(&self, txid: TXID) -> bool {
334 self.tx_cache.lock().await.contains(&txid)
335 }
336
337 pub async fn has_header(&self, digest: BlockHash) -> bool {
339 self.header_cache.lock().await.contains(&digest)
340 }
341
342 pub async fn has_height(&self, digest: BlockHash) -> bool {
344 self.height_cache.lock().await.contains(&digest)
345 }
346}
347
348#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
349#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
350impl<T> BtcProvider for CachingProvider<T>
351where
352 T: BtcProvider,
353{
354 async fn tip_hash(&self) -> Result<BlockHash, ProviderError> {
355 self.provider.tip_hash().await
356 }
357
358 async fn tip_height(&self) -> Result<usize, ProviderError> {
359 self.provider.tip_height().await
360 }
361
362 async fn in_best_chain(&self, digest: BlockHash) -> Result<bool, ProviderError> {
363 self.provider.in_best_chain(digest).await
364 }
365
366 async fn get_digest_range(
367 &self,
368 start: usize,
369 headers: usize,
370 ) -> Result<Vec<BlockHash>, ProviderError> {
371 self.provider.get_digest_range(start, headers).await
372 }
373
374 async fn get_raw_header_range(
375 &self,
376 start: usize,
377 headers: usize,
378 ) -> Result<Vec<RawHeader>, ProviderError> {
379 self.provider.get_raw_header_range(start, headers).await
380 }
381
382 async fn get_raw_header(&self, digest: BlockHash) -> Result<Option<RawHeader>, ProviderError> {
383 if self.has_header(digest).await {
384 return Ok(self.header_cache.lock().await.get(&digest).cloned());
385 }
386
387 let header_opt = { self.provider.get_raw_header(digest).await? };
388 if header_opt.is_none() {
389 return Ok(None);
390 }
391 let header = header_opt.unwrap();
392 self.header_cache.lock().await.put(digest, header);
393 Ok(Some(header))
394 }
395
396 async fn get_height_of(&self, digest: BlockHash) -> Result<Option<usize>, ProviderError> {
397 if self.has_header(digest).await {
398 return Ok(self.height_cache.lock().await.get(&digest).cloned());
399 }
400
401 let height_opt = { self.provider.get_height_of(digest).await? };
402 if height_opt.is_none() {
403 return Ok(None);
404 }
405 let height = height_opt.unwrap();
406 self.height_cache.lock().await.put(digest, height);
407 Ok(Some(height))
408 }
409
410 async fn get_confirmed_height(&self, txid: TXID) -> Result<Option<usize>, ProviderError> {
411 self.provider.get_confirmed_height(txid).await
412 }
413
414 async fn get_confs(&self, txid: TXID) -> Result<Option<usize>, ProviderError> {
415 self.provider.get_confs(txid).await
416 }
417
418 async fn get_tx(&self, txid: TXID) -> Result<Option<BitcoinTx>, ProviderError> {
419 if self.has_tx(txid).await {
420 return Ok(self.tx_cache.lock().await.get(&txid).cloned());
421 }
422
423 let tx_opt = { self.provider.get_tx(txid).await? };
424 if tx_opt.is_none() {
425 return Ok(None);
426 }
427 let tx = tx_opt.unwrap();
428 self.tx_cache.lock().await.put(txid, tx.clone());
429 Ok(Some(tx))
430 }
431
432 async fn broadcast(&self, tx: BitcoinTx) -> Result<TXID, ProviderError> {
433 self.provider.broadcast(tx).await
434 }
435
436 async fn get_outspend(&self, outpoint: BitcoinOutpoint) -> Result<Option<TXID>, ProviderError> {
437 self.provider.get_outspend(outpoint).await
438 }
439
440 async fn get_utxos_by_address(&self, address: &Address) -> Result<Vec<Utxo>, ProviderError> {
441 self.provider.get_utxos_by_address(address).await
442 }
443
444 async fn get_merkle(
445 &self,
446 txid: TXID,
447 ) -> Result<Option<(usize, Vec<Hash256Digest>)>, ProviderError> {
448 self.provider.get_merkle(txid).await
449 }
450}
451
452#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
453#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
454impl<T> PollingBtcProvider for CachingProvider<T>
455where
456 T: PollingBtcProvider,
457{
458 fn interval(&self) -> Duration {
459 self.provider.interval()
460 }
461 fn set_interval(&mut self, interval: usize) {
462 self.provider.set_interval(interval)
463 }
464}