// blockchain_client/providers/jsonrpc.rs
use async_trait::async_trait;
2use reqwest::Client;
3use serde_json::json;
4use std::sync::Arc;
5
6use crate::client::{JsonRpcRequest, JsonRpcResponse, RpcConfig};
7use crate::error::{Result, RpcError};
8use crate::traits::BlockchainClient;
9use crate::types::*;
10
11#[derive(Clone)]
12pub struct JsonRpcClient {
13 config: Arc<RpcConfig>,
14 http_client: Client,
15 chain: Chain,
16 network: Network,
17}
18
19impl JsonRpcClient {
20 pub fn new(config: RpcConfig, chain: Chain, network: Network) -> Result<Self> {
21 let http_client = Client::builder()
22 .timeout(config.timeout())
23 .pool_max_idle_per_host(config.pool_max_idle_per_host)
24 .pool_idle_timeout(config.pool_idle_timeout())
25 .tcp_keepalive(config.tcp_keepalive())
26 .build()?;
27
28 Ok(Self {
29 config: Arc::new(config),
30 http_client,
31 chain,
32 network,
33 })
34 }
35
36 pub fn chain(&self) -> Chain {
37 self.chain
38 }
39
40 pub fn network(&self) -> Network {
41 self.network
42 }
43
44 async fn call<T: serde::de::DeserializeOwned>(
45 &self,
46 method: &str,
47 params: serde_json::Value,
48 ) -> Result<T> {
49 let request = JsonRpcRequest::new(method.to_string(), params);
50
51 let response = self
52 .http_client
53 .post(&self.config.url)
54 .basic_auth(&self.config.username, Some(&self.config.password))
55 .json(&request)
56 .send()
57 .await?;
58
59 let status = response.status();
60
61 if !status.is_success() {
62 let body = response.text().await.unwrap_or_default();
63 return Err(RpcError::RequestFailed(format!(
64 "http {}: {}",
65 status, body
66 )));
67 }
68
69 let text = response.text().await?;
70 let rpc_response: JsonRpcResponse<T> = serde_json::from_str(&text)?;
71
72 if let Some(error) = rpc_response.error {
73 return Err(RpcError::RpcResponseError {
74 code: error.code,
75 message: error.message,
76 });
77 }
78
79 rpc_response.result.ok_or_else(|| RpcError::NoResult)
80 }
81}
82
83#[async_trait]
84impl BlockchainClient for JsonRpcClient {
85 async fn get_blockchain_info(&self) -> Result<BlockchainInfo> {
86 self.call("getblockchaininfo", json!([])).await
87 }
88
89 async fn get_raw_transaction_with_block(
90 &self,
91 txid: &str,
92 verbose: bool,
93 blockhash: Option<&str>,
94 ) -> Result<RawTransaction> {
95 let params = if let Some(hash) = blockhash {
96 json!([txid, if verbose { 1 } else { 0 }, hash])
97 } else {
98 json!([txid, if verbose { 1 } else { 0 }])
99 };
100
101 match self
102 .call::<RawTransaction>("getrawtransaction", params)
103 .await
104 {
105 Ok(tx) => Ok(tx),
106
107 Err(RpcError::RpcResponseError { code: -5, .. }) => {
108 let wallet_tx: serde_json::Value =
110 self.call("gettransaction", json!([txid])).await?;
111
112 if let Some(hex) = wallet_tx.get("hex").and_then(|h| h.as_str()) {
113 self.call("decoderawtransaction", json!([hex])).await
114 } else {
115 Err(RpcError::InvalidResponse(
116 "gettransaction response missing hex field".to_string(),
117 ))
118 }
119 }
120
121 Err(e) => Err(e),
122 }
123 }
124
125 async fn list_unspent(
126 &self,
127 min_conf: Option<u32>,
128 max_conf: Option<u32>,
129 addresses: Option<&[String]>,
130 ) -> Result<Vec<Utxo>> {
131 let params = json!([
132 min_conf.unwrap_or(1),
133 max_conf.unwrap_or(9_999_999),
134 addresses.unwrap_or(&[])
135 ]);
136
137 self.call("listunspent", params).await
138 }
139
140 async fn list_transactions(
141 &self,
142 label: Option<&str>,
143 count: Option<usize>,
144 skip: Option<usize>,
145 include_watchonly: bool,
146 ) -> Result<Vec<TransactionListItem>> {
147 let params = json!([
148 label.unwrap_or("*"),
149 count.unwrap_or(10),
150 skip.unwrap_or(0),
151 include_watchonly
152 ]);
153
154 self.call("listtransactions", params).await
155 }
156
157 async fn get_received_by_address(&self, address: &str, min_conf: Option<u32>) -> Result<f64> {
158 let params = json!([address, min_conf.unwrap_or(1)]);
159 self.call("getreceivedbyaddress", params).await
160 }
161
162 async fn list_received_by_address(
163 &self,
164 min_conf: Option<u32>,
165 include_empty: bool,
166 include_watchonly: bool,
167 ) -> Result<Vec<ReceivedByAddress>> {
168 let params = json!([min_conf.unwrap_or(1), include_empty, include_watchonly]);
169 self.call("listreceivedbyaddress", params).await
170 }
171
172 async fn is_address_watched(&self, address: &str) -> Result<bool> {
173 let validation = self.validate_address(address).await?;
174
175 if !validation.isvalid {
176 return Ok(false);
177 }
178
179 if validation.ismine == Some(true) || validation.iswatchonly == Some(true) {
180 return Ok(true);
181 }
182
183 let received = self.list_received_by_address(Some(0), true, true).await?;
184
185 Ok(received.iter().any(|r| r.address == address))
186 }
187
188 async fn import_address(&self, address: &str, label: Option<&str>, rescan: bool) -> Result<()> {
189 let params = json!([address, label.unwrap_or(""), rescan]);
190 match self
191 .call::<()>("importaddress", params)
192 .await
193 {
194 Ok(_) | Err(RpcError::NoResult) => Ok(()),
195 Err(e) => Err(e),
196 }
197 }
198
199 async fn validate_address(&self, address: &str) -> Result<AddressValidation> {
200 self.call("validateaddress", json!([address])).await
201 }
202
203 async fn get_transaction(&self, txid: &str) -> Result<serde_json::Value> {
204 self.call("gettransaction", json!([txid])).await
205 }
206
207 async fn get_block_count(&self) -> Result<u64> {
208 self.call("getblockcount", json!([])).await
209 }
210
211 async fn get_best_block_hash(&self) -> Result<String> {
212 self.call("getbestblockhash", json!([])).await
213 }
214}