Skip to main content

bitcoind_async_client/client/
mod.rs

1use std::{
2    fmt,
3    fs::File,
4    io::{BufRead, BufReader},
5    path::PathBuf,
6    sync::{
7        atomic::{AtomicUsize, Ordering},
8        Arc,
9    },
10    time::Duration,
11};
12
13use crate::error::{BitcoinRpcError, ClientError};
14use base64::{engine::general_purpose, Engine};
15use bitreq::{post, Client as BitreqClient, Error as BitreqError};
16use serde::{de, Deserialize, Serialize};
17use serde_json::{json, value::Value};
18use tokio::time::sleep;
19use tracing::*;
20
21#[cfg(feature = "29_0")]
22pub mod v29;
23
24/// This is an alias for the result type returned by the [`Client`].
25pub type ClientResult<T> = Result<T, ClientError>;
26
27/// The maximum number of retries for a request.
28const DEFAULT_MAX_RETRIES: u16 = 3;
29
/// The default interval between retries for a request, in milliseconds.
31const DEFAULT_RETRY_INTERVAL_MS: u64 = 1_000;
32
33/// The timeout for a request in seconds.
34const DEFAULT_TIMEOUT_SECONDS: u64 = 30;
35
36/// The default capacity for the HTTP client connection pool.
37const DEFAULT_HTTP_CLIENT_CAPACITY: usize = 10;
38
39/// Custom implementation to convert a value to a `Value` type.
40pub fn to_value<T>(value: T) -> ClientResult<Value>
41where
42    T: Serialize,
43{
44    serde_json::to_value(value)
45        .map_err(|e| ClientError::Param(format!("Error creating value: {e}")))
46}
47
/// The different authentication methods for the client.
#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub enum Auth {
    /// Explicit credentials: the first field is the username, the second the password.
    UserPass(String, String),
    /// Path to a cookie file whose first line has the form `<username>:<password>`.
    CookieFile(PathBuf),
}
54
55impl Auth {
56    pub(crate) fn get_user_pass(self) -> ClientResult<(Option<String>, Option<String>)> {
57        match self {
58            Auth::UserPass(u, p) => Ok((Some(u), Some(p))),
59            Auth::CookieFile(path) => {
60                let line = BufReader::new(
61                    File::open(path).map_err(|e| ClientError::Other(e.to_string()))?,
62                )
63                .lines()
64                .next()
65                .ok_or(ClientError::Other("Invalid cookie file".to_string()))?
66                .map_err(|e| ClientError::Other(e.to_string()))?;
67                let colon = line
68                    .find(':')
69                    .ok_or(ClientError::Other("Invalid cookie file".to_string()))?;
70                Ok((Some(line[..colon].into()), Some(line[colon + 1..].into())))
71            }
72        }
73    }
74}
75
/// An `async` client for interacting with a `bitcoind` instance.
///
/// Clones share a single request-ID counter, because `id` is kept behind an [`Arc`].
#[derive(Clone)]
pub struct Client {
    /// The URL of the `bitcoind` instance.
    url: String,

    /// The authorization header value for Basic auth.
    ///
    /// Built in [`Client::new`] as `Basic <base64(username:password)>`.
    authorization: String,

    /// The timeout for requests in seconds.
    timeout: u64,

    /// The ID of the current request.
    ///
    /// # Implementation Details
    ///
    /// Using an [`Arc`] so that [`Client`] is [`Clone`].
    id: Arc<AtomicUsize>,

    /// The maximum number of retries for a request.
    ///
    /// The request loop gives up with `ClientError::MaxRetriesExceeded` once the
    /// number of failed attempts reaches this value.
    max_retries: u16,

    /// Interval between retries for a request in ms.
    retry_interval: u64,

    /// The HTTP client for making requests.
    ///
    /// This is used to reuse TCP connections across requests.
    http_client: BitreqClient,
}
106
107impl fmt::Debug for Client {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("Client")
110            .field("url", &self.url)
111            .field("timeout", &self.timeout)
112            .field("id", &self.id)
113            .field("max_retries", &self.max_retries)
114            .field("retry_interval", &self.retry_interval)
115            .finish_non_exhaustive()
116    }
117}
118
/// Response returned by the `bitcoind` RPC server.
///
/// `R` is the type the `result` field is deserialized into.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct Response<R> {
    /// Payload of the call, if the server supplied one.
    pub result: Option<R>,
    /// Error object reported by the server, if any.
    pub error: Option<BitcoinRpcError>,
    /// The `id` field of the response (presumably the request ID echoed back;
    /// the client does not compare it with the ID it sent).
    pub id: u64,
}
126
127impl Client {
128    /// Creates a new [`Client`] with the given URL, username, and password.
129    pub fn new(
130        url: String,
131        auth: Auth,
132        max_retries: Option<u16>,
133        retry_interval: Option<u64>,
134        timeout: Option<u64>,
135    ) -> ClientResult<Self> {
136        let (username_opt, password_opt) = auth.get_user_pass()?;
137        let (Some(username), Some(password)) = (
138            username_opt.filter(|u| !u.is_empty()),
139            password_opt.filter(|p| !p.is_empty()),
140        ) else {
141            return Err(ClientError::MissingUserPassword);
142        };
143
144        let user_pw = general_purpose::STANDARD.encode(format!("{username}:{password}"));
145        let authorization = format!("Basic {user_pw}");
146
147        let id = Arc::new(AtomicUsize::new(0));
148
149        let max_retries = max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
150        let retry_interval = retry_interval.unwrap_or(DEFAULT_RETRY_INTERVAL_MS);
151        let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT_SECONDS);
152
153        let http_client = BitreqClient::new(DEFAULT_HTTP_CLIENT_CAPACITY);
154
155        trace!(url = %url, "Created bitcoin client");
156
157        Ok(Self {
158            url,
159            authorization,
160            timeout,
161            id,
162            max_retries,
163            retry_interval,
164            http_client,
165        })
166    }
167
    /// Returns the ID for the next request and advances the shared counter by one.
    ///
    /// `fetch_add` yields the value held before the increment, so the returned ID
    /// is the counter's previous value.
    fn next_id(&self) -> usize {
        self.id.fetch_add(1, Ordering::AcqRel)
    }
171
172    async fn call<T: de::DeserializeOwned + fmt::Debug>(
173        &self,
174        method: &str,
175        params: &[Value],
176    ) -> ClientResult<T> {
177        let mut retries = 0;
178        loop {
179            trace!(%method, ?params, %retries, "Calling bitcoin client");
180
181            let id = self.next_id();
182
183            let body = serde_json::to_vec(&json!({
184                "jsonrpc": "1.0",
185                "id": id,
186                "method": method,
187                "params": params
188            }))
189            .map_err(|e| ClientError::Param(format!("Error serializing request: {e}")))?;
190
191            let request = post(&self.url)
192                .with_header("Authorization", &self.authorization)
193                .with_header("Content-Type", "application/json")
194                .with_body(body)
195                .with_timeout(self.timeout);
196
197            let response = self.http_client.send_async(request).await;
198
199            match response {
200                Ok(resp) => {
201                    let status_code = resp.status_code;
202                    let raw_response = resp
203                        .as_str()
204                        .map_err(|e| ClientError::Parse(e.to_string()))?;
205
206                    if !(200..300).contains(&status_code) {
207                        if let Ok(data) = serde_json::from_str::<Response<Value>>(raw_response) {
208                            if let Some(err) = data.error {
209                                return Err(ClientError::Server(err.code, err.message));
210                            }
211                        }
212
213                        return Err(ClientError::Status(
214                            status_code as u16,
215                            format!("{} | body: {raw_response}", resp.reason_phrase),
216                        ));
217                    }
218
219                    trace!(%raw_response, "Raw response received");
220                    let data: Response<T> = serde_json::from_str(raw_response)
221                        .map_err(|e| ClientError::Parse(e.to_string()))?;
222                    if let Some(err) = data.error {
223                        return Err(ClientError::Server(err.code, err.message));
224                    }
225                    return data
226                        .result
227                        .ok_or_else(|| ClientError::Other("Empty data received".to_string()));
228                }
229                Err(err) => {
230                    warn!(err = %err, "Error calling bitcoin client");
231
232                    // Classify bitreq errors for retry logic
233                    let should_retry = Self::is_error_recoverable(&err);
234                    if !should_retry {
235                        return Err(err.into());
236                    }
237                }
238            }
239            retries += 1;
240            if retries >= self.max_retries {
241                return Err(ClientError::MaxRetriesExceeded(self.max_retries));
242            }
243            sleep(Duration::from_millis(self.retry_interval)).await;
244        }
245    }
246
247    /// Returns `true` if the error is potentially recoverable and should be retried.
248    fn is_error_recoverable(err: &BitreqError) -> bool {
249        match err {
250            // Connection/network errors - might be recoverable
251            BitreqError::AddressNotFound
252            | BitreqError::IoError(_)
253            | BitreqError::RustlsCreateConnection(_) => {
254                warn!(err = %err, "connection error, retrying...");
255                true
256            }
257
258            // Redirect errors - not retryable
259            BitreqError::RedirectLocationMissing => false,
260            BitreqError::InfiniteRedirectionLoop => false,
261            BitreqError::TooManyRedirections => false,
262
263            // Size limit errors - not retryable
264            BitreqError::HeadersOverflow => false,
265            BitreqError::StatusLineOverflow => false,
266            BitreqError::BodyOverflow => false,
267
268            // Protocol/parsing errors - might be recoverable
269            BitreqError::MalformedChunkLength
270            | BitreqError::MalformedChunkEnd
271            | BitreqError::MalformedContentLength
272            | BitreqError::InvalidUtf8InResponse => {
273                warn!(err = %err, "malformed response, retrying...");
274                true
275            }
276
277            // UTF-8 in body - not retryable
278            BitreqError::InvalidUtf8InBody(_) => false,
279
280            // HTTPS not enabled - not retryable
281            BitreqError::HttpsFeatureNotEnabled => false,
282
283            // Other errors - not retryable
284            BitreqError::Other(_) => false,
285
286            // Non-exhaustive match fallback
287            _ => false,
288        }
289    }
290
291    #[cfg(feature = "raw_rpc")]
292    /// Low-level RPC call wrapper; sends raw params and returns the deserialized result.
293    pub async fn call_raw<R: de::DeserializeOwned + fmt::Debug>(
294        &self,
295        method: &str,
296        params: &[serde_json::Value],
297    ) -> ClientResult<R> {
298        self.call::<R>(method, params).await
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use std::time::Duration;
305
306    use tokio::{
307        io::{AsyncReadExt, AsyncWriteExt},
308        net::{TcpListener, TcpStream},
309        sync::oneshot,
310        time::{sleep, timeout},
311    };
312
313    use super::*;
314
315    async fn read_http_request(stream: &mut TcpStream) {
316        let mut buf = vec![0u8; 4096];
317        let mut total = Vec::new();
318        loop {
319            let n = stream.read(&mut buf).await.expect("read request");
320            if n == 0 {
321                break;
322            }
323            total.extend_from_slice(&buf[..n]);
324            let Some(hdr_end) = total.windows(4).position(|w| w == b"\r\n\r\n") else {
325                continue;
326            };
327            let headers = std::str::from_utf8(&total[..hdr_end]).unwrap_or("");
328            let cl: usize = headers
329                .lines()
330                .find_map(|l| {
331                    let mut parts = l.splitn(2, ':');
332                    let k = parts.next()?.trim();
333                    if k.eq_ignore_ascii_case("Content-Length") {
334                        parts.next()?.trim().parse().ok()
335                    } else {
336                        None
337                    }
338                })
339                .unwrap_or(0);
340            if total.len() >= hdr_end + 4 + cl {
341                break;
342            }
343        }
344    }
345
346    async fn write_json_response(stream: &mut TcpStream, body: &str) {
347        let response = format!(
348            concat!(
349                "HTTP/1.1 200 OK\r\n",
350                "Content-Type: application/json\r\n",
351                "Connection: keep-alive\r\n",
352                "Content-Length: {}\r\n\r\n{}"
353            ),
354            body.len(),
355            body,
356        );
357        stream
358            .write_all(response.as_bytes())
359            .await
360            .expect("write response");
361        stream.flush().await.expect("flush response");
362    }
363
    /// Regression test for issue #101: a pooled keep-alive socket that is later
    /// closed server-side must not permanently poison future RPC calls.
    ///
    /// A local server answers one request, closes that connection, and then
    /// answers a second request on a freshly accepted connection. The client must
    /// get both answers.
    #[tokio::test]
    async fn retry_recovers_from_dead_pooled_connection() {
        // Port 0 lets the OS pick a free port; the real address is read back below.
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("addr");

        // Signals the client side that the first connection has been closed.
        let (ready_tx, ready_rx) = oneshot::channel();
        let server = tokio::spawn(async move {
            let (mut first_stream, _) = listener.accept().await.expect("accept 1");
            read_http_request(&mut first_stream).await;
            write_json_response(
                &mut first_stream,
                r#"{"result":"first","error":null,"id":0}"#,
            )
            .await;

            // Keep the socket alive long enough for the client to cache it, then
            // close it server-side to mimic bitcoind's rpcservertimeout behavior.
            sleep(Duration::from_millis(100)).await;
            drop(first_stream);
            let _ = ready_tx.send(());

            // The second request has to arrive on a new connection.
            let (mut second_stream, _) = listener.accept().await.expect("accept 2");
            read_http_request(&mut second_stream).await;
            write_json_response(
                &mut second_stream,
                r#"{"result":"second","error":null,"id":1}"#,
            )
            .await;
        });

        let url = format!("http://{}", addr);
        // max_retries = 3, retry_interval = 10 ms, timeout = 5 s.
        let client = Client::new(
            url,
            Auth::UserPass("user".into(), "pass".into()),
            Some(3),
            Some(10),
            Some(5),
        )
        .expect("client");

        let first: String = client.call("ping", &[]).await.expect("first call");
        assert_eq!(first, "first");

        // Wait until the server has dropped the first connection.
        ready_rx.await.expect("ready signal");

        // The outer timeout turns a hung call into a test failure instead of a stall.
        let second: String = timeout(Duration::from_secs(5), client.call("ping", &[]))
            .await
            .expect("call did not time out")
            .expect("second call");
        assert_eq!(second, "second");

        server.await.expect("server task");
    }
419}