Skip to main content

bitcoind_async_client/client/
mod.rs

1use std::{
2    fmt,
3    fs::File,
4    io::{BufRead, BufReader},
5    path::PathBuf,
6    sync::{
7        atomic::{AtomicUsize, Ordering},
8        Arc,
9    },
10    time::Duration,
11};
12
13use crate::error::{BitcoinRpcError, ClientError};
14use base64::{engine::general_purpose, Engine};
15use bitreq::{post, Client as BitreqClient, Error as BitreqError};
16use serde::{de, Deserialize, Serialize};
17use serde_json::{json, value::Value};
18use tokio::time::sleep;
19use tracing::*;
20
21#[cfg(all(feature = "29_0", feature = "30_2"))]
22compile_error!(
23    "Bitcoin Core version features are mutually exclusive; select only one of `29_0` or `30_2`."
24);
25
26#[cfg(all(feature = "29_0", not(feature = "30_2")))]
27pub mod v29;
28
29#[cfg(all(feature = "30_2", not(feature = "29_0")))]
30pub mod v30;
31
32/// This is an alias for the result type returned by the [`Client`].
33pub type ClientResult<T> = Result<T, ClientError>;
34
35/// The maximum number of retries for a request.
36const DEFAULT_MAX_RETRIES: u16 = 3;
37
38/// The maximum number of retries for a request.
39const DEFAULT_RETRY_INTERVAL_MS: u64 = 1_000;
40
41/// The timeout for a request in seconds.
42const DEFAULT_TIMEOUT_SECONDS: u64 = 30;
43
44/// The default capacity for the HTTP client connection pool.
45const DEFAULT_HTTP_CLIENT_CAPACITY: usize = 10;
46
47/// Custom implementation to convert a value to a `Value` type.
48pub fn to_value<T>(value: T) -> ClientResult<Value>
49where
50    T: Serialize,
51{
52    serde_json::to_value(value)
53        .map_err(|e| ClientError::Param(format!("Error creating value: {e}")))
54}
55
56/// The different authentication methods for the client.
57#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
58pub enum Auth {
59    UserPass(String, String),
60    CookieFile(PathBuf),
61}
62
63impl Auth {
64    pub(crate) fn get_user_pass(self) -> ClientResult<(Option<String>, Option<String>)> {
65        match self {
66            Auth::UserPass(u, p) => Ok((Some(u), Some(p))),
67            Auth::CookieFile(path) => {
68                let line = BufReader::new(
69                    File::open(path).map_err(|e| ClientError::Other(e.to_string()))?,
70                )
71                .lines()
72                .next()
73                .ok_or(ClientError::Other("Invalid cookie file".to_string()))?
74                .map_err(|e| ClientError::Other(e.to_string()))?;
75                let colon = line
76                    .find(':')
77                    .ok_or(ClientError::Other("Invalid cookie file".to_string()))?;
78                Ok((Some(line[..colon].into()), Some(line[colon + 1..].into())))
79            }
80        }
81    }
82}
83
84/// An `async` client for interacting with a `bitcoind` instance.
85#[derive(Clone)]
86pub struct Client {
87    /// The URL of the `bitcoind` instance.
88    url: String,
89
90    /// The authorization header value for Basic auth.
91    authorization: String,
92
93    /// The timeout for requests in seconds.
94    timeout: u64,
95
96    /// The ID of the current request.
97    ///
98    /// # Implementation Details
99    ///
100    /// Using an [`Arc`] so that [`Client`] is [`Clone`].
101    id: Arc<AtomicUsize>,
102
103    /// The maximum number of retries for a request.
104    max_retries: u16,
105
106    /// Interval between retries for a request in ms.
107    retry_interval: u64,
108
109    /// The HTTP client for making requests.
110    ///
111    /// This is used to reuse TCP connections across requests.
112    http_client: BitreqClient,
113}
114
115impl fmt::Debug for Client {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("Client")
118            .field("url", &self.url)
119            .field("timeout", &self.timeout)
120            .field("id", &self.id)
121            .field("max_retries", &self.max_retries)
122            .field("retry_interval", &self.retry_interval)
123            .finish_non_exhaustive()
124    }
125}
126
127/// Response returned by the `bitcoind` RPC server.
128#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
129struct Response<R> {
130    pub result: Option<R>,
131    pub error: Option<BitcoinRpcError>,
132    pub id: u64,
133}
134
135impl Client {
136    /// Creates a new [`Client`] with the given URL, username, and password.
137    pub fn new(
138        url: String,
139        auth: Auth,
140        max_retries: Option<u16>,
141        retry_interval: Option<u64>,
142        timeout: Option<u64>,
143    ) -> ClientResult<Self> {
144        let (username_opt, password_opt) = auth.get_user_pass()?;
145        let (Some(username), Some(password)) = (
146            username_opt.filter(|u| !u.is_empty()),
147            password_opt.filter(|p| !p.is_empty()),
148        ) else {
149            return Err(ClientError::MissingUserPassword);
150        };
151
152        let user_pw = general_purpose::STANDARD.encode(format!("{username}:{password}"));
153        let authorization = format!("Basic {user_pw}");
154
155        let id = Arc::new(AtomicUsize::new(0));
156
157        let max_retries = max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
158        let retry_interval = retry_interval.unwrap_or(DEFAULT_RETRY_INTERVAL_MS);
159        let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT_SECONDS);
160
161        let http_client = BitreqClient::new(DEFAULT_HTTP_CLIENT_CAPACITY);
162
163        trace!(url = %url, "Created bitcoin client");
164
165        Ok(Self {
166            url,
167            authorization,
168            timeout,
169            id,
170            max_retries,
171            retry_interval,
172            http_client,
173        })
174    }
175
176    fn next_id(&self) -> usize {
177        self.id.fetch_add(1, Ordering::AcqRel)
178    }
179
180    async fn call<T: de::DeserializeOwned + fmt::Debug>(
181        &self,
182        method: &str,
183        params: &[Value],
184    ) -> ClientResult<T> {
185        let mut retries = 0;
186        loop {
187            debug!(%method, ?params, %retries, "Calling bitcoin client");
188
189            let id = self.next_id();
190
191            let body = serde_json::to_vec(&json!({
192                "jsonrpc": "1.0",
193                "id": id,
194                "method": method,
195                "params": params
196            }))
197            .map_err(|e| ClientError::Param(format!("Error serializing request: {e}")))?;
198
199            let request = post(&self.url)
200                .with_header("Authorization", &self.authorization)
201                .with_header("Content-Type", "application/json")
202                .with_body(body)
203                .with_timeout(self.timeout);
204
205            let response = self.http_client.send_async(request).await;
206
207            match response {
208                Ok(resp) => {
209                    let status_code = resp.status_code;
210                    let raw_response = resp
211                        .as_str()
212                        .map_err(|e| ClientError::Parse(e.to_string()))?;
213
214                    if !(200..300).contains(&status_code) {
215                        if let Ok(data) = serde_json::from_str::<Response<Value>>(raw_response) {
216                            if let Some(err) = data.error {
217                                return Err(ClientError::Server(err.code, err.message));
218                            }
219                        }
220
221                        return Err(ClientError::Status(
222                            status_code as u16,
223                            format!("{} | body: {raw_response}", resp.reason_phrase),
224                        ));
225                    }
226
227                    trace!(%raw_response, "Raw response received");
228                    let data: Response<T> = serde_json::from_str(raw_response)
229                        .map_err(|e| ClientError::Parse(e.to_string()))?;
230                    if let Some(err) = data.error {
231                        return Err(ClientError::Server(err.code, err.message));
232                    }
233                    return data
234                        .result
235                        .ok_or_else(|| ClientError::Other("Empty data received".to_string()));
236                }
237                Err(err) => {
238                    warn!(err = %err, "Error calling bitcoin client");
239
240                    // Classify bitreq errors for retry logic
241                    let should_retry = Self::is_error_recoverable(&err);
242                    if !should_retry {
243                        return Err(err.into());
244                    }
245                }
246            }
247            retries += 1;
248            if retries >= self.max_retries {
249                return Err(ClientError::MaxRetriesExceeded(self.max_retries));
250            }
251            sleep(Duration::from_millis(self.retry_interval)).await;
252        }
253    }
254
255    /// Returns `true` if the error is potentially recoverable and should be retried.
256    fn is_error_recoverable(err: &BitreqError) -> bool {
257        match err {
258            // Connection/network errors - might be recoverable
259            BitreqError::AddressNotFound
260            | BitreqError::IoError(_)
261            | BitreqError::RustlsCreateConnection(_) => {
262                warn!(err = %err, "connection error, retrying...");
263                true
264            }
265
266            // Redirect errors - not retryable
267            BitreqError::RedirectLocationMissing => false,
268            BitreqError::InfiniteRedirectionLoop => false,
269            BitreqError::TooManyRedirections => false,
270
271            // Size limit errors - not retryable
272            BitreqError::HeadersOverflow => false,
273            BitreqError::StatusLineOverflow => false,
274            BitreqError::BodyOverflow => false,
275
276            // Protocol/parsing errors - might be recoverable
277            BitreqError::MalformedChunkLength
278            | BitreqError::MalformedChunkEnd
279            | BitreqError::MalformedContentLength
280            | BitreqError::InvalidUtf8InResponse => {
281                warn!(err = %err, "malformed response, retrying...");
282                true
283            }
284
285            // UTF-8 in body - not retryable
286            BitreqError::InvalidUtf8InBody(_) => false,
287
288            // HTTPS not enabled - not retryable
289            BitreqError::HttpsFeatureNotEnabled => false,
290
291            // Other errors - not retryable
292            BitreqError::Other(_) => false,
293
294            // Non-exhaustive match fallback
295            _ => false,
296        }
297    }
298
299    #[cfg(feature = "raw_rpc")]
300    /// Low-level RPC call wrapper; sends raw params and returns the deserialized result.
301    pub async fn call_raw<R: de::DeserializeOwned + fmt::Debug>(
302        &self,
303        method: &str,
304        params: &[serde_json::Value],
305    ) -> ClientResult<R> {
306        self.call::<R>(method, params).await
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use std::time::Duration;
313
314    use tokio::{
315        io::{AsyncReadExt, AsyncWriteExt},
316        net::{TcpListener, TcpStream},
317        sync::oneshot,
318        time::{sleep, timeout},
319    };
320
321    use super::*;
322
323    async fn read_http_request(stream: &mut TcpStream) {
324        let mut buf = vec![0u8; 4096];
325        let mut total = Vec::new();
326        loop {
327            let n = stream.read(&mut buf).await.expect("read request");
328            if n == 0 {
329                break;
330            }
331            total.extend_from_slice(&buf[..n]);
332            let Some(hdr_end) = total.windows(4).position(|w| w == b"\r\n\r\n") else {
333                continue;
334            };
335            let headers = std::str::from_utf8(&total[..hdr_end]).unwrap_or("");
336            let cl: usize = headers
337                .lines()
338                .find_map(|l| {
339                    let mut parts = l.splitn(2, ':');
340                    let k = parts.next()?.trim();
341                    if k.eq_ignore_ascii_case("Content-Length") {
342                        parts.next()?.trim().parse().ok()
343                    } else {
344                        None
345                    }
346                })
347                .unwrap_or(0);
348            if total.len() >= hdr_end + 4 + cl {
349                break;
350            }
351        }
352    }
353
354    async fn write_json_response(stream: &mut TcpStream, body: &str) {
355        let response = format!(
356            concat!(
357                "HTTP/1.1 200 OK\r\n",
358                "Content-Type: application/json\r\n",
359                "Connection: keep-alive\r\n",
360                "Content-Length: {}\r\n\r\n{}"
361            ),
362            body.len(),
363            body,
364        );
365        stream
366            .write_all(response.as_bytes())
367            .await
368            .expect("write response");
369        stream.flush().await.expect("flush response");
370    }
371
372    /// Regression test for issue #101: a pooled keep-alive socket that is later
373    /// closed server-side must not permanently poison future RPC calls.
374    #[tokio::test]
375    async fn retry_recovers_from_dead_pooled_connection() {
376        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
377        let addr = listener.local_addr().expect("addr");
378
379        let (ready_tx, ready_rx) = oneshot::channel();
380        let server = tokio::spawn(async move {
381            let (mut first_stream, _) = listener.accept().await.expect("accept 1");
382            read_http_request(&mut first_stream).await;
383            write_json_response(
384                &mut first_stream,
385                r#"{"result":"first","error":null,"id":0}"#,
386            )
387            .await;
388
389            // Keep the socket alive long enough for the client to cache it, then
390            // close it server-side to mimic bitcoind's rpcservertimeout behavior.
391            sleep(Duration::from_millis(100)).await;
392            drop(first_stream);
393            let _ = ready_tx.send(());
394
395            let (mut second_stream, _) = listener.accept().await.expect("accept 2");
396            read_http_request(&mut second_stream).await;
397            write_json_response(
398                &mut second_stream,
399                r#"{"result":"second","error":null,"id":1}"#,
400            )
401            .await;
402        });
403
404        let url = format!("http://{}", addr);
405        let client = Client::new(
406            url,
407            Auth::UserPass("user".into(), "pass".into()),
408            Some(3),
409            Some(10),
410            Some(5),
411        )
412        .expect("client");
413
414        let first: String = client.call("ping", &[]).await.expect("first call");
415        assert_eq!(first, "first");
416
417        ready_rx.await.expect("ready signal");
418
419        let second: String = timeout(Duration::from_secs(5), client.call("ping", &[]))
420            .await
421            .expect("call did not time out")
422            .expect("second call");
423        assert_eq!(second, "second");
424
425        server.await.expect("server task");
426    }
427}