1use {
2 super::*,
3 http_body_util::{BodyExt, Full},
4 hyper::{Method, Request, Uri, body::Bytes},
5 hyper_util::{
6 client::legacy::{Client, connect::HttpConnector},
7 rt::TokioExecutor,
8 },
9 serde_json::{Value, json},
10};
11
12pub(crate) struct Fetcher {
13 auth: String,
14 client: Client<HttpConnector, Full<Bytes>>,
15 url: Uri,
16}
17
18#[derive(Deserialize, Debug)]
19struct JsonResponse<T> {
20 error: Option<JsonError>,
21 id: usize,
22 result: Option<T>,
23}
24
25#[derive(Deserialize, Debug)]
26struct JsonError {
27 code: i32,
28 message: String,
29}
30
31impl Fetcher {
32 pub(crate) fn new(settings: &Settings) -> Result<Self> {
33 let client = Client::builder(TokioExecutor::new()).build_http();
34
35 let url = if settings.bitcoin_rpc_url(None).starts_with("http://") {
36 settings.bitcoin_rpc_url(None)
37 } else {
38 "http://".to_string() + &settings.bitcoin_rpc_url(None)
39 };
40
41 let url = Uri::try_from(&url).map_err(|e| anyhow!("Invalid rpc url {url}: {e}"))?;
42
43 let (user, password) = settings.bitcoin_credentials()?.get_user_pass()?;
44 let auth = format!("{}:{}", user.unwrap(), password.unwrap());
45 let auth = format!("Basic {}", &base64_encode(auth.as_bytes()));
46 Ok(Fetcher { client, url, auth })
47 }
48
49 pub(crate) async fn get_transactions(&self, txids: Vec<Txid>) -> Result<Vec<Transaction>> {
50 if txids.is_empty() {
51 return Ok(Vec::new());
52 }
53
54 let mut reqs = Vec::with_capacity(txids.len());
55 for (i, txid) in txids.iter().enumerate() {
56 let req = json!({
57 "jsonrpc": "2.0",
58 "id": i, "method": "getrawtransaction",
60 "params": [ txid ]
61 });
62 reqs.push(req);
63 }
64
65 let body = Value::Array(reqs).to_string();
66
67 let mut results: Vec<JsonResponse<String>>;
68 let mut retries = 0;
69
70 loop {
71 results = match self.try_get_transactions(body.clone()).await {
72 Ok(results) => results,
73 Err(error) => {
74 if retries >= 5 {
75 return Err(anyhow!(
76 "failed to fetch raw transactions after 5 retries: {}",
77 error
78 ));
79 }
80
81 log::info!("failed to fetch raw transactions, retrying: {error}");
82
83 tokio::time::sleep(Duration::from_millis(100 * u64::pow(2, retries))).await;
84 retries += 1;
85 continue;
86 }
87 };
88 break;
89 }
90
91 if let Some(err) = results.iter().find_map(|res| res.error.as_ref()) {
93 return Err(anyhow!(
94 "failed to fetch raw transaction: code {} message {}",
95 err.code,
96 err.message
97 ));
98 }
99
100 results.sort_by(|a, b| a.id.cmp(&b.id));
102
103 let txs = results
104 .into_iter()
105 .map(|res| {
106 res
107 .result
108 .ok_or_else(|| anyhow!("Missing result for batched JSON-RPC response"))
109 .and_then(|str| {
110 hex::decode(str)
111 .map_err(|e| anyhow!("Result for batched JSON-RPC response not valid hex: {e}"))
112 })
113 .and_then(|hex| {
114 consensus::deserialize(&hex).map_err(|e| {
115 anyhow!("Result for batched JSON-RPC response not valid bitcoin tx: {e}")
116 })
117 })
118 })
119 .collect::<Result<Vec<Transaction>>>()?;
120 Ok(txs)
121 }
122
123 async fn try_get_transactions(&self, body: String) -> Result<Vec<JsonResponse<String>>> {
124 let req = Request::builder()
125 .method(Method::POST)
126 .uri(&self.url)
127 .header(hyper::header::AUTHORIZATION, &self.auth)
128 .header(hyper::header::CONTENT_TYPE, "application/json")
129 .body(Full::new(Bytes::from(body)))?;
130
131 let response = self.client.request(req).await?;
132
133 let buf = response.into_body().collect().await?.to_bytes();
134
135 let results: Vec<JsonResponse<String>> = match serde_json::from_slice(&buf) {
136 Ok(results) => results,
137 Err(e) => {
138 return Err(anyhow!(
139 "failed to parse JSON-RPC response: {e}. response: {response}",
140 e = e,
141 response = String::from_utf8_lossy(&buf)
142 ));
143 }
144 };
145
146 Ok(results)
147 }
148}