lightning_block_sync/
rpc.rs

1//! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC
2//! endpoint.
3
4use crate::gossip::UtxoSource;
5use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};
6use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
7
8use bitcoin::hash_types::BlockHash;
9use bitcoin::OutPoint;
10
11use std::sync::Mutex;
12
13use serde_json;
14
15use std::convert::TryFrom;
16use std::convert::TryInto;
17use std::error::Error;
18use std::fmt;
19use std::sync::atomic::{AtomicUsize, Ordering};
20
/// An error reported by the RPC server in the `error` member of a JSON-RPC response.
///
/// [`RpcClient::call_method`] wraps this type in a [`std::io::Error`] of kind
/// [`std::io::ErrorKind::Other`].
#[derive(Debug)]
pub struct RpcError {
	/// The numeric error code taken from the response's `error.code` field.
	pub code: i64,
	/// The human-readable message taken from the response's `error.message` field.
	pub message: String,
}

impl fmt::Display for RpcError {
	/// Renders the error as `RPC error <code>: <message>`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let Self { code, message } = self;
		write!(f, "RPC error {}: {}", code, message)
	}
}

// The default `Error` methods suffice: an `RpcError` has no underlying source error.
impl Error for RpcError {}
37
38/// A simple RPC client for calling methods using HTTP `POST`.
39///
40/// Implements [`BlockSource`] and may return an `Err` containing [`RpcError`]. See
41/// [`RpcClient::call_method`] for details.
42pub struct RpcClient {
43	basic_auth: String,
44	endpoint: HttpEndpoint,
45	client: Mutex<Option<HttpClient>>,
46	id: AtomicUsize,
47}
48
49impl RpcClient {
50	/// Creates a new RPC client connected to the given endpoint with the provided credentials. The
51	/// credentials should be a base64 encoding of a user name and password joined by a colon, as is
52	/// required for HTTP basic access authentication.
53	pub fn new(credentials: &str, endpoint: HttpEndpoint) -> Self {
54		Self {
55			basic_auth: "Basic ".to_string() + credentials,
56			endpoint,
57			client: Mutex::new(None),
58			id: AtomicUsize::new(0),
59		}
60	}
61
62	/// Calls a method with the response encoded in JSON format and interpreted as type `T`.
63	///
64	/// When an `Err` is returned, [`std::io::Error::into_inner`] may contain an [`RpcError`] if
65	/// [`std::io::Error::kind`] is [`std::io::ErrorKind::Other`].
66	pub async fn call_method<T>(
67		&self, method: &str, params: &[serde_json::Value],
68	) -> std::io::Result<T>
69	where
70		JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error>,
71	{
72		let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
73		let uri = self.endpoint.path();
74		let content = serde_json::json!({
75			"method": method,
76			"params": params,
77			"id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
78		});
79
80		let reserved_client = self.client.lock().unwrap().take();
81		let mut client = if let Some(client) = reserved_client {
82			client
83		} else {
84			HttpClient::connect(&self.endpoint)?
85		};
86		let http_response =
87			client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await;
88		*self.client.lock().unwrap() = Some(client);
89
90		let mut response = match http_response {
91			Ok(JsonResponse(response)) => response,
92			Err(e) if e.kind() == std::io::ErrorKind::Other => {
93				match e.get_ref().unwrap().downcast_ref::<HttpError>() {
94					Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) {
95						Ok(JsonResponse(response)) => response,
96						Err(_) => Err(e)?,
97					},
98					None => Err(e)?,
99				}
100			},
101			Err(e) => Err(e)?,
102		};
103
104		if !response.is_object() {
105			return Err(std::io::Error::new(
106				std::io::ErrorKind::InvalidData,
107				"expected JSON object",
108			));
109		}
110
111		let error = &response["error"];
112		if !error.is_null() {
113			// TODO: Examine error code for a more precise std::io::ErrorKind.
114			let rpc_error = RpcError {
115				code: error["code"].as_i64().unwrap_or(-1),
116				message: error["message"].as_str().unwrap_or("unknown error").to_string(),
117			};
118			return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error));
119		}
120
121		let result = match response.get_mut("result") {
122			Some(result) => result.take(),
123			None => {
124				return Err(std::io::Error::new(
125					std::io::ErrorKind::InvalidData,
126					"expected JSON result",
127				))
128			},
129		};
130
131		JsonResponse(result).try_into()
132	}
133}
134
135impl BlockSource for RpcClient {
136	fn get_header<'a>(
137		&'a self, header_hash: &'a BlockHash, _height: Option<u32>,
138	) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
139		Box::pin(async move {
140			let header_hash = serde_json::json!(header_hash.to_string());
141			Ok(self.call_method("getblockheader", &[header_hash]).await?)
142		})
143	}
144
145	fn get_block<'a>(
146		&'a self, header_hash: &'a BlockHash,
147	) -> AsyncBlockSourceResult<'a, BlockData> {
148		Box::pin(async move {
149			let header_hash = serde_json::json!(header_hash.to_string());
150			let verbosity = serde_json::json!(0);
151			Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?))
152		})
153	}
154
155	fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
156		Box::pin(async move { Ok(self.call_method("getblockchaininfo", &[]).await?) })
157	}
158}
159
160impl UtxoSource for RpcClient {
161	fn get_block_hash_by_height<'a>(
162		&'a self, block_height: u32,
163	) -> AsyncBlockSourceResult<'a, BlockHash> {
164		Box::pin(async move {
165			let height_param = serde_json::json!(block_height);
166			Ok(self.call_method("getblockhash", &[height_param]).await?)
167		})
168	}
169
170	fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> {
171		Box::pin(async move {
172			let txid_param = serde_json::json!(outpoint.txid.to_string());
173			let vout_param = serde_json::json!(outpoint.vout);
174			let include_mempool = serde_json::json!(false);
175			let utxo_opt: serde_json::Value =
176				self.call_method("gettxout", &[txid_param, vout_param, include_mempool]).await?;
177			Ok(!utxo_opt.is_null())
178		})
179	}
180}
181
#[cfg(test)]
mod tests {
	use super::*;
	use crate::http::client_tests::{HttpServer, MessageBody};

	use bitcoin::hashes::Hash;

	/// Credentials encoded in base64.
	const CREDENTIALS: &str = "dXNlcjpwYXNzd29yZA==";

	/// Converts a JSON value into `u64`.
	impl TryInto<u64> for JsonResponse {
		type Error = std::io::Error;

		fn try_into(self) -> std::io::Result<u64> {
			match self.0.as_u64() {
				None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "not a number")),
				Some(n) => Ok(n),
			}
		}
	}

	/// A "not found" HTTP response surfaces as an `Other` error.
	#[tokio::test]
	async fn call_method_returning_unknown_response() {
		let server = HttpServer::responding_with_not_found();
		let client = RpcClient::new(CREDENTIALS, server.endpoint());

		match client.call_method::<u64>("getblockcount", &[]).await {
			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
			Ok(_) => panic!("Expected error"),
		}
	}

	/// A response body that is valid JSON but not an object is rejected.
	#[tokio::test]
	async fn call_method_returning_malformed_response() {
		let response = serde_json::json!("foo");
		let server = HttpServer::responding_with_ok(MessageBody::Content(response));
		let client = RpcClient::new(CREDENTIALS, server.endpoint());

		match client.call_method::<u64>("getblockcount", &[]).await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
				assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	/// A server-error response carrying a JSON `error` member is surfaced as an `RpcError`.
	#[tokio::test]
	async fn call_method_returning_error() {
		let response = serde_json::json!({
			"error": { "code": -8, "message": "invalid parameter" },
		});
		let server = HttpServer::responding_with_server_error(response);
		let client = RpcClient::new(CREDENTIALS, server.endpoint());

		let invalid_block_hash = serde_json::json!("foo");
		match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				let rpc_error: Box<RpcError> = e.into_inner().unwrap().downcast().unwrap();
				assert_eq!(rpc_error.code, -8);
				assert_eq!(rpc_error.message, "invalid parameter");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	/// A JSON object without a `result` member is rejected.
	#[tokio::test]
	async fn call_method_returning_missing_result() {
		let response = serde_json::json!({});
		let server = HttpServer::responding_with_ok(MessageBody::Content(response));
		let client = RpcClient::new(CREDENTIALS, server.endpoint());

		match client.call_method::<u64>("getblockcount", &[]).await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
				assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON result");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	/// A `result` that cannot be converted into the requested type yields the conversion error.
	#[tokio::test]
	async fn call_method_returning_malformed_result() {
		let response = serde_json::json!({ "result": "foo" });
		let server = HttpServer::responding_with_ok(MessageBody::Content(response));
		let client = RpcClient::new(CREDENTIALS, server.endpoint());

		match client.call_method::<u64>("getblockcount", &[]).await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
				assert_eq!(e.get_ref().unwrap().to_string(), "not a number");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	/// A well-formed response is converted into the requested type.
	#[tokio::test]
	async fn call_method_returning_valid_result() {
		let response = serde_json::json!({ "result": 654470 });
		let server = HttpServer::responding_with_ok(MessageBody::Content(response));
		let client = RpcClient::new(CREDENTIALS, server.endpoint());

		match client.call_method::<u64>("getblockcount", &[]).await {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(count) => assert_eq!(count, 654470),
		}
	}

	/// A `null` result from `gettxout` is reported as not unspent.
	#[tokio::test]
	async fn fails_to_fetch_spent_utxo() {
		let response = serde_json::json!({ "result": null });
		let server = HttpServer::responding_with_ok(MessageBody::Content(response));
		let client = RpcClient::new(CREDENTIALS, server.endpoint());
		let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0);
		let unspent_output = client.is_output_unspent(outpoint).await.unwrap();
		assert!(!unspent_output);
	}

	/// A non-`null` result from `gettxout` is reported as unspent.
	#[tokio::test]
	async fn fetches_utxo() {
		let response = serde_json::json!({ "result": {"bestblock": 1, "confirmations": 42}});
		let server = HttpServer::responding_with_ok(MessageBody::Content(response));
		let client = RpcClient::new(CREDENTIALS, server.endpoint());
		let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0);
		let unspent_output = client.is_output_unspent(outpoint).await.unwrap();
		assert!(unspent_output);
	}
}