bitcoind_async_client/client/
mod.rs1use std::{
2 fmt,
3 fs::File,
4 io::{BufRead, BufReader},
5 path::PathBuf,
6 sync::{
7 atomic::{AtomicUsize, Ordering},
8 Arc,
9 },
10 time::Duration,
11};
12
13use crate::error::{BitcoinRpcError, ClientError};
14use base64::{engine::general_purpose, Engine};
15use bitreq::{post, Client as BitreqClient, Error as BitreqError};
16use serde::{de, Deserialize, Serialize};
17use serde_json::{json, value::Value};
18use tokio::time::sleep;
19use tracing::*;
20
// The Bitcoin Core version features select which version-specific module is
// compiled. Turning on both at once is rejected at build time with an
// explicit message instead of producing a confusing downstream error.
#[cfg(all(feature = "29_0", feature = "30_2"))]
compile_error!(
    "Bitcoin Core version features are mutually exclusive; select only one of `29_0` or `30_2`."
);

// Compiled only when `29_0` is enabled and `30_2` is not.
#[cfg(all(feature = "29_0", not(feature = "30_2")))]
pub mod v29;

// Compiled only when `30_2` is enabled and `29_0` is not.
#[cfg(all(feature = "30_2", not(feature = "29_0")))]
pub mod v30;
31
32pub type ClientResult<T> = Result<T, ClientError>;
34
/// Value used for `max_retries` when [`Client::new`] receives `None`.
const DEFAULT_MAX_RETRIES: u16 = 3;

/// Pause between attempts, in milliseconds, used when [`Client::new`]
/// receives `None` for `retry_interval`.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 1_000;

/// Per-request timeout, in seconds, used when [`Client::new`] receives `None`
/// for `timeout`.
const DEFAULT_TIMEOUT_SECONDS: u64 = 30;

/// Capacity passed to `BitreqClient::new`.
// NOTE(review): presumably the size of the HTTP connection pool; confirm
// against the bitreq documentation.
const DEFAULT_HTTP_CLIENT_CAPACITY: usize = 10;
46
47pub fn to_value<T>(value: T) -> ClientResult<Value>
49where
50 T: Serialize,
51{
52 serde_json::to_value(value)
53 .map_err(|e| ClientError::Param(format!("Error creating value: {e}")))
54}
55
/// Credentials used to authenticate against the RPC endpoint.
///
/// The variant order is significant because `Ord`/`PartialOrd` are derived.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Auth {
    /// Explicit username and password, in that order.
    UserPass(String, String),
    /// Path to a cookie file whose first line has the form `user:password`.
    CookieFile(PathBuf),
}
62
63impl Auth {
64 pub(crate) fn get_user_pass(self) -> ClientResult<(Option<String>, Option<String>)> {
65 match self {
66 Auth::UserPass(u, p) => Ok((Some(u), Some(p))),
67 Auth::CookieFile(path) => {
68 let line = BufReader::new(
69 File::open(path).map_err(|e| ClientError::Other(e.to_string()))?,
70 )
71 .lines()
72 .next()
73 .ok_or(ClientError::Other("Invalid cookie file".to_string()))?
74 .map_err(|e| ClientError::Other(e.to_string()))?;
75 let colon = line
76 .find(':')
77 .ok_or(ClientError::Other("Invalid cookie file".to_string()))?;
78 Ok((Some(line[..colon].into()), Some(line[colon + 1..].into())))
79 }
80 }
81 }
82}
83
84#[derive(Clone)]
86pub struct Client {
87 url: String,
89
90 authorization: String,
92
93 timeout: u64,
95
96 id: Arc<AtomicUsize>,
102
103 max_retries: u16,
105
106 retry_interval: u64,
108
109 http_client: BitreqClient,
113}
114
115impl fmt::Debug for Client {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("Client")
118 .field("url", &self.url)
119 .field("timeout", &self.timeout)
120 .field("id", &self.id)
121 .field("max_retries", &self.max_retries)
122 .field("retry_interval", &self.retry_interval)
123 .finish_non_exhaustive()
124 }
125}
126
127#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
129struct Response<R> {
130 pub result: Option<R>,
131 pub error: Option<BitcoinRpcError>,
132 pub id: u64,
133}
134
135impl Client {
136 pub fn new(
138 url: String,
139 auth: Auth,
140 max_retries: Option<u16>,
141 retry_interval: Option<u64>,
142 timeout: Option<u64>,
143 ) -> ClientResult<Self> {
144 let (username_opt, password_opt) = auth.get_user_pass()?;
145 let (Some(username), Some(password)) = (
146 username_opt.filter(|u| !u.is_empty()),
147 password_opt.filter(|p| !p.is_empty()),
148 ) else {
149 return Err(ClientError::MissingUserPassword);
150 };
151
152 let user_pw = general_purpose::STANDARD.encode(format!("{username}:{password}"));
153 let authorization = format!("Basic {user_pw}");
154
155 let id = Arc::new(AtomicUsize::new(0));
156
157 let max_retries = max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
158 let retry_interval = retry_interval.unwrap_or(DEFAULT_RETRY_INTERVAL_MS);
159 let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT_SECONDS);
160
161 let http_client = BitreqClient::new(DEFAULT_HTTP_CLIENT_CAPACITY);
162
163 trace!(url = %url, "Created bitcoin client");
164
165 Ok(Self {
166 url,
167 authorization,
168 timeout,
169 id,
170 max_retries,
171 retry_interval,
172 http_client,
173 })
174 }
175
176 fn next_id(&self) -> usize {
177 self.id.fetch_add(1, Ordering::AcqRel)
178 }
179
180 async fn call<T: de::DeserializeOwned + fmt::Debug>(
181 &self,
182 method: &str,
183 params: &[Value],
184 ) -> ClientResult<T> {
185 let mut retries = 0;
186 loop {
187 debug!(%method, ?params, %retries, "Calling bitcoin client");
188
189 let id = self.next_id();
190
191 let body = serde_json::to_vec(&json!({
192 "jsonrpc": "1.0",
193 "id": id,
194 "method": method,
195 "params": params
196 }))
197 .map_err(|e| ClientError::Param(format!("Error serializing request: {e}")))?;
198
199 let request = post(&self.url)
200 .with_header("Authorization", &self.authorization)
201 .with_header("Content-Type", "application/json")
202 .with_body(body)
203 .with_timeout(self.timeout);
204
205 let response = self.http_client.send_async(request).await;
206
207 match response {
208 Ok(resp) => {
209 let status_code = resp.status_code;
210 let raw_response = resp
211 .as_str()
212 .map_err(|e| ClientError::Parse(e.to_string()))?;
213
214 if !(200..300).contains(&status_code) {
215 if let Ok(data) = serde_json::from_str::<Response<Value>>(raw_response) {
216 if let Some(err) = data.error {
217 return Err(ClientError::Server(err.code, err.message));
218 }
219 }
220
221 return Err(ClientError::Status(
222 status_code as u16,
223 format!("{} | body: {raw_response}", resp.reason_phrase),
224 ));
225 }
226
227 trace!(%raw_response, "Raw response received");
228 let data: Response<T> = serde_json::from_str(raw_response)
229 .map_err(|e| ClientError::Parse(e.to_string()))?;
230 if let Some(err) = data.error {
231 return Err(ClientError::Server(err.code, err.message));
232 }
233 return data
234 .result
235 .ok_or_else(|| ClientError::Other("Empty data received".to_string()));
236 }
237 Err(err) => {
238 warn!(err = %err, "Error calling bitcoin client");
239
240 let should_retry = Self::is_error_recoverable(&err);
242 if !should_retry {
243 return Err(err.into());
244 }
245 }
246 }
247 retries += 1;
248 if retries >= self.max_retries {
249 return Err(ClientError::MaxRetriesExceeded(self.max_retries));
250 }
251 sleep(Duration::from_millis(self.retry_interval)).await;
252 }
253 }
254
255 fn is_error_recoverable(err: &BitreqError) -> bool {
257 match err {
258 BitreqError::AddressNotFound
260 | BitreqError::IoError(_)
261 | BitreqError::RustlsCreateConnection(_) => {
262 warn!(err = %err, "connection error, retrying...");
263 true
264 }
265
266 BitreqError::RedirectLocationMissing => false,
268 BitreqError::InfiniteRedirectionLoop => false,
269 BitreqError::TooManyRedirections => false,
270
271 BitreqError::HeadersOverflow => false,
273 BitreqError::StatusLineOverflow => false,
274 BitreqError::BodyOverflow => false,
275
276 BitreqError::MalformedChunkLength
278 | BitreqError::MalformedChunkEnd
279 | BitreqError::MalformedContentLength
280 | BitreqError::InvalidUtf8InResponse => {
281 warn!(err = %err, "malformed response, retrying...");
282 true
283 }
284
285 BitreqError::InvalidUtf8InBody(_) => false,
287
288 BitreqError::HttpsFeatureNotEnabled => false,
290
291 BitreqError::Other(_) => false,
293
294 _ => false,
296 }
297 }
298
299 #[cfg(feature = "raw_rpc")]
300 pub async fn call_raw<R: de::DeserializeOwned + fmt::Debug>(
302 &self,
303 method: &str,
304 params: &[serde_json::Value],
305 ) -> ClientResult<R> {
306 self.call::<R>(method, params).await
307 }
308}
309
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::{TcpListener, TcpStream},
        sync::oneshot,
        time::{sleep, timeout},
    };

    use super::*;

    /// Reads from `stream` until one complete HTTP request has arrived (the
    /// header block plus `Content-Length` bytes of body) or the peer closes
    /// the connection. The request itself is discarded.
    async fn read_http_request(stream: &mut TcpStream) {
        let mut chunk = vec![0u8; 4096];
        let mut received = Vec::new();
        loop {
            let read = stream.read(&mut chunk).await.expect("read request");
            if read == 0 {
                // Peer closed the connection.
                return;
            }
            received.extend_from_slice(&chunk[..read]);

            // Keep reading until the blank line that ends the headers shows up.
            let header_end = match received.windows(4).position(|w| w == b"\r\n\r\n") {
                Some(pos) => pos,
                None => continue,
            };

            // A missing or unparsable Content-Length counts as an empty body.
            let header_text = std::str::from_utf8(&received[..header_end]).unwrap_or("");
            let content_length: usize = header_text
                .lines()
                .find_map(|line| {
                    let (name, value) = line.split_once(':')?;
                    if name.trim().eq_ignore_ascii_case("Content-Length") {
                        value.trim().parse().ok()
                    } else {
                        None
                    }
                })
                .unwrap_or(0);

            // `+ 4` accounts for the `\r\n\r\n` separator itself.
            if received.len() >= header_end + 4 + content_length {
                return;
            }
        }
    }

    /// Writes a `200 OK` keep-alive JSON reply carrying `body` to `stream`.
    async fn write_json_response(stream: &mut TcpStream, body: &str) {
        let length = body.len();
        let reply = format!(
            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nConnection: keep-alive\r\nContent-Length: {length}\r\n\r\n{body}"
        );
        stream
            .write_all(reply.as_bytes())
            .await
            .expect("write response");
        stream.flush().await.expect("flush response");
    }

    /// The server answers one request, closes that connection, and serves the
    /// next request on a fresh connection. The client's second call must
    /// still succeed.
    #[tokio::test]
    async fn retry_recovers_from_dead_pooled_connection() {
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("addr");

        let (closed_tx, closed_rx) = oneshot::channel();
        let server = tokio::spawn(async move {
            // First connection: answer once, then hang up.
            let (mut conn_one, _) = listener.accept().await.expect("accept 1");
            read_http_request(&mut conn_one).await;
            write_json_response(&mut conn_one, r#"{"result":"first","error":null,"id":0}"#)
                .await;

            // NOTE(review): presumably gives the client time to finish with
            // the reply and keep the connection for reuse before it is closed.
            sleep(Duration::from_millis(100)).await;
            drop(conn_one);
            let _ = closed_tx.send(());

            // Second connection: serves the client's follow-up request.
            let (mut conn_two, _) = listener.accept().await.expect("accept 2");
            read_http_request(&mut conn_two).await;
            write_json_response(&mut conn_two, r#"{"result":"second","error":null,"id":1}"#)
                .await;
        });

        let client = Client::new(
            format!("http://{addr}"),
            Auth::UserPass("user".into(), "pass".into()),
            Some(3),
            Some(10),
            Some(5),
        )
        .expect("client");

        let first = client
            .call::<String>("ping", &[])
            .await
            .expect("first call");
        assert_eq!(first, "first");

        // Wait until the server has dropped the first connection.
        closed_rx.await.expect("ready signal");

        let second = timeout(
            Duration::from_secs(5),
            client.call::<String>("ping", &[]),
        )
        .await
        .expect("call did not time out")
        .expect("second call");
        assert_eq!(second, "second");

        server.await.expect("server task");
    }
}