Skip to main content

ethos_bitcoind/node/
node_manager.rs

1//! Node module for Bitcoin Core RPC testing
2//!
3//! This module provides utilities for managing Bitcoin Core nodes in test environments.
4
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use tempfile::TempDir;
10use tokio::process::{Child, Command};
11use tokio::sync::{Mutex, RwLock};
12use std::process::Stdio;
13
14use crate::test_config::TestConfig;
15use crate::transport::{TransportError, core::TransportExt};
16
17use tracing::{info, debug, error};
18
19use tokio::io::AsyncBufReadExt;
20use std::time::Instant;
21
22/// Represents the current state of a node
23#[derive(Debug, Default, Clone)]
24pub struct NodeState {
25    /// Whether the node is currently running
26    pub is_running: bool,
27}
28
29/// Configuration for port selection behavior
30#[derive(Debug, Clone)]
31pub enum PortSelection {
32    /// Use the specified port number
33    Fixed(u16),
34    /// Let the OS assign an available port
35    Dynamic,
36    /// Use port 0 (not recommended, may cause daemon to fail)
37    Zero,
38}
39
40/// Trait defining the interface for a node manager
41#[async_trait]
42pub trait NodeManager: Send + Sync + std::any::Any + std::fmt::Debug {
43    async fn start(&self) -> Result<(), TransportError>;
44    async fn stop(&mut self) -> Result<(), TransportError>;
45    async fn get_state(&self) -> Result<NodeState, TransportError>;
46    /// Return the RPC port this manager was configured with
47    fn rpc_port(&self) -> u16;
48    /// Return the RPC username this manager was configured with
49    fn rpc_username(&self) -> &str;
50    /// Return the RPC password this manager was configured with
51    fn rpc_password(&self) -> &str;
52    /// Create a transport for communicating with the node
53    async fn create_transport(&self) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError>;
54}
55
56/// Implementation of the node manager
57#[derive(Debug)]
58pub struct BitcoinNodeManager {
59    /// Shared state of the node
60    state: Arc<RwLock<NodeState>>,
61    /// Child process handle for the daemon
62    child: Arc<Mutex<Option<Child>>>,
63    /// RPC port for communication with the node
64    pub rpc_port: u16,
65    /// Test configuration for the node
66    config: TestConfig,
67    /// Temporary directory for node data (cleaned up on drop)
68    _datadir: Option<TempDir>,
69}
70
71impl BitcoinNodeManager {
72    /// Create a new node manager with default configuration
73    pub fn new() -> Result<Self, TransportError> {
74        Self::new_with_config(&TestConfig::default())
75    }
76
77    /// Create a new node manager with custom configuration
78    pub fn new_with_config(config: &TestConfig) -> Result<Self, TransportError> {
79        let datadir = TempDir::new()?;
80
81        // Handle automatic port selection
82        let rpc_port = if config.rpc_port == 0 {
83            // Get a random free port by binding to 127.0.0.1:0
84            // The listener is dropped at the end of the block, freeing the port
85            {
86                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
87                listener.local_addr()?.port()
88            }
89        } else {
90            config.rpc_port
91        };
92
93        Ok(Self {
94            state: Arc::new(RwLock::new(NodeState::default())),
95            child: Arc::new(Mutex::new(None)),
96            rpc_port,
97            config: config.clone(),
98            _datadir: Some(datadir),
99        })
100
101    }
102
103    /// Get the RPC port for this node manager
104    pub fn rpc_port(&self) -> u16 { self.rpc_port }
105
106    /// Get the RPC username from the configuration
107    pub fn rpc_username(&self) -> &str { &self.config.rpc_username }
108
109    /// Get the RPC password from the configuration
110    pub fn rpc_password(&self) -> &str { &self.config.rpc_password }
111
112}
113
114#[async_trait]
115impl NodeManager for BitcoinNodeManager {
116
117    async fn start(&self) -> Result<(), TransportError> {
118        let mut state = self.state.write().await;
119        if state.is_running {
120            return Ok(());
121        }
122
123        let datadir = self._datadir.as_ref().unwrap().path();
124        let mut cmd = Command::new("bitcoind");
125
126        let chain = format!("-chain={}", self.config.as_chain_str());
127        let data_dir = format!("-datadir={}", datadir.display());
128        let rpc_port = format!("-rpcport={}", self.rpc_port);
129        let rpc_bind = format!("-rpcbind=127.0.0.1:{}", self.rpc_port);
130        let rpc_user = format!("-rpcuser={}", self.config.rpc_username);
131        let rpc_password = format!("-rpcpassword={}", self.config.rpc_password);
132
133        let mut args = vec![
134            &chain,
135            "-listen=0",
136            &data_dir,
137            &rpc_port,
138            &rpc_bind,
139            "-rpcallowip=127.0.0.1",
140            "-fallbackfee=0.0002",
141            "-server=1",
142            "-prune=1",
143            &rpc_user,
144            &rpc_password,
145        ];
146
147        for arg in &self.config.extra_args {
148            args.push(arg);
149        }
150
151        cmd.args(&args);
152
153        // Capture both stdout and stderr for better error reporting
154        cmd.stderr(Stdio::piped());
155        cmd.stdout(Stdio::piped());
156
157        let mut child = cmd.spawn()?;
158
159        // Read stderr in a separate task
160        let stderr = child.stderr.take().unwrap();
161        let stderr_reader = tokio::io::BufReader::new(stderr);
162        tokio::spawn(async move {
163            let mut lines = stderr_reader.lines();
164            while let Ok(Some(line)) = lines.next_line().await {
165                error!("bitcoind stderr: {}", line);
166            }
167        });
168
169        // Read stdout in a separate task
170        let stdout = child.stdout.take().unwrap();
171        let stdout_reader = tokio::io::BufReader::new(stdout);
172        tokio::spawn(async move {
173            let mut lines = stdout_reader.lines();
174            while let Ok(Some(line)) = lines.next_line().await {
175                info!("bitcoind stdout: {}", line);
176            }
177        });
178
179        // Store the child process
180        let mut child_guard = self.child.lock().await;
181        *child_guard = Some(child);
182
183        info!("Waiting for bitcoind node to initialize...");
184        tokio::time::sleep(Duration::from_millis(150)).await;
185
186        // Wait for node to be ready
187        let deadline = Instant::now() + Duration::from_secs(10);
188        let mut attempts = 0;
189        while Instant::now() < deadline {
190            if let Some(child) = child_guard.as_mut() {
191                if let Ok(Some(status)) = child.try_wait() {
192                    let error = format!("bitcoind node exited early with status: {}", status);
193                    error!("{}", error);
194                    return Err(TransportError::Rpc(error));
195                }
196            }
197
198            // Try to connect to RPC
199            let client = reqwest::Client::new();
200            match client
201                .post(format!("http://127.0.0.1:{}/", self.rpc_port))
202                .basic_auth(&self.config.rpc_username, Some(&self.config.rpc_password))
203                .json(&serde_json::json!({
204                    "jsonrpc": "2.0",
205                    "method": "getnetworkinfo",
206                    "params": [],
207                    "id": 1
208                }))
209                .send()
210                .await
211            {
212                Ok(response) =>
213                    if response.status().is_success() {
214                        state.is_running = true;
215                        info!("bitcoind node started successfully on port {}", self.rpc_port);
216                        return Ok(());
217                    } else {
218                        debug!(
219                            "RPC request failed with status {} (attempt {})",
220                            response.status(),
221                            attempts
222                        );
223                    },
224                Err(e) => {
225                    debug!("Failed to connect to RPC (attempt {}): {}", attempts, e);
226                }
227            }
228
229            attempts += 1;
230            tokio::time::sleep(Duration::from_millis(200)).await;
231        }
232
233        let error = format!(
234            "Timed out waiting for bitcoind node to start on port {} after {} attempts",
235            self.rpc_port, attempts
236        );
237        error!("{}", error);
238        return Err(TransportError::Rpc(error));
239
240    }
241
242    async fn stop(&mut self) -> Result<(), TransportError> {
243        let mut state = self.state.write().await;
244        if !state.is_running {
245            return Ok(());
246        }
247
248        let mut child = self.child.lock().await;
249        if let Some(mut child_process) = child.take() {
250            info!("Stopping bitcoind node...");
251            let _ = child_process.kill().await;
252        }
253
254        state.is_running = false;
255        info!("bitcoind node stopped");
256        Ok(())
257    }
258
259    async fn get_state(&self) -> Result<NodeState, TransportError> {
260        Ok(self.state.read().await.clone())
261    }
262
263    fn rpc_port(&self) -> u16 { self.rpc_port }
264
265    fn rpc_username(&self) -> &str { &self.config.rpc_username }
266
267    fn rpc_password(&self) -> &str { &self.config.rpc_password }
268
269    async fn create_transport(&self) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError> {
270        use std::sync::Arc;
271        use crate::transport::DefaultTransport;
272
273        // Create HTTP transport for Bitcoin Core
274        let rpc_url = format!("http://127.0.0.1:{}", self.rpc_port());
275        let auth = Some((self.rpc_username().to_string(), self.rpc_password().to_string()));
276        let transport = Arc::new(DefaultTransport::new(rpc_url, auth));
277
278        // Wait for node to be ready for RPC with Bitcoin Core specific initialization logic
279        // Bitcoin Core initialization states that require waiting:
280        // -28: RPC in warmup
281        // -4:  RPC in warmup (alternative code)
282        let init_states = [
283            "\"code\":-28",
284            "\"code\":-4",
285        ];
286
287        let max_retries = 30;
288        let mut retries = 0;
289
290        loop {
291            match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
292                Ok(_) => break,
293                Err(TransportError::Rpc(e)) => {
294                    // Check if the error matches any known initialization state
295                    let is_init_state = init_states.iter().any(|state| e.contains(state));
296                    if is_init_state && retries < max_retries {
297                        tracing::debug!("Waiting for initialization: {} (attempt {}/{})", e, retries + 1, max_retries);
298                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
299                        retries += 1;
300                        continue;
301                    }
302                    return Err(TransportError::Rpc(e));
303                }
304                Err(e) => return Err(e),
305            }
306        }
307
308        if retries > 0 {
309            tracing::debug!("Node initialization completed after {} attempts", retries);
310        }
311
312        Ok(transport)
313    }
314
315}
316
317#[cfg(test)]
318mod tests {
319    use super::*;
320
321    #[test]
322    fn test_extra_args() {
323        let config = crate::test_config::TestConfig {
324            extra_args: vec!["-debug=1".to_string()],
325            ..crate::test_config::TestConfig::default()
326        };
327
328        let node_manager = BitcoinNodeManager::new_with_config(&config)
329            .expect("Failed to create node manager with extra args");
330
331        assert_eq!(node_manager.config.extra_args[0], "-debug=1");
332    }
333}