waterfalls_client/
async.rs

1// Bitcoin Dev Kit
2// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
3//
4// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
5//
6// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
7// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
9// You may not use this file except in accordance with one or both of these
10// licenses.
11
12//! Waterfalls by way of `reqwest` HTTP client.
13
14use std::marker::PhantomData;
15use std::str::FromStr;
16
17use bitcoin::consensus::{deserialize, serialize, Decodable, Encodable};
18use bitcoin::hex::{DisplayHex, FromHex};
19use bitcoin::Address;
20use bitcoin::{block::Header as BlockHeader, BlockHash, Transaction, Txid};
21
22#[allow(unused_imports)]
23use log::{debug, error, info, trace};
24
25use reqwest::{header, Client, Response};
26
27use crate::{Builder, Error, WaterfallResponse, BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES};
28
29#[derive(Debug, Clone)]
30pub struct AsyncClient<S = DefaultSleeper> {
31    /// The URL of the Waterfalls Server.
32    url: String,
33    /// The inner [`reqwest::Client`] to make HTTP requests.
34    client: Client,
35    /// Number of times to retry a request
36    max_retries: usize,
37
38    /// Marker for the type of sleeper used
39    marker: PhantomData<S>,
40}
41
42impl<S: Sleeper> AsyncClient<S> {
43    /// Build an async client from a builder
44    pub fn from_builder(builder: Builder) -> Result<Self, Error> {
45        let mut client_builder = Client::builder();
46
47        #[cfg(not(target_arch = "wasm32"))]
48        if let Some(proxy) = &builder.proxy {
49            client_builder = client_builder.proxy(reqwest::Proxy::all(proxy)?);
50        }
51
52        #[cfg(not(target_arch = "wasm32"))]
53        if let Some(timeout) = builder.timeout {
54            client_builder = client_builder.timeout(core::time::Duration::from_secs(timeout));
55        }
56
57        if !builder.headers.is_empty() {
58            let mut headers = header::HeaderMap::new();
59            for (k, v) in &builder.headers {
60                let header_name = header::HeaderName::from_lowercase(k.to_lowercase().as_bytes())
61                    .map_err(|_| Error::InvalidHttpHeaderName(k.clone()))?;
62                let header_value = header::HeaderValue::from_str(v)
63                    .map_err(|_| Error::InvalidHttpHeaderValue(v.clone()))?;
64                headers.insert(header_name, header_value);
65            }
66            client_builder = client_builder.default_headers(headers);
67        }
68
69        Ok(AsyncClient {
70            url: builder.base_url,
71            client: client_builder.build()?,
72            max_retries: builder.max_retries,
73            marker: PhantomData,
74        })
75    }
76
77    pub fn from_client(url: String, client: Client) -> Self {
78        AsyncClient {
79            url,
80            client,
81            max_retries: crate::DEFAULT_MAX_RETRIES,
82            marker: PhantomData,
83        }
84    }
85
86    /// Make an HTTP GET request to given URL, deserializing to any `T` that
87    /// implement [`bitcoin::consensus::Decodable`].
88    ///
89    /// It should be used when requesting Waterfalls endpoints that can be directly
90    /// deserialized to native `rust-bitcoin` types, which implements
91    /// [`bitcoin::consensus::Decodable`] from `&[u8]`.
92    ///
93    /// # Errors
94    ///
95    /// This function will return an error either from the HTTP client, or the
96    /// [`bitcoin::consensus::Decodable`] deserialization.
97    async fn get_response<T: Decodable>(&self, path: &str) -> Result<T, Error> {
98        let url = format!("{}{}", self.url, path);
99        let response = self.get_with_retry(&url).await?;
100
101        if !response.status().is_success() {
102            return Err(Error::HttpResponse {
103                status: response.status().as_u16(),
104                message: response.text().await?,
105            });
106        }
107
108        Ok(deserialize::<T>(&response.bytes().await?)?)
109    }
110
111    /// Make an HTTP GET request to given URL, deserializing to `Option<T>`.
112    ///
113    /// It uses [`AsyncWaterfallsClient::get_response`] internally.
114    ///
115    /// See [`AsyncWaterfallsClient::get_response`] above for full documentation.
116    async fn get_opt_response<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
117        match self.get_response::<T>(path).await {
118            Ok(res) => Ok(Some(res)),
119            Err(Error::HttpResponse { status: 404, .. }) => Ok(None),
120            Err(e) => Err(e),
121        }
122    }
123
124    /// Make an HTTP GET request to given URL with query parameters, deserializing to any `T` that
125    /// implements [`serde::de::DeserializeOwned`].
126    async fn get_response_json_with_query<T: serde::de::DeserializeOwned>(
127        &self,
128        path: &str,
129        query_params: &[(&str, &str)],
130    ) -> Result<T, Error> {
131        let url = format!("{}{}", self.url, path);
132        let mut request = self.client.get(&url);
133        for (key, value) in query_params {
134            request = request.query(&[(key, value)]);
135        }
136        let response = request.send().await?;
137
138        if !response.status().is_success() {
139            return Err(Error::HttpResponse {
140                status: response.status().as_u16(),
141                message: response.text().await?,
142            });
143        }
144
145        response.json::<T>().await.map_err(Error::Reqwest)
146    }
147
148    /// Make an HTTP GET request to given URL, deserializing to any `T` that
149    /// implements [`bitcoin::consensus::Decodable`].
150    ///
151    /// It should be used when requesting Waterfalls endpoints that are expected
152    /// to return a hex string decodable to native `rust-bitcoin` types which
153    /// implement [`bitcoin::consensus::Decodable`] from `&[u8]`.
154    ///
155    /// # Errors
156    ///
157    /// This function will return an error either from the HTTP client, or the
158    /// [`bitcoin::consensus::Decodable`] deserialization.
159    async fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
160        let url = format!("{}{}", self.url, path);
161        let response = self.get_with_retry(&url).await?;
162
163        if !response.status().is_success() {
164            return Err(Error::HttpResponse {
165                status: response.status().as_u16(),
166                message: response.text().await?,
167            });
168        }
169
170        let hex_str = response.text().await?;
171        Ok(deserialize(&Vec::from_hex(&hex_str)?)?)
172    }
173
174    /// Make an HTTP GET request to given URL, deserializing to `String`.
175    ///
176    /// It should be used when requesting Waterfalls endpoints that can return
177    /// `String` formatted data that can be parsed downstream.
178    ///
179    /// # Errors
180    ///
181    /// This function will return an error either from the HTTP client.
182    async fn get_response_text(&self, path: &str) -> Result<String, Error> {
183        let url = format!("{}{}", self.url, path);
184        let response = self.get_with_retry(&url).await?;
185
186        if !response.status().is_success() {
187            return Err(Error::HttpResponse {
188                status: response.status().as_u16(),
189                message: response.text().await?,
190            });
191        }
192
193        Ok(response.text().await?)
194    }
195
196    /// Make an HTTP POST request to given URL, serializing from any `T` that
197    /// implement [`bitcoin::consensus::Encodable`].
198    ///
199    /// It should be used when requesting Waterfalls endpoints that expected a
200    /// native bitcoin type serialized with [`bitcoin::consensus::Encodable`].
201    ///
202    /// # Errors
203    ///
204    /// This function will return an error either from the HTTP client, or the
205    /// [`bitcoin::consensus::Encodable`] serialization.
206    async fn post_request_hex<T: Encodable>(&self, path: &str, body: T) -> Result<(), Error> {
207        let url = format!("{}{}", self.url, path);
208        let body = serialize::<T>(&body).to_lower_hex_string();
209
210        let response = self.client.post(url).body(body).send().await?;
211
212        if !response.status().is_success() {
213            return Err(Error::HttpResponse {
214                status: response.status().as_u16(),
215                message: response.text().await?,
216            });
217        }
218
219        Ok(())
220    }
221
222    /// Get a [`Transaction`] option given its [`Txid`]
223    pub async fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
224        self.get_opt_response(&format!("/tx/{txid}/raw")).await
225    }
226
227    /// Get a [`Transaction`] given its [`Txid`].
228    pub async fn get_tx_no_opt(&self, txid: &Txid) -> Result<Transaction, Error> {
229        match self.get_tx(txid).await {
230            Ok(Some(tx)) => Ok(tx),
231            Ok(None) => Err(Error::TransactionNotFound(*txid)),
232            Err(e) => Err(e),
233        }
234    }
235
236    /// Query the waterfalls endpoint with a descriptor
237    pub async fn waterfalls(&self, descriptor: &str) -> Result<WaterfallResponse, Error> {
238        let path = "/v4/waterfalls";
239        self.get_response_json_with_query(path, &[("descriptor", descriptor)])
240            .await
241    }
242
243    /// Query the waterfalls endpoint with addresses
244    pub async fn waterfalls_addresses(
245        &self,
246        addresses: &[Address],
247    ) -> Result<WaterfallResponse, Error> {
248        let addresses_str = addresses
249            .iter()
250            .map(|a| a.to_string())
251            .collect::<Vec<String>>()
252            .join(",");
253        let path = "/v4/waterfalls";
254        self.get_response_json_with_query(path, &[("addresses", &addresses_str)])
255            .await
256    }
257
258    /// Query waterfalls with version-specific parameters
259    pub async fn waterfalls_version(
260        &self,
261        descriptor: &str,
262        version: u8,
263        page: Option<u32>,
264        to_index: Option<u32>,
265        utxo_only: bool,
266    ) -> Result<WaterfallResponse, Error> {
267        let path = format!("/v{version}/waterfalls");
268        let mut query_params = vec![
269            ("descriptor", descriptor.to_string()),
270            ("utxo_only", utxo_only.to_string()),
271        ];
272
273        if let Some(page) = page {
274            query_params.push(("page", page.to_string()));
275        }
276        if let Some(to_index) = to_index {
277            query_params.push(("to_index", to_index.to_string()));
278        }
279
280        let query_refs: Vec<(&str, &str)> =
281            query_params.iter().map(|(k, v)| (*k, v.as_str())).collect();
282        self.get_response_json_with_query(&path, &query_refs).await
283    }
284
285    /// Get a [`BlockHeader`] given a particular block hash.
286    pub async fn get_header_by_hash(&self, block_hash: &BlockHash) -> Result<BlockHeader, Error> {
287        self.get_response_hex(&format!("/block/{block_hash}/header"))
288            .await
289    }
290
291    /// Get the server's public key for encryption
292    pub async fn server_recipient(&self) -> Result<String, Error> {
293        self.get_response_text("/v1/server_recipient").await
294    }
295
296    /// Get the server's address for message signing verification
297    pub async fn server_address(&self) -> Result<String, Error> {
298        self.get_response_text("/v1/server_address").await
299    }
300
301    /// Get time since last block with freshness indicator
302    pub async fn time_since_last_block(&self) -> Result<String, Error> {
303        self.get_response_text("/v1/time_since_last_block").await
304    }
305
306    /// Broadcast a [`Transaction`] to Waterfalls
307    pub async fn broadcast(&self, transaction: &Transaction) -> Result<(), Error> {
308        self.post_request_hex("/tx", transaction).await
309    }
310
311    /// Get the [`BlockHash`] of the current blockchain tip.
312    pub async fn get_tip_hash(&self) -> Result<BlockHash, Error> {
313        self.get_response_text("/blocks/tip/hash")
314            .await
315            .map(|block_hash| BlockHash::from_str(&block_hash).map_err(Error::HexToArray))?
316    }
317
318    /// Get the [`BlockHash`] of a specific block height
319    pub async fn get_block_hash(&self, block_height: u32) -> Result<BlockHash, Error> {
320        self.get_response_text(&format!("/block-height/{block_height}"))
321            .await
322            .map(|block_hash| BlockHash::from_str(&block_hash).map_err(Error::HexToArray))?
323    }
324
325    /// Get transaction history for the specified address in Esplora-compatible format
326    pub async fn get_address_txs(&self, address: &Address) -> Result<String, Error> {
327        let path = format!("/address/{address}/txs");
328        self.get_response_text(&path).await
329    }
330
331    /// Get the underlying base URL.
332    pub fn url(&self) -> &str {
333        &self.url
334    }
335
336    /// Get the underlying [`Client`].
337    pub fn client(&self) -> &Client {
338        &self.client
339    }
340
341    /// Sends a GET request to the given `url`, retrying failed attempts
342    /// for retryable error codes until max retries hit.
343    async fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
344        let mut delay = BASE_BACKOFF_MILLIS;
345        let mut attempts = 0;
346
347        loop {
348            match self.client.get(url).send().await? {
349                resp if attempts < self.max_retries && is_status_retryable(resp.status()) => {
350                    S::sleep(delay).await;
351                    attempts += 1;
352                    delay *= 2;
353                }
354                resp => return Ok(resp),
355            }
356        }
357    }
358}
359
360fn is_status_retryable(status: reqwest::StatusCode) -> bool {
361    RETRYABLE_ERROR_CODES.contains(&status.as_u16())
362}
363
/// Abstraction over an asynchronous sleep, so the retry logic does not depend
/// on a particular async runtime.
pub trait Sleeper: 'static {
    /// Future returned by [`Sleeper::sleep`]; it resolves to `()`.
    type Sleep: core::future::Future<Output = ()>;

    /// Create a future associated with the requested duration `dur`.
    fn sleep(dur: core::time::Duration) -> Self::Sleep;
}
368
/// Marker type used as the default sleeper parameter of [`AsyncClient`].
#[derive(Clone, Copy, Debug)]
pub struct DefaultSleeper;
371
/// [`Sleeper`] implementation that delegates to `tokio::time::sleep`.
///
/// Only compiled for tests or when the `tokio` feature is enabled.
#[cfg(any(test, feature = "tokio"))]
impl Sleeper for DefaultSleeper {
    type Sleep = tokio::time::Sleep;

    /// Return tokio's sleep future for `dur`.
    fn sleep(dur: core::time::Duration) -> tokio::time::Sleep {
        tokio::time::sleep(dur)
    }
}
379}