bitcoind_async_client/client/
mod.rs1use std::{
2 fmt,
3 fs::File,
4 io::{BufRead, BufReader},
5 path::PathBuf,
6 sync::{
7 atomic::{AtomicUsize, Ordering},
8 Arc,
9 },
10 time::Duration,
11};
12
13use crate::error::{BitcoinRpcError, ClientError};
14use base64::{engine::general_purpose, Engine};
15use bitreq::{post, Client as BitreqClient, Error as BitreqError};
16use serde::{de, Deserialize, Serialize};
17use serde_json::{json, value::Value};
18use tokio::time::sleep;
19use tracing::*;
20
21#[cfg(feature = "29_0")]
22pub mod v29;
23
24pub type ClientResult<T> = Result<T, ClientError>;
26
/// Value used for `max_retries` when `Client::new` receives `None`.
const DEFAULT_MAX_RETRIES: u16 = 3;

/// Value used for `retry_interval` (milliseconds) when `Client::new` receives `None`.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 1000;

/// Value used for `timeout` (seconds) when `Client::new` receives `None`.
const DEFAULT_TIMEOUT_SECONDS: u64 = 30;

/// Capacity argument handed to the HTTP client constructor in `Client::new`.
// NOTE(review): presumably the size of the connection pool — confirm against the HTTP crate.
const DEFAULT_HTTP_CLIENT_CAPACITY: usize = 10;
38
39pub fn to_value<T>(value: T) -> ClientResult<Value>
41where
42 T: Serialize,
43{
44 serde_json::to_value(value)
45 .map_err(|e| ClientError::Param(format!("Error creating value: {e}")))
46}
47
48#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
50pub enum Auth {
51 UserPass(String, String),
52 CookieFile(PathBuf),
53}
54
55impl Auth {
56 pub(crate) fn get_user_pass(self) -> ClientResult<(Option<String>, Option<String>)> {
57 match self {
58 Auth::UserPass(u, p) => Ok((Some(u), Some(p))),
59 Auth::CookieFile(path) => {
60 let line = BufReader::new(
61 File::open(path).map_err(|e| ClientError::Other(e.to_string()))?,
62 )
63 .lines()
64 .next()
65 .ok_or(ClientError::Other("Invalid cookie file".to_string()))?
66 .map_err(|e| ClientError::Other(e.to_string()))?;
67 let colon = line
68 .find(':')
69 .ok_or(ClientError::Other("Invalid cookie file".to_string()))?;
70 Ok((Some(line[..colon].into()), Some(line[colon + 1..].into())))
71 }
72 }
73 }
74}
75
76#[derive(Clone)]
78pub struct Client {
79 url: String,
81
82 authorization: String,
84
85 timeout: u64,
87
88 id: Arc<AtomicUsize>,
94
95 max_retries: u16,
97
98 retry_interval: u64,
100
101 http_client: BitreqClient,
105}
106
107impl fmt::Debug for Client {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_struct("Client")
110 .field("url", &self.url)
111 .field("timeout", &self.timeout)
112 .field("id", &self.id)
113 .field("max_retries", &self.max_retries)
114 .field("retry_interval", &self.retry_interval)
115 .finish_non_exhaustive()
116 }
117}
118
/// Envelope of a JSON-RPC reply, as deserialized from the response body by
/// `Client::call`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct Response<R> {
    /// Payload of a successful call; `call` reports an error when this is
    /// absent and no `error` is present either.
    pub result: Option<R>,
    /// Error reported by the server; when present, `call` returns it as
    /// `ClientError::Server`.
    pub error: Option<BitcoinRpcError>,
    /// Identifier carried by the reply.
    // NOTE(review): presumably echoes the request `id`; `call` never checks it.
    pub id: u64,
}
126
127impl Client {
128 pub fn new(
130 url: String,
131 auth: Auth,
132 max_retries: Option<u16>,
133 retry_interval: Option<u64>,
134 timeout: Option<u64>,
135 ) -> ClientResult<Self> {
136 let (username_opt, password_opt) = auth.get_user_pass()?;
137 let (Some(username), Some(password)) = (
138 username_opt.filter(|u| !u.is_empty()),
139 password_opt.filter(|p| !p.is_empty()),
140 ) else {
141 return Err(ClientError::MissingUserPassword);
142 };
143
144 let user_pw = general_purpose::STANDARD.encode(format!("{username}:{password}"));
145 let authorization = format!("Basic {user_pw}");
146
147 let id = Arc::new(AtomicUsize::new(0));
148
149 let max_retries = max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
150 let retry_interval = retry_interval.unwrap_or(DEFAULT_RETRY_INTERVAL_MS);
151 let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT_SECONDS);
152
153 let http_client = BitreqClient::new(DEFAULT_HTTP_CLIENT_CAPACITY);
154
155 trace!(url = %url, "Created bitcoin client");
156
157 Ok(Self {
158 url,
159 authorization,
160 timeout,
161 id,
162 max_retries,
163 retry_interval,
164 http_client,
165 })
166 }
167
168 fn next_id(&self) -> usize {
169 self.id.fetch_add(1, Ordering::AcqRel)
170 }
171
172 async fn call<T: de::DeserializeOwned + fmt::Debug>(
173 &self,
174 method: &str,
175 params: &[Value],
176 ) -> ClientResult<T> {
177 let mut retries = 0;
178 loop {
179 trace!(%method, ?params, %retries, "Calling bitcoin client");
180
181 let id = self.next_id();
182
183 let body = serde_json::to_vec(&json!({
184 "jsonrpc": "1.0",
185 "id": id,
186 "method": method,
187 "params": params
188 }))
189 .map_err(|e| ClientError::Param(format!("Error serializing request: {e}")))?;
190
191 let request = post(&self.url)
192 .with_header("Authorization", &self.authorization)
193 .with_header("Content-Type", "application/json")
194 .with_body(body)
195 .with_timeout(self.timeout);
196
197 let response = self.http_client.send_async(request).await;
198
199 match response {
200 Ok(resp) => {
201 let status_code = resp.status_code;
202 let raw_response = resp
203 .as_str()
204 .map_err(|e| ClientError::Parse(e.to_string()))?;
205
206 if !(200..300).contains(&status_code) {
207 if let Ok(data) = serde_json::from_str::<Response<Value>>(raw_response) {
208 if let Some(err) = data.error {
209 return Err(ClientError::Server(err.code, err.message));
210 }
211 }
212
213 return Err(ClientError::Status(
214 status_code as u16,
215 format!("{} | body: {raw_response}", resp.reason_phrase),
216 ));
217 }
218
219 trace!(%raw_response, "Raw response received");
220 let data: Response<T> = serde_json::from_str(raw_response)
221 .map_err(|e| ClientError::Parse(e.to_string()))?;
222 if let Some(err) = data.error {
223 return Err(ClientError::Server(err.code, err.message));
224 }
225 return data
226 .result
227 .ok_or_else(|| ClientError::Other("Empty data received".to_string()));
228 }
229 Err(err) => {
230 warn!(err = %err, "Error calling bitcoin client");
231
232 let should_retry = Self::is_error_recoverable(&err);
234 if !should_retry {
235 return Err(err.into());
236 }
237 }
238 }
239 retries += 1;
240 if retries >= self.max_retries {
241 return Err(ClientError::MaxRetriesExceeded(self.max_retries));
242 }
243 sleep(Duration::from_millis(self.retry_interval)).await;
244 }
245 }
246
247 fn is_error_recoverable(err: &BitreqError) -> bool {
249 match err {
250 BitreqError::AddressNotFound
252 | BitreqError::IoError(_)
253 | BitreqError::RustlsCreateConnection(_) => {
254 warn!(err = %err, "connection error, retrying...");
255 true
256 }
257
258 BitreqError::RedirectLocationMissing => false,
260 BitreqError::InfiniteRedirectionLoop => false,
261 BitreqError::TooManyRedirections => false,
262
263 BitreqError::HeadersOverflow => false,
265 BitreqError::StatusLineOverflow => false,
266 BitreqError::BodyOverflow => false,
267
268 BitreqError::MalformedChunkLength
270 | BitreqError::MalformedChunkEnd
271 | BitreqError::MalformedContentLength
272 | BitreqError::InvalidUtf8InResponse => {
273 warn!(err = %err, "malformed response, retrying...");
274 true
275 }
276
277 BitreqError::InvalidUtf8InBody(_) => false,
279
280 BitreqError::HttpsFeatureNotEnabled => false,
282
283 BitreqError::Other(_) => false,
285
286 _ => false,
288 }
289 }
290
/// Public entry point for arbitrary RPC methods, compiled only with the
/// `raw_rpc` feature.
///
/// Forwards `method` and `params` unchanged to the internal `call` and
/// returns whatever it returns, deserialized as `R`.
#[cfg(feature = "raw_rpc")]
pub async fn call_raw<R: de::DeserializeOwned + fmt::Debug>(
    &self,
    method: &str,
    params: &[serde_json::Value],
) -> ClientResult<R> {
    let outcome: ClientResult<R> = self.call(method, params).await;
    outcome
}
300}
301
302#[cfg(test)]
303mod tests {
304 use std::time::Duration;
305
306 use tokio::{
307 io::{AsyncReadExt, AsyncWriteExt},
308 net::{TcpListener, TcpStream},
309 sync::oneshot,
310 time::{sleep, timeout},
311 };
312
313 use super::*;
314
315 async fn read_http_request(stream: &mut TcpStream) {
316 let mut buf = vec![0u8; 4096];
317 let mut total = Vec::new();
318 loop {
319 let n = stream.read(&mut buf).await.expect("read request");
320 if n == 0 {
321 break;
322 }
323 total.extend_from_slice(&buf[..n]);
324 let Some(hdr_end) = total.windows(4).position(|w| w == b"\r\n\r\n") else {
325 continue;
326 };
327 let headers = std::str::from_utf8(&total[..hdr_end]).unwrap_or("");
328 let cl: usize = headers
329 .lines()
330 .find_map(|l| {
331 let mut parts = l.splitn(2, ':');
332 let k = parts.next()?.trim();
333 if k.eq_ignore_ascii_case("Content-Length") {
334 parts.next()?.trim().parse().ok()
335 } else {
336 None
337 }
338 })
339 .unwrap_or(0);
340 if total.len() >= hdr_end + 4 + cl {
341 break;
342 }
343 }
344 }
345
346 async fn write_json_response(stream: &mut TcpStream, body: &str) {
347 let response = format!(
348 concat!(
349 "HTTP/1.1 200 OK\r\n",
350 "Content-Type: application/json\r\n",
351 "Connection: keep-alive\r\n",
352 "Content-Length: {}\r\n\r\n{}"
353 ),
354 body.len(),
355 body,
356 );
357 stream
358 .write_all(response.as_bytes())
359 .await
360 .expect("write response");
361 stream.flush().await.expect("flush response");
362 }
363
364 #[tokio::test]
367 async fn retry_recovers_from_dead_pooled_connection() {
368 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
369 let addr = listener.local_addr().expect("addr");
370
371 let (ready_tx, ready_rx) = oneshot::channel();
372 let server = tokio::spawn(async move {
373 let (mut first_stream, _) = listener.accept().await.expect("accept 1");
374 read_http_request(&mut first_stream).await;
375 write_json_response(
376 &mut first_stream,
377 r#"{"result":"first","error":null,"id":0}"#,
378 )
379 .await;
380
381 sleep(Duration::from_millis(100)).await;
384 drop(first_stream);
385 let _ = ready_tx.send(());
386
387 let (mut second_stream, _) = listener.accept().await.expect("accept 2");
388 read_http_request(&mut second_stream).await;
389 write_json_response(
390 &mut second_stream,
391 r#"{"result":"second","error":null,"id":1}"#,
392 )
393 .await;
394 });
395
396 let url = format!("http://{}", addr);
397 let client = Client::new(
398 url,
399 Auth::UserPass("user".into(), "pass".into()),
400 Some(3),
401 Some(10),
402 Some(5),
403 )
404 .expect("client");
405
406 let first: String = client.call("ping", &[]).await.expect("first call");
407 assert_eq!(first, "first");
408
409 ready_rx.await.expect("ready signal");
410
411 let second: String = timeout(Duration::from_secs(5), client.call("ping", &[]))
412 .await
413 .expect("call did not time out")
414 .expect("second call");
415 assert_eq!(second, "second");
416
417 server.await.expect("server task");
418 }
419}