1use crate::gossip::UtxoSource;
5use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};
6use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
7
8use bitcoin::hash_types::BlockHash;
9use bitcoin::OutPoint;
10
11use std::sync::Mutex;
12
13use serde_json;
14
15use std::convert::TryFrom;
16use std::convert::TryInto;
17use std::error::Error;
18use std::fmt;
19use std::sync::atomic::{AtomicUsize, Ordering};
20
21#[derive(Debug)]
23pub struct RpcError {
24 pub code: i64,
26 pub message: String,
28}
29
30impl fmt::Display for RpcError {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 write!(f, "RPC error {}: {}", self.code, self.message)
33 }
34}
35
36impl Error for RpcError {}
37
38pub struct RpcClient {
43 basic_auth: String,
44 endpoint: HttpEndpoint,
45 client: Mutex<Option<HttpClient>>,
46 id: AtomicUsize,
47}
48
49impl RpcClient {
50 pub fn new(credentials: &str, endpoint: HttpEndpoint) -> Self {
54 Self {
55 basic_auth: "Basic ".to_string() + credentials,
56 endpoint,
57 client: Mutex::new(None),
58 id: AtomicUsize::new(0),
59 }
60 }
61
62 pub async fn call_method<T>(
67 &self, method: &str, params: &[serde_json::Value],
68 ) -> std::io::Result<T>
69 where
70 JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error>,
71 {
72 let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
73 let uri = self.endpoint.path();
74 let content = serde_json::json!({
75 "method": method,
76 "params": params,
77 "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
78 });
79
80 let reserved_client = self.client.lock().unwrap().take();
81 let mut client = if let Some(client) = reserved_client {
82 client
83 } else {
84 HttpClient::connect(&self.endpoint)?
85 };
86 let http_response =
87 client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await;
88 *self.client.lock().unwrap() = Some(client);
89
90 let mut response = match http_response {
91 Ok(JsonResponse(response)) => response,
92 Err(e) if e.kind() == std::io::ErrorKind::Other => {
93 match e.get_ref().unwrap().downcast_ref::<HttpError>() {
94 Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) {
95 Ok(JsonResponse(response)) => response,
96 Err(_) => Err(e)?,
97 },
98 None => Err(e)?,
99 }
100 },
101 Err(e) => Err(e)?,
102 };
103
104 if !response.is_object() {
105 return Err(std::io::Error::new(
106 std::io::ErrorKind::InvalidData,
107 "expected JSON object",
108 ));
109 }
110
111 let error = &response["error"];
112 if !error.is_null() {
113 let rpc_error = RpcError {
115 code: error["code"].as_i64().unwrap_or(-1),
116 message: error["message"].as_str().unwrap_or("unknown error").to_string(),
117 };
118 return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error));
119 }
120
121 let result = match response.get_mut("result") {
122 Some(result) => result.take(),
123 None => {
124 return Err(std::io::Error::new(
125 std::io::ErrorKind::InvalidData,
126 "expected JSON result",
127 ))
128 },
129 };
130
131 JsonResponse(result).try_into()
132 }
133}
134
135impl BlockSource for RpcClient {
136 fn get_header<'a>(
137 &'a self, header_hash: &'a BlockHash, _height: Option<u32>,
138 ) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
139 Box::pin(async move {
140 let header_hash = serde_json::json!(header_hash.to_string());
141 Ok(self.call_method("getblockheader", &[header_hash]).await?)
142 })
143 }
144
145 fn get_block<'a>(
146 &'a self, header_hash: &'a BlockHash,
147 ) -> AsyncBlockSourceResult<'a, BlockData> {
148 Box::pin(async move {
149 let header_hash = serde_json::json!(header_hash.to_string());
150 let verbosity = serde_json::json!(0);
151 Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?))
152 })
153 }
154
155 fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
156 Box::pin(async move { Ok(self.call_method("getblockchaininfo", &[]).await?) })
157 }
158}
159
160impl UtxoSource for RpcClient {
161 fn get_block_hash_by_height<'a>(
162 &'a self, block_height: u32,
163 ) -> AsyncBlockSourceResult<'a, BlockHash> {
164 Box::pin(async move {
165 let height_param = serde_json::json!(block_height);
166 Ok(self.call_method("getblockhash", &[height_param]).await?)
167 })
168 }
169
170 fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> {
171 Box::pin(async move {
172 let txid_param = serde_json::json!(outpoint.txid.to_string());
173 let vout_param = serde_json::json!(outpoint.vout);
174 let include_mempool = serde_json::json!(false);
175 let utxo_opt: serde_json::Value =
176 self.call_method("gettxout", &[txid_param, vout_param, include_mempool]).await?;
177 Ok(!utxo_opt.is_null())
178 })
179 }
180}
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185 use crate::http::client_tests::{HttpServer, MessageBody};
186
187 use bitcoin::hashes::Hash;
188
189 const CREDENTIALS: &'static str = "dXNlcjpwYXNzd29yZA==";
191
192 impl TryInto<u64> for JsonResponse {
194 type Error = std::io::Error;
195
196 fn try_into(self) -> std::io::Result<u64> {
197 match self.0.as_u64() {
198 None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "not a number")),
199 Some(n) => Ok(n),
200 }
201 }
202 }
203
204 #[tokio::test]
205 async fn call_method_returning_unknown_response() {
206 let server = HttpServer::responding_with_not_found();
207 let client = RpcClient::new(CREDENTIALS, server.endpoint());
208
209 match client.call_method::<u64>("getblockcount", &[]).await {
210 Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
211 Ok(_) => panic!("Expected error"),
212 }
213 }
214
215 #[tokio::test]
216 async fn call_method_returning_malfomred_response() {
217 let response = serde_json::json!("foo");
218 let server = HttpServer::responding_with_ok(MessageBody::Content(response));
219 let client = RpcClient::new(CREDENTIALS, server.endpoint());
220
221 match client.call_method::<u64>("getblockcount", &[]).await {
222 Err(e) => {
223 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
224 assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
225 },
226 Ok(_) => panic!("Expected error"),
227 }
228 }
229
230 #[tokio::test]
231 async fn call_method_returning_error() {
232 let response = serde_json::json!({
233 "error": { "code": -8, "message": "invalid parameter" },
234 });
235 let server = HttpServer::responding_with_server_error(response);
236 let client = RpcClient::new(CREDENTIALS, server.endpoint());
237
238 let invalid_block_hash = serde_json::json!("foo");
239 match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
240 Err(e) => {
241 assert_eq!(e.kind(), std::io::ErrorKind::Other);
242 let rpc_error: Box<RpcError> = e.into_inner().unwrap().downcast().unwrap();
243 assert_eq!(rpc_error.code, -8);
244 assert_eq!(rpc_error.message, "invalid parameter");
245 },
246 Ok(_) => panic!("Expected error"),
247 }
248 }
249
250 #[tokio::test]
251 async fn call_method_returning_missing_result() {
252 let response = serde_json::json!({});
253 let server = HttpServer::responding_with_ok(MessageBody::Content(response));
254 let client = RpcClient::new(CREDENTIALS, server.endpoint());
255
256 match client.call_method::<u64>("getblockcount", &[]).await {
257 Err(e) => {
258 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
259 assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON result");
260 },
261 Ok(_) => panic!("Expected error"),
262 }
263 }
264
265 #[tokio::test]
266 async fn call_method_returning_malformed_result() {
267 let response = serde_json::json!({ "result": "foo" });
268 let server = HttpServer::responding_with_ok(MessageBody::Content(response));
269 let client = RpcClient::new(CREDENTIALS, server.endpoint());
270
271 match client.call_method::<u64>("getblockcount", &[]).await {
272 Err(e) => {
273 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
274 assert_eq!(e.get_ref().unwrap().to_string(), "not a number");
275 },
276 Ok(_) => panic!("Expected error"),
277 }
278 }
279
280 #[tokio::test]
281 async fn call_method_returning_valid_result() {
282 let response = serde_json::json!({ "result": 654470 });
283 let server = HttpServer::responding_with_ok(MessageBody::Content(response));
284 let client = RpcClient::new(CREDENTIALS, server.endpoint());
285
286 match client.call_method::<u64>("getblockcount", &[]).await {
287 Err(e) => panic!("Unexpected error: {:?}", e),
288 Ok(count) => assert_eq!(count, 654470),
289 }
290 }
291
292 #[tokio::test]
293 async fn fails_to_fetch_spent_utxo() {
294 let response = serde_json::json!({ "result": null });
295 let server = HttpServer::responding_with_ok(MessageBody::Content(response));
296 let client = RpcClient::new(CREDENTIALS, server.endpoint());
297 let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0);
298 let unspent_output = client.is_output_unspent(outpoint).await.unwrap();
299 assert_eq!(unspent_output, false);
300 }
301
302 #[tokio::test]
303 async fn fetches_utxo() {
304 let response = serde_json::json!({ "result": {"bestblock": 1, "confirmations": 42}});
305 let server = HttpServer::responding_with_ok(MessageBody::Content(response));
306 let client = RpcClient::new(CREDENTIALS, server.endpoint());
307 let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0);
308 let unspent_output = client.is_output_unspent(outpoint).await.unwrap();
309 assert_eq!(unspent_output, true);
310 }
311}