halfin 0.3.8

A {regtest} bitcoin node runner 🏃‍♂️
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
//! # Halfin
//!
//! A bitcoin node running utility for integration testing.
//!
//! > A {regtest} bitcoin node runner 🏃‍♂️
//!
//! This crate makes it simple to run regtest [`bitcoind`](https://github.com/bitcoin/bitcoin)
//! and [`utreexod`](https://github.com/utreexo/utreexod) instances from Rust code, useful in
//! integration test contexts.
//!
//! Pretty much [`bitcoind`](https://crates.io/crates/bitcoind) with
//! [`utreexod`](https://github.com/utreexo/utreexod) support.
//!
//! ## Supported Implementations
//!
//! | Implementation | Version  | Feature Flag     | Default Feature |
//! |----------------|----------|----------------- | --------------- |
//! | `bitcoind`     | `v31.0`  | `bitcoind_31_0`  | Yes             |
//! |                |          |                  |                 |
//! | `utreexod`     | `v0.5.2` | `utreexod_0_5_2` | Yes             |
//!
//! ## Example
//!
//! ```rust,no_run
//! use halfin::connect;
//! use halfin::bitcoind::BitcoinD;
//! use halfin::utreexod::UtreexoD;
//!
//! let bitcoind = BitcoinD::new().unwrap();
//! bitcoind.generate(10).unwrap();
//! assert_eq!(bitcoind.get_chain_tip().unwrap(), 10);
//!
//! let utreexod = UtreexoD::new().unwrap();
//! utreexod.generate(10).unwrap();
//! assert_eq!(utreexod.get_chain_tip().unwrap(), 10);
//!
//! connect(&bitcoind, &utreexod).unwrap();
//! ```
//!
//! [`bitcoind`]: <https://github.com/bitcoin/bitcoin>
//! [`utreexod`]: <https://github.com/utreexo/utreexod>

use core::error;
use core::fmt;
use core::net::Ipv4Addr;
use core::net::SocketAddr;
use corepc_client::bitcoin::BlockHash;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Read;
use std::net::TcpListener;
use std::path::PathBuf;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tempfile::TempDir;
use tracing::debug;
use tracing::trace;

pub use serde_json;

#[allow(unused)]
pub(crate) use bitcoind::BitcoinD;
#[allow(unused)]
pub(crate) use utreexod::UtreexoD;

pub mod bitcoind;
pub mod utreexod;

/// The IPv4 localhost address.
const IPV4_LOCALHOST: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

/// The maximum number of attempts at instantiating a [`BitcoinD`]/[`UtreexoD`].
pub const NODE_BUILDING_MAX_RETRIES: u8 = 5;

/// The [`Duration`] between attempts at instantiating a [`Node`].
pub const NODE_BUILDING_INTERVAL: Duration = Duration::from_millis(500);

/// The [`Duration`] interval between polls for [`connect`] and [`wait_for_height`].
pub const POLL_INTERVAL: Duration = Duration::from_millis(100);

/// The timeout [`Duration`] for [`connect`] and [`wait_for_height`].
pub const WAIT_TIMEOUT: Duration = Duration::from_secs(10);

/// The interval [`Duration`] between successive attempts of node connection.
pub const CONNECTION_INTERVAL: Duration = Duration::from_millis(150);

/// The timeout [`Duration`] for node connection.
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);

/// Common interface across all node implementations ([`BitcoinD`]/[`UtreexoD`]).
pub trait Node {
    /// The [`Node`]'s human-readable name.
    fn get_name() -> &'static str;

    /// The [`Node`]'s binary name.
    fn get_bin_name() -> &'static str;

    /// Get the [`Node`]'s current chain height.
    fn get_chain_tip(&self) -> Result<u32, Error>;

    /// Get the [`Node`]'s current CBF height.
    fn get_filter_tip(&self) -> Result<u32, Error>;

    // Get the [`BlockHash`] of the block at `height`.
    fn get_block_hash(&self, height: u32) -> Result<BlockHash, Error>;

    /// Call a JSON-RPC `method` with the given `args` list.
    ///
    /// Response deserialization is not implemented for this method.
    ///
    /// It's up to the caller to parse the returned
    /// [`Value`](serde_json::Value) into a meaningful type.
    fn call(&self, method: &str, args: &[serde_json::Value]) -> Result<serde_json::Value, Error>;

    /// Get the [`Node`]'s P2P [`SocketAddr`].
    fn get_p2p_socket(&self) -> SocketAddr;

    /// Check whether the [`Node`] is connected to a peer with a specific [`SocketAddr`].
    fn has_peer(&self, socket: SocketAddr) -> Result<bool, Error>;

    /// Connect this [`Node`] to a peer at `socket` over P2P.
    fn add_peer(&self, socket: SocketAddr) -> Result<(), Error>;

    /// Get this [`Node`]' s peer count.
    fn get_peer_count(&self) -> Result<u32, Error>;

    /// How long to sleep between `get_height` RPC calls.
    ///
    /// Defaults to [`POLL_INTERVAL`].
    ///
    /// Override for nodes that need a longer settling time between RPC calls.
    fn poll_interval() -> Duration {
        POLL_INTERVAL
    }

    /// How long `wait_for_height` will poll before giving up.
    ///
    /// Defaults to [`WAIT_TIMEOUT`].
    ///
    /// Override for nodes that need more time to process blocks
    /// (e.g. [`UtreexoD`] needs more time to build the Merkle forest).
    fn wait_timeout() -> Duration {
        WAIT_TIMEOUT
    }
}

/// Connect [`Node`] A to [`Node`] B.
pub fn connect<A: Node, B: Node>(a: &A, b: &B) -> Result<(), Error> {
    let socket_a = a.get_p2p_socket();
    let socket_b = b.get_p2p_socket();

    debug!("Connecting socket={} to socket={}", socket_a, socket_b);

    a.add_peer(socket_b)?;

    let is_connected =
        || -> Result<bool, Error> { Ok(a.has_peer(socket_b)? || b.has_peer(socket_a)?) };

    // Wait for either side to confirm the connection by listening port.
    // We check both because `utreexod` does not expose the peer's listening
    // port in `getpeerinfo` for inbound connections, so only one side may
    // be able to verify by socket address.
    let start = Instant::now();
    while start.elapsed() < CONNECTION_TIMEOUT {
        if is_connected()? {
            // Allow time for v2 transport negotiation to settle,
            // or for v1 fallback to complete if v2 fails, then re-verify.
            thread::sleep(CONNECTION_INTERVAL * 4);
            if is_connected()? {
                debug!("Connected socket={} to socket={}", socket_a, socket_b);
                return Ok(());
            }
        }
        thread::sleep(CONNECTION_INTERVAL);
    }

    Err(Error::ConnectionTimeout(CONNECTION_TIMEOUT))
}

/// Connect [`Node`] A to [`Node`] B and wait for them to synchronize chains.
pub fn connect_and_sync<A: Node, B: Node>(a: &A, b: &B) -> Result<(), Error> {
    connect(a, b)?;

    let height_a = a.get_chain_tip()?;
    let height_b = b.get_chain_tip()?;

    let max_height = std::cmp::max(height_a, height_b);
    wait_for_height(a, max_height)?;
    wait_for_height(b, max_height)?;

    Ok(())
}

/// Poll a [`Node`] until its chain reaches `height`.
///
/// Throws an error if the node does not reach `height` within [`Node::wait_timeout`].
pub fn wait_for_height<N: Node>(node: &N, height: u32) -> Result<(), Error> {
    debug!("Waiting for {} to reach height={}", N::get_name(), height);

    let start = Instant::now();
    while start.elapsed() < N::wait_timeout() {
        if node.get_chain_tip().unwrap_or(0) >= height {
            return Ok(());
        }
        thread::sleep(N::poll_interval());
    }

    let curr_height = node.get_chain_tip().unwrap_or(0);
    Err(Error::ChainSyncTimeOut((
        height,
        curr_height,
        N::wait_timeout(),
    )))
}

/// Poll a [`Node`] until its chain reaches `height` with a custom `timeout`.
///
/// Throws an error if the node does not reach `height` within `timeout`.
pub fn wait_for_height_with_timeout<N: Node>(
    node: &N,
    height: u32,
    timeout: Duration,
) -> Result<(), Error> {
    debug!(
        "Waiting for {} to reach height={} (timeout={:?})",
        N::get_name(),
        height,
        timeout
    );

    let start = Instant::now();
    while start.elapsed() < timeout {
        if node.get_chain_tip().unwrap_or(0) >= height {
            return Ok(());
        }
        thread::sleep(N::poll_interval());
    }

    let curr_height = node.get_chain_tip().unwrap_or(0);
    Err(Error::ChainSyncTimeOut((height, curr_height, timeout)))
}

/// Poll a [`Node`] until its Compact Block Filters reach `height`.
///
/// Throws an error if the node does not reach `filter_height` within [`Node::wait_timeout`].
pub fn wait_for_filter_height<N: Node>(node: &N, filter_height: u32) -> Result<(), Error> {
    debug!(
        "Waiting for {} to reach filter height={}",
        N::get_name(),
        filter_height
    );

    let start = Instant::now();
    while start.elapsed() < N::wait_timeout() {
        if node.get_filter_tip().unwrap_or(0) >= filter_height {
            return Ok(());
        }
        thread::sleep(N::poll_interval());
    }

    let curr_filter_height = node.get_filter_tip().unwrap_or(0);
    Err(Error::ChainSyncTimeOut((
        filter_height,
        curr_filter_height,
        N::wait_timeout(),
    )))
}

/// Spawn a background thread that reads `reader` line by line and re-emits
/// each line as a [`trace!`] event, prefixed with `source`.
///
/// Used to pipe a child [`BitcoinD`]/[`UtreexoD`] process's `stdout`/`stderr`
/// into [`tracing`]. The thread exits on EOF, which happens when the process
/// dies and its pipe is closed.
pub(crate) fn pipe_to_tracing<R: Read + Send + 'static>(reader: R, source: &'static str) {
    thread::spawn(move || {
        let mut lines = BufReader::new(reader).lines();
        while let Some(Ok(line)) = lines.next() {
            // Skip blank lines so the trace stream mirrors the node's output.
            if !line.trim().is_empty() {
                trace!("{source}: {line}");
            }
        }
    });
}

/// Ask the OS for an available port, immediately unbind and return it.
///
/// Inlining is needed to curb TOCTOU race conditions.
#[inline]
pub fn get_available_port() -> u16 {
    TcpListener::bind((IPV4_LOCALHOST, 0))
        .unwrap()
        .local_addr()
        .unwrap()
        .port()
}

/// Owns a node's working directory, either as a temporary or a persistent path.
///
/// * [`DataDir::Temporary`]: backed by a [`TempDir`]; the directory is
///   deleted automatically when this value is dropped.
/// * [`DataDir::Persistent`]: backed by a plain [`PathBuf`]; the directory
///   survives the process and is never cleaned up automatically.
#[derive(Debug)]
pub enum DataDir {
    /// A persistent directory that is **not** cleaned up on drop.
    Persistent(PathBuf),
    /// A temporary directory that is deleted when this value is dropped.
    Temporary(TempDir),
}

impl DataDir {
    /// Return the underlying filesystem path regardless of variant.
    pub fn path(&self) -> PathBuf {
        match self {
            Self::Persistent(path) => path.to_owned(),
            Self::Temporary(tmp_dir) => tmp_dir.path().to_path_buf(),
        }
    }
}

#[derive(Debug)]
pub enum Error {
    /// The binary path is not absolute.
    BinaryPathNotAbsolute { bin_name: String, path: String },

    /// The binary path is not a file.
    BinaryPathNotFile { bin_name: String, path: String },

    /// The binary was not found at the expected location.
    BinaryNotFound((String, PathBuf)),

    /// Failed to spawn a [process](std::process::Child) for [`BitcoinD`]/[`UtreexoD`].
    FailedToSpawn(std::io::Error),

    /// Failed to instantiate a [`BitcoinD`]/[`UtreexoD`] after [`NODE_BUILDING_MAX_RETRIES`] attempts.
    ExhaustedNodeBuildingRetries(u8),

    /// Failed to stop [`BitcoinD`] or [`UtreexoD`] over JSON-RPC (e.g. `bitcoin-cli -regtest stop`).
    FailedToStop(corepc_client::client_sync::Error),

    /// I/O errors.
    Io(std::io::Error),

    /// JSON-RPC Errors.
    JsonRpc(corepc_client::client_sync::Error),

    /// Timed out whilst waiting for peer connection to succeed.
    PeerConnectionTimeout((SocketAddr, SocketAddr)),

    /// Both `tmpdir` and `workdir` were specified.
    BothDirsSpecified,

    /// [`BitcoinD`] is unresponsive (it's probably not running).
    UnresponsiveBitcoinD(corepc_client::client_sync::Error),

    /// [`UtreexoD`] is unresponsive (it's probably not running).
    UnresponsiveUtreexoD(corepc_client::client_sync::Error),

    /// Timed out whilst waiting for the cookie file to be generated.
    CookieFileTimeout(PathBuf),

    /// Timed out whilst waiting for the JSON-RPC client to be ready.
    RpcClientSetupTimeout,

    /// Received an unexpected response from the JSON-RPC server
    UnexpectedResponse(String),

    /// Timed out whilst waiting for the [`Node`]'s chain to synchronize up to `height`
    ChainSyncTimeOut((u32, u32, Duration)), // (current_height, target_height, timeout)

    /// Timed out whilst waiting for the [`Node`]'s to connect to each other.
    ConnectionTimeout(Duration),
}

#[rustfmt::skip]
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use Error::*;

        match self {
            BinaryPathNotAbsolute { bin_name, path } => write!(f, "The `{}` binary path is not absolute (path={})", bin_name, path),
            BinaryPathNotFile { bin_name, path } => write!(f, "The `{}` binary path is not a file (path={})", bin_name, path),
            BinaryNotFound((bin_name, path)) => write!(f, "The `{}` binary was not found at the expected location={}", bin_name, path.display()),
            FailedToSpawn(err) => write!(f, "Failed to spawn a process for the node: {err:?}"),
            ExhaustedNodeBuildingRetries(retries) => write!(f, "Failed to instantiate the node after {} attempts", retries),
            FailedToStop(err) => write!(f, "Failed to stop the node over JSON-RPC: {err:?}"),
            Io(err) => write!(f, "I/O Error: {err:?}"),
            JsonRpc(err) => write!(f, "JSON-RPC Error: {err:?}"),
            PeerConnectionTimeout((local_socket, remote_socket)) => write!(f, "Timed out whilst waiting for connection between local={local_socket} and remote={remote_socket}"),
            BothDirsSpecified => write!(f, "Both `tempdir` and `workdir` were specified. You must choose one and only one"),
            UnresponsiveBitcoinD(err) => write!(f, "`BitcoinD` is unresponsive to JSON-RPC calls: {err:?}"),
            UnresponsiveUtreexoD(err) => write!(f, "`UtreexoD` is unresponsive to JSON-RPC calls: {err:?}"),
            CookieFileTimeout(cookie_path) => write!(f, "Timed out whilst waiting for the cookie={} to be generated", cookie_path.display()),
            RpcClientSetupTimeout => write!(f, "Timed out whilst waiting for the JSON-RPC client to be ready"),
            UnexpectedResponse(err) => write!(f, "Received an unexpected response from the JSON-RPC server: {err:?}"),
            ChainSyncTimeOut((target_height, current_height, timeout)) => write!(
                f,
                "Timed out after {} seconds whilst waiting for the node's chain to synchronize to height={} (current height={})",
                target_height, current_height, timeout.as_secs()
            ),
            ConnectionTimeout(timeout) => write!(
                f,
                "Timed out after {} seconds whilst waiting for the nodes to connect to each other",
                timeout.as_secs()
            ),
        }
    }
}

impl error::Error for Error {}