ord/index/
fetcher.rs

1use {
2  super::*,
3  http_body_util::{BodyExt, Full},
4  hyper::{Method, Request, Uri, body::Bytes},
5  hyper_util::{
6    client::legacy::{Client, connect::HttpConnector},
7    rt::TokioExecutor,
8  },
9  serde_json::{Value, json},
10};
11
12pub(crate) struct Fetcher {
13  auth: String,
14  client: Client<HttpConnector, Full<Bytes>>,
15  url: Uri,
16}
17
18#[derive(Deserialize, Debug)]
19struct JsonResponse<T> {
20  error: Option<JsonError>,
21  id: usize,
22  result: Option<T>,
23}
24
25#[derive(Deserialize, Debug)]
26struct JsonError {
27  code: i32,
28  message: String,
29}
30
31impl Fetcher {
32  pub(crate) fn new(settings: &Settings) -> Result<Self> {
33    let client = Client::builder(TokioExecutor::new()).build_http();
34
35    let url = if settings.bitcoin_rpc_url(None).starts_with("http://") {
36      settings.bitcoin_rpc_url(None)
37    } else {
38      "http://".to_string() + &settings.bitcoin_rpc_url(None)
39    };
40
41    let url = Uri::try_from(&url).map_err(|e| anyhow!("Invalid rpc url {url}: {e}"))?;
42
43    let (user, password) = settings.bitcoin_credentials()?.get_user_pass()?;
44    let auth = format!("{}:{}", user.unwrap(), password.unwrap());
45    let auth = format!("Basic {}", &base64_encode(auth.as_bytes()));
46    Ok(Fetcher { client, url, auth })
47  }
48
49  pub(crate) async fn get_transactions(&self, txids: Vec<Txid>) -> Result<Vec<Transaction>> {
50    if txids.is_empty() {
51      return Ok(Vec::new());
52    }
53
54    let mut reqs = Vec::with_capacity(txids.len());
55    for (i, txid) in txids.iter().enumerate() {
56      let req = json!({
57        "jsonrpc": "2.0",
58        "id": i, // Use the index as id, so we can quickly sort the response
59        "method": "getrawtransaction",
60        "params": [ txid ]
61      });
62      reqs.push(req);
63    }
64
65    let body = Value::Array(reqs).to_string();
66
67    let mut results: Vec<JsonResponse<String>>;
68    let mut retries = 0;
69
70    loop {
71      results = match self.try_get_transactions(body.clone()).await {
72        Ok(results) => results,
73        Err(error) => {
74          if retries >= 5 {
75            return Err(anyhow!(
76              "failed to fetch raw transactions after 5 retries: {}",
77              error
78            ));
79          }
80
81          log::info!("failed to fetch raw transactions, retrying: {error}");
82
83          tokio::time::sleep(Duration::from_millis(100 * u64::pow(2, retries))).await;
84          retries += 1;
85          continue;
86        }
87      };
88      break;
89    }
90
91    // Return early on any error, because we need all results to proceed
92    if let Some(err) = results.iter().find_map(|res| res.error.as_ref()) {
93      return Err(anyhow!(
94        "failed to fetch raw transaction: code {} message {}",
95        err.code,
96        err.message
97      ));
98    }
99
100    // Results from batched JSON-RPC requests can come back in any order, so we must sort them by id
101    results.sort_by(|a, b| a.id.cmp(&b.id));
102
103    let txs = results
104      .into_iter()
105      .map(|res| {
106        res
107          .result
108          .ok_or_else(|| anyhow!("Missing result for batched JSON-RPC response"))
109          .and_then(|str| {
110            hex::decode(str)
111              .map_err(|e| anyhow!("Result for batched JSON-RPC response not valid hex: {e}"))
112          })
113          .and_then(|hex| {
114            consensus::deserialize(&hex).map_err(|e| {
115              anyhow!("Result for batched JSON-RPC response not valid bitcoin tx: {e}")
116            })
117          })
118      })
119      .collect::<Result<Vec<Transaction>>>()?;
120    Ok(txs)
121  }
122
123  async fn try_get_transactions(&self, body: String) -> Result<Vec<JsonResponse<String>>> {
124    let req = Request::builder()
125      .method(Method::POST)
126      .uri(&self.url)
127      .header(hyper::header::AUTHORIZATION, &self.auth)
128      .header(hyper::header::CONTENT_TYPE, "application/json")
129      .body(Full::new(Bytes::from(body)))?;
130
131    let response = self.client.request(req).await?;
132
133    let buf = response.into_body().collect().await?.to_bytes();
134
135    let results: Vec<JsonResponse<String>> = match serde_json::from_slice(&buf) {
136      Ok(results) => results,
137      Err(e) => {
138        return Err(anyhow!(
139          "failed to parse JSON-RPC response: {e}. response: {response}",
140          e = e,
141          response = String::from_utf8_lossy(&buf)
142        ));
143      }
144    };
145
146    Ok(results)
147  }
148}