1use std::marker::PhantomData;
15use std::str::FromStr;
16
17use bitcoin::consensus::{deserialize, serialize, Decodable, Encodable};
18use bitcoin::hex::{DisplayHex, FromHex};
19use bitcoin::Address;
20use bitcoin::{block::Header as BlockHeader, BlockHash, Transaction, Txid};
21
22#[allow(unused_imports)]
23use log::{debug, error, info, trace};
24
25use reqwest::{header, Client, Response};
26
27use crate::{Builder, Error, WaterfallResponse, BASE_BACKOFF_MILLIS, RETRYABLE_ERROR_CODES};
28
29#[derive(Debug, Clone)]
30pub struct AsyncClient<S = DefaultSleeper> {
31 url: String,
33 client: Client,
35 max_retries: usize,
37
38 marker: PhantomData<S>,
40}
41
42impl<S: Sleeper> AsyncClient<S> {
43 pub fn from_builder(builder: Builder) -> Result<Self, Error> {
45 let mut client_builder = Client::builder();
46
47 #[cfg(not(target_arch = "wasm32"))]
48 if let Some(proxy) = &builder.proxy {
49 client_builder = client_builder.proxy(reqwest::Proxy::all(proxy)?);
50 }
51
52 #[cfg(not(target_arch = "wasm32"))]
53 if let Some(timeout) = builder.timeout {
54 client_builder = client_builder.timeout(core::time::Duration::from_secs(timeout));
55 }
56
57 if !builder.headers.is_empty() {
58 let mut headers = header::HeaderMap::new();
59 for (k, v) in &builder.headers {
60 let header_name = header::HeaderName::from_lowercase(k.to_lowercase().as_bytes())
61 .map_err(|_| Error::InvalidHttpHeaderName(k.clone()))?;
62 let header_value = header::HeaderValue::from_str(v)
63 .map_err(|_| Error::InvalidHttpHeaderValue(v.clone()))?;
64 headers.insert(header_name, header_value);
65 }
66 client_builder = client_builder.default_headers(headers);
67 }
68
69 Ok(AsyncClient {
70 url: builder.base_url,
71 client: client_builder.build()?,
72 max_retries: builder.max_retries,
73 marker: PhantomData,
74 })
75 }
76
77 pub fn from_client(url: String, client: Client) -> Self {
78 AsyncClient {
79 url,
80 client,
81 max_retries: crate::DEFAULT_MAX_RETRIES,
82 marker: PhantomData,
83 }
84 }
85
86 async fn get_response<T: Decodable>(&self, path: &str) -> Result<T, Error> {
98 let url = format!("{}{}", self.url, path);
99 let response = self.get_with_retry(&url).await?;
100
101 if !response.status().is_success() {
102 return Err(Error::HttpResponse {
103 status: response.status().as_u16(),
104 message: response.text().await?,
105 });
106 }
107
108 Ok(deserialize::<T>(&response.bytes().await?)?)
109 }
110
111 async fn get_opt_response<T: Decodable>(&self, path: &str) -> Result<Option<T>, Error> {
117 match self.get_response::<T>(path).await {
118 Ok(res) => Ok(Some(res)),
119 Err(Error::HttpResponse { status: 404, .. }) => Ok(None),
120 Err(e) => Err(e),
121 }
122 }
123
124 async fn get_response_json_with_query<T: serde::de::DeserializeOwned>(
127 &self,
128 path: &str,
129 query_params: &[(&str, &str)],
130 ) -> Result<T, Error> {
131 let url = format!("{}{}", self.url, path);
132 let mut request = self.client.get(&url);
133 for (key, value) in query_params {
134 request = request.query(&[(key, value)]);
135 }
136 let response = request.send().await?;
137
138 if !response.status().is_success() {
139 return Err(Error::HttpResponse {
140 status: response.status().as_u16(),
141 message: response.text().await?,
142 });
143 }
144
145 response.json::<T>().await.map_err(Error::Reqwest)
146 }
147
148 async fn get_response_hex<T: Decodable>(&self, path: &str) -> Result<T, Error> {
160 let url = format!("{}{}", self.url, path);
161 let response = self.get_with_retry(&url).await?;
162
163 if !response.status().is_success() {
164 return Err(Error::HttpResponse {
165 status: response.status().as_u16(),
166 message: response.text().await?,
167 });
168 }
169
170 let hex_str = response.text().await?;
171 Ok(deserialize(&Vec::from_hex(&hex_str)?)?)
172 }
173
174 async fn get_response_text(&self, path: &str) -> Result<String, Error> {
183 let url = format!("{}{}", self.url, path);
184 let response = self.get_with_retry(&url).await?;
185
186 if !response.status().is_success() {
187 return Err(Error::HttpResponse {
188 status: response.status().as_u16(),
189 message: response.text().await?,
190 });
191 }
192
193 Ok(response.text().await?)
194 }
195
196 async fn post_request_hex<T: Encodable>(&self, path: &str, body: T) -> Result<(), Error> {
207 let url = format!("{}{}", self.url, path);
208 let body = serialize::<T>(&body).to_lower_hex_string();
209
210 let response = self.client.post(url).body(body).send().await?;
211
212 if !response.status().is_success() {
213 return Err(Error::HttpResponse {
214 status: response.status().as_u16(),
215 message: response.text().await?,
216 });
217 }
218
219 Ok(())
220 }
221
222 pub async fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
224 self.get_opt_response(&format!("/tx/{txid}/raw")).await
225 }
226
227 pub async fn get_tx_no_opt(&self, txid: &Txid) -> Result<Transaction, Error> {
229 match self.get_tx(txid).await {
230 Ok(Some(tx)) => Ok(tx),
231 Ok(None) => Err(Error::TransactionNotFound(*txid)),
232 Err(e) => Err(e),
233 }
234 }
235
236 pub async fn waterfalls(&self, descriptor: &str) -> Result<WaterfallResponse, Error> {
238 let path = "/v4/waterfalls";
239 self.get_response_json_with_query(path, &[("descriptor", descriptor)])
240 .await
241 }
242
243 pub async fn waterfalls_addresses(
245 &self,
246 addresses: &[Address],
247 ) -> Result<WaterfallResponse, Error> {
248 let addresses_str = addresses
249 .iter()
250 .map(|a| a.to_string())
251 .collect::<Vec<String>>()
252 .join(",");
253 let path = "/v4/waterfalls";
254 self.get_response_json_with_query(path, &[("addresses", &addresses_str)])
255 .await
256 }
257
258 pub async fn waterfalls_version(
260 &self,
261 descriptor: &str,
262 version: u8,
263 page: Option<u32>,
264 to_index: Option<u32>,
265 utxo_only: bool,
266 ) -> Result<WaterfallResponse, Error> {
267 let path = format!("/v{version}/waterfalls");
268 let mut query_params = vec![
269 ("descriptor", descriptor.to_string()),
270 ("utxo_only", utxo_only.to_string()),
271 ];
272
273 if let Some(page) = page {
274 query_params.push(("page", page.to_string()));
275 }
276 if let Some(to_index) = to_index {
277 query_params.push(("to_index", to_index.to_string()));
278 }
279
280 let query_refs: Vec<(&str, &str)> =
281 query_params.iter().map(|(k, v)| (*k, v.as_str())).collect();
282 self.get_response_json_with_query(&path, &query_refs).await
283 }
284
285 pub async fn get_header_by_hash(&self, block_hash: &BlockHash) -> Result<BlockHeader, Error> {
287 self.get_response_hex(&format!("/block/{block_hash}/header"))
288 .await
289 }
290
291 pub async fn server_recipient(&self) -> Result<String, Error> {
293 self.get_response_text("/v1/server_recipient").await
294 }
295
296 pub async fn server_address(&self) -> Result<String, Error> {
298 self.get_response_text("/v1/server_address").await
299 }
300
301 pub async fn time_since_last_block(&self) -> Result<String, Error> {
303 self.get_response_text("/v1/time_since_last_block").await
304 }
305
306 pub async fn broadcast(&self, transaction: &Transaction) -> Result<(), Error> {
308 self.post_request_hex("/tx", transaction).await
309 }
310
311 pub async fn get_tip_hash(&self) -> Result<BlockHash, Error> {
313 self.get_response_text("/blocks/tip/hash")
314 .await
315 .map(|block_hash| BlockHash::from_str(&block_hash).map_err(Error::HexToArray))?
316 }
317
318 pub async fn get_block_hash(&self, block_height: u32) -> Result<BlockHash, Error> {
320 self.get_response_text(&format!("/block-height/{block_height}"))
321 .await
322 .map(|block_hash| BlockHash::from_str(&block_hash).map_err(Error::HexToArray))?
323 }
324
325 pub async fn get_address_txs(&self, address: &Address) -> Result<String, Error> {
327 let path = format!("/address/{address}/txs");
328 self.get_response_text(&path).await
329 }
330
331 pub fn url(&self) -> &str {
333 &self.url
334 }
335
336 pub fn client(&self) -> &Client {
338 &self.client
339 }
340
341 async fn get_with_retry(&self, url: &str) -> Result<Response, Error> {
344 let mut delay = BASE_BACKOFF_MILLIS;
345 let mut attempts = 0;
346
347 loop {
348 match self.client.get(url).send().await? {
349 resp if attempts < self.max_retries && is_status_retryable(resp.status()) => {
350 S::sleep(delay).await;
351 attempts += 1;
352 delay *= 2;
353 }
354 resp => return Ok(resp),
355 }
356 }
357 }
358}
359
360fn is_status_retryable(status: reqwest::StatusCode) -> bool {
361 RETRYABLE_ERROR_CODES.contains(&status.as_u16())
362}
363
364pub trait Sleeper: 'static {
365 type Sleep: std::future::Future<Output = ()>;
366 fn sleep(dur: std::time::Duration) -> Self::Sleep;
367}
368
369#[derive(Debug, Clone, Copy)]
370pub struct DefaultSleeper;
371
372#[cfg(any(test, feature = "tokio"))]
373impl Sleeper for DefaultSleeper {
374 type Sleep = tokio::time::Sleep;
375
376 fn sleep(dur: std::time::Duration) -> Self::Sleep {
377 tokio::time::sleep(dur)
378 }
379}