Skip to main content

ethos_bitcoind/node/
node_manager.rs

1//! Node module for Bitcoin Core RPC testing
2//!
3//! This module provides utilities for managing Bitcoin Core nodes in test environments.
4
5use std::process::Stdio;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use tempfile::TempDir;
11use tokio::io::AsyncBufReadExt;
12use tokio::process::{Child, Command};
13use tokio::sync::{Mutex, RwLock};
14use tracing::{debug, error, info};
15
16use crate::test_config::TestConfig;
17use crate::transport::core::TransportExt;
18use crate::transport::{DefaultTransport, TransportError};
19
20/// RPC codes indicating node still initializing (-28 warmup, -4 warmup).
21const INIT_WAIT_RPC_CODES: [&str; 2] = ["\"code\":-28", "\"code\":-4"];
22const INIT_MAX_RETRIES: u32 = 30;
23
24/// Represents the current state of a node
25#[derive(Debug, Default, Clone)]
26pub struct NodeState {
27    /// Whether the node is currently running
28    pub is_running: bool,
29}
30
31/// Configuration for port selection behavior
32#[derive(Debug, Clone)]
33pub enum PortSelection {
34    /// Use the specified port number
35    Fixed(u16),
36    /// Let the OS assign an available port
37    Dynamic,
38    /// Use port 0 (not recommended, may cause daemon to fail)
39    Zero,
40}
41
42/// Trait defining the interface for a node manager
43#[async_trait]
44pub trait NodeManager: Send + Sync + std::any::Any + std::fmt::Debug {
45    async fn start(&self) -> Result<(), TransportError>;
46    async fn stop(&mut self) -> Result<(), TransportError>;
47    async fn get_state(&self) -> Result<NodeState, TransportError>;
48    /// Return the RPC port this manager was configured with
49    fn rpc_port(&self) -> u16;
50    /// Return the RPC username this manager was configured with
51    fn rpc_username(&self) -> &str;
52    /// Return the RPC password this manager was configured with
53    fn rpc_password(&self) -> &str;
54    /// Create a transport for communicating with the node
55    async fn create_transport(
56        &self,
57    ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError>;
58}
59
60/// Implementation of the node manager
61#[derive(Debug)]
62pub struct BitcoinNodeManager {
63    /// Shared state of the node
64    state: Arc<RwLock<NodeState>>,
65    /// Child process handle for the daemon
66    child: Arc<Mutex<Option<Child>>>,
67    /// RPC port for communication with the node
68    pub rpc_port: u16,
69    /// Test configuration for the node
70    config: TestConfig,
71    /// Temporary directory for node data (cleaned up on drop)
72    _datadir: Option<TempDir>,
73}
74
75impl BitcoinNodeManager {
76    /// Create a new node manager with default configuration
77    pub fn new() -> Result<Self, TransportError> { Self::new_with_config(&TestConfig::default()) }
78
79    /// Create a new node manager with custom configuration
80    pub fn new_with_config(config: &TestConfig) -> Result<Self, TransportError> {
81        let datadir = TempDir::new()?;
82
83        // Handle automatic port selection
84        let rpc_port = if config.rpc_port == 0 {
85            // Get a random free port by binding to 127.0.0.1:0
86            // The listener is dropped at the end of the block, freeing the port
87            {
88                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
89                listener.local_addr()?.port()
90            }
91        } else {
92            config.rpc_port
93        };
94
95        Ok(Self {
96            state: Arc::new(RwLock::new(NodeState::default())),
97            child: Arc::new(Mutex::new(None)),
98            rpc_port,
99            config: config.clone(),
100            _datadir: Some(datadir),
101        })
102    }
103
104    /// Get the RPC port for this node manager
105    pub fn rpc_port(&self) -> u16 { self.rpc_port }
106
107    /// Gets the test configuration used by this node manager
108    pub fn config(&self) -> &TestConfig { &self.config }
109
110    /// Get the RPC username from the configuration
111    pub fn rpc_username(&self) -> &str { &self.config.rpc_username }
112
113    /// Get the RPC password from the configuration
114    pub fn rpc_password(&self) -> &str { &self.config.rpc_password }
115}
116
117#[async_trait]
118impl NodeManager for BitcoinNodeManager {
119    /// Start the node. The datadir is always `Some` when the manager is constructed via `new` / `new_with_config`.
120    async fn start(&self) -> Result<(), TransportError> {
121        let mut state = self.state.write().await;
122        if state.is_running {
123            return Ok(());
124        }
125
126        let datadir = self._datadir.as_ref().expect("datadir is set at construction").path();
127        let exe = self
128            .config
129            .bitcoind_path
130            .as_deref()
131            .unwrap_or_else(|| std::path::Path::new("bitcoind"));
132        let mut cmd = Command::new(exe);
133
134        let chain_str = self
135            .config
136            .as_chain_str()
137            .map_err(|_| TransportError::Rpc("Unsupported network".into()))?;
138        let chain = format!("-chain={}", chain_str);
139        let data_dir = format!("-datadir={}", datadir.display());
140        let rpc_port = format!("-rpcport={}", self.rpc_port);
141        let rpc_bind = format!("-rpcbind=127.0.0.1:{}", self.rpc_port);
142        let rpc_user = format!("-rpcuser={}", self.config.rpc_username);
143        let rpc_password = format!("-rpcpassword={}", self.config.rpc_password);
144
145        let mut args = vec![
146            &chain,
147            "-listen=0",
148            &data_dir,
149            &rpc_port,
150            &rpc_bind,
151            "-rpcallowip=127.0.0.1",
152            "-fallbackfee=0.0002",
153            "-server=1",
154            "-prune=1",
155            &rpc_user,
156            &rpc_password,
157        ];
158
159        for arg in &self.config.extra_args {
160            args.push(arg);
161        }
162
163        cmd.args(&args);
164
165        // Capture both stdout and stderr for better error reporting
166        cmd.stderr(Stdio::piped());
167        cmd.stdout(Stdio::piped());
168
169        let mut child = cmd.spawn()?;
170
171        // Read stderr in a separate task
172        let stderr = child.stderr.take().unwrap();
173        let stderr_reader = tokio::io::BufReader::new(stderr);
174        tokio::spawn(async move {
175            let mut lines = stderr_reader.lines();
176            while let Ok(Some(line)) = lines.next_line().await {
177                error!("bitcoind stderr: {}", line);
178            }
179        });
180
181        // Read stdout in a separate task
182        let stdout = child.stdout.take().unwrap();
183        let stdout_reader = tokio::io::BufReader::new(stdout);
184        tokio::spawn(async move {
185            let mut lines = stdout_reader.lines();
186            while let Ok(Some(line)) = lines.next_line().await {
187                info!("bitcoind stdout: {}", line);
188            }
189        });
190
191        // Store the child process
192        let mut child_guard = self.child.lock().await;
193        *child_guard = Some(child);
194
195        info!("Waiting for bitcoind node to initialize...");
196        tokio::time::sleep(Duration::from_millis(150)).await;
197
198        // Create transport for RPC health check
199        let transport = DefaultTransport::new(
200            format!("http://127.0.0.1:{}/", self.rpc_port),
201            Some((self.config.rpc_username.clone(), self.config.rpc_password.clone())),
202        );
203
204        // Wait for node to be ready
205        let deadline = Instant::now() + Duration::from_secs(10);
206        let mut attempts = 0;
207        let mut last_error: Option<String> = None;
208        while Instant::now() < deadline {
209            if let Some(child) = child_guard.as_mut() {
210                if let Ok(Some(status)) = child.try_wait() {
211                    let error = format!("bitcoind node exited early with status: {}", status);
212                    error!("{}", error);
213                    return Err(TransportError::Rpc(error));
214                }
215            }
216
217            // Try to connect to RPC
218            match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
219                Ok(_) => {
220                    state.is_running = true;
221                    info!("bitcoind node started successfully on port {}", self.rpc_port);
222                    return Ok(());
223                }
224                Err(e) => {
225                    last_error = Some(e.to_string());
226                    debug!("Failed to connect to RPC (attempt {}): {}", attempts, e);
227                }
228            }
229
230            attempts += 1;
231            tokio::time::sleep(Duration::from_millis(200)).await;
232        }
233
234        let error = match last_error {
235            Some(e) => format!(
236                "Timed out waiting for node to start on port {} after {} attempts. Last RPC error: {}",
237                self.rpc_port, attempts, e
238            ),
239            None => format!(
240                "Timed out waiting for node to start on port {} after {} attempts",
241                self.rpc_port, attempts
242            ),
243        };
244        error!("{}", error);
245        return Err(TransportError::Rpc(error));
246    }
247
248    async fn stop(&mut self) -> Result<(), TransportError> {
249        let mut state = self.state.write().await;
250        if !state.is_running {
251            return Ok(());
252        }
253
254        let mut child = self.child.lock().await;
255        if let Some(mut child_process) = child.take() {
256            info!("Stopping bitcoind node...");
257            let _ = child_process.kill().await;
258        }
259
260        state.is_running = false;
261        info!("bitcoind node stopped");
262        Ok(())
263    }
264
265    async fn get_state(&self) -> Result<NodeState, TransportError> {
266        Ok(self.state.read().await.clone())
267    }
268
269    fn rpc_port(&self) -> u16 { self.rpc_port }
270
271    fn rpc_username(&self) -> &str { &self.config.rpc_username }
272
273    fn rpc_password(&self) -> &str { &self.config.rpc_password }
274
275    async fn create_transport(
276        &self,
277    ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError> {
278        // Create HTTP transport for Bitcoin Core
279        let rpc_url = format!("http://127.0.0.1:{}", self.rpc_port());
280        let auth = Some((self.rpc_username().to_string(), self.rpc_password().to_string()));
281        let transport = Arc::new(DefaultTransport::new(rpc_url, auth));
282
283        let mut retries = 0;
284
285        loop {
286            match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
287                Ok(_) => break,
288                Err(TransportError::Rpc(e)) => {
289                    let is_init_state = INIT_WAIT_RPC_CODES.iter().any(|state| e.contains(state));
290                    if is_init_state && retries < INIT_MAX_RETRIES {
291                        tracing::debug!(
292                            "Waiting for initialization: {} (attempt {}/{})",
293                            e,
294                            retries + 1,
295                            INIT_MAX_RETRIES
296                        );
297                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
298                        retries += 1;
299                        continue;
300                    }
301                    return Err(TransportError::Rpc(e));
302                }
303                Err(e) => return Err(e),
304            }
305        }
306
307        if retries > 0 {
308            tracing::debug!("Node initialization completed after {} attempts", retries);
309        }
310
311        Ok(transport)
312    }
313}