Skip to main content

ethos_bitcoind/node/
node_manager.rs

1//! Node module for Bitcoin Core RPC testing
2//!
3//! This module provides utilities for managing Bitcoin Core nodes in test environments.
4
5use std::process::Stdio;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use tempfile::TempDir;
11use tokio::io::AsyncBufReadExt;
12use tokio::process::{Child, Command};
13use tokio::sync::{Mutex, RwLock};
14use tracing::{debug, error, info};
15
16use crate::test_config::TestConfig;
17use crate::transport::core::TransportExt;
18use crate::transport::{DefaultTransport, TransportError};
19
/// Snapshot of a managed node's lifecycle state.
///
/// The default value describes a node that is not running.
#[derive(Clone, Debug, Default)]
pub struct NodeState {
    /// `true` once the node has been started successfully and has not been stopped since.
    pub is_running: bool,
}
26
/// Strategy used to pick a port for a node.
#[derive(Clone, Debug)]
pub enum PortSelection {
    /// Use exactly this port number.
    Fixed(u16),
    /// Ask the operating system for any currently available port.
    Dynamic,
    /// Pass port 0 through unchanged (not recommended, may cause the daemon to fail).
    Zero,
}
37
38/// Trait defining the interface for a node manager
39#[async_trait]
40pub trait NodeManager: Send + Sync + std::any::Any + std::fmt::Debug {
41    async fn start(&self) -> Result<(), TransportError>;
42    async fn stop(&mut self) -> Result<(), TransportError>;
43    async fn get_state(&self) -> Result<NodeState, TransportError>;
44    /// Return the RPC port this manager was configured with
45    fn rpc_port(&self) -> u16;
46    /// Return the RPC username this manager was configured with
47    fn rpc_username(&self) -> &str;
48    /// Return the RPC password this manager was configured with
49    fn rpc_password(&self) -> &str;
50    /// Create a transport for communicating with the node
51    async fn create_transport(
52        &self,
53    ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError>;
54}
55
/// [`NodeManager`] implementation that runs a local `bitcoind` process.
///
/// Each manager owns a temporary data directory and a single optional child
/// process handle. The RPC endpoint is always addressed on `127.0.0.1`.
#[derive(Debug)]
pub struct BitcoinNodeManager {
    /// Shared state of the node (currently just the `is_running` flag)
    state: Arc<RwLock<NodeState>>,
    /// Child process handle for the daemon; `None` until `start` stores a spawned process,
    /// and taken again by `stop`
    child: Arc<Mutex<Option<Child>>>,
    /// RPC port for communication with the node. When the configuration asks for port 0,
    /// this holds the free port picked in `new_with_config` instead.
    pub rpc_port: u16,
    /// Test configuration for the node (a clone of the one passed to the constructor)
    config: TestConfig,
    /// Temporary directory for node data (cleaned up on drop); passed to the daemon as
    /// `-datadir`
    _datadir: Option<TempDir>,
}
70
71impl BitcoinNodeManager {
72    /// Create a new node manager with default configuration
73    pub fn new() -> Result<Self, TransportError> { Self::new_with_config(&TestConfig::default()) }
74
75    /// Create a new node manager with custom configuration
76    pub fn new_with_config(config: &TestConfig) -> Result<Self, TransportError> {
77        let datadir = TempDir::new()?;
78
79        // Handle automatic port selection
80        let rpc_port = if config.rpc_port == 0 {
81            // Get a random free port by binding to 127.0.0.1:0
82            // The listener is dropped at the end of the block, freeing the port
83            {
84                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
85                listener.local_addr()?.port()
86            }
87        } else {
88            config.rpc_port
89        };
90
91        Ok(Self {
92            state: Arc::new(RwLock::new(NodeState::default())),
93            child: Arc::new(Mutex::new(None)),
94            rpc_port,
95            config: config.clone(),
96            _datadir: Some(datadir),
97        })
98    }
99
100    /// Get the RPC port for this node manager
101    pub fn rpc_port(&self) -> u16 { self.rpc_port }
102
103    /// Gets the test configuration used by this node manager
104    pub fn config(&self) -> &TestConfig { &self.config }
105
106    /// Get the RPC username from the configuration
107    pub fn rpc_username(&self) -> &str { &self.config.rpc_username }
108
109    /// Get the RPC password from the configuration
110    pub fn rpc_password(&self) -> &str { &self.config.rpc_password }
111}
112
113#[async_trait]
114impl NodeManager for BitcoinNodeManager {
115    async fn start(&self) -> Result<(), TransportError> {
116        let mut state = self.state.write().await;
117        if state.is_running {
118            return Ok(());
119        }
120
121        let datadir = self._datadir.as_ref().unwrap().path();
122        let exe = self
123            .config
124            .bitcoind_path
125            .as_deref()
126            .unwrap_or_else(|| std::path::Path::new("bitcoind"));
127        let mut cmd = Command::new(exe);
128
129        let chain = format!("-chain={}", self.config.as_chain_str());
130        let data_dir = format!("-datadir={}", datadir.display());
131        let rpc_port = format!("-rpcport={}", self.rpc_port);
132        let rpc_bind = format!("-rpcbind=127.0.0.1:{}", self.rpc_port);
133        let rpc_user = format!("-rpcuser={}", self.config.rpc_username);
134        let rpc_password = format!("-rpcpassword={}", self.config.rpc_password);
135
136        let mut args = vec![
137            &chain,
138            "-listen=0",
139            &data_dir,
140            &rpc_port,
141            &rpc_bind,
142            "-rpcallowip=127.0.0.1",
143            "-fallbackfee=0.0002",
144            "-server=1",
145            "-prune=1",
146            &rpc_user,
147            &rpc_password,
148        ];
149
150        for arg in &self.config.extra_args {
151            args.push(arg);
152        }
153
154        cmd.args(&args);
155
156        // Capture both stdout and stderr for better error reporting
157        cmd.stderr(Stdio::piped());
158        cmd.stdout(Stdio::piped());
159
160        let mut child = cmd.spawn()?;
161
162        // Read stderr in a separate task
163        let stderr = child.stderr.take().unwrap();
164        let stderr_reader = tokio::io::BufReader::new(stderr);
165        tokio::spawn(async move {
166            let mut lines = stderr_reader.lines();
167            while let Ok(Some(line)) = lines.next_line().await {
168                error!("bitcoind stderr: {}", line);
169            }
170        });
171
172        // Read stdout in a separate task
173        let stdout = child.stdout.take().unwrap();
174        let stdout_reader = tokio::io::BufReader::new(stdout);
175        tokio::spawn(async move {
176            let mut lines = stdout_reader.lines();
177            while let Ok(Some(line)) = lines.next_line().await {
178                info!("bitcoind stdout: {}", line);
179            }
180        });
181
182        // Store the child process
183        let mut child_guard = self.child.lock().await;
184        *child_guard = Some(child);
185
186        info!("Waiting for bitcoind node to initialize...");
187        tokio::time::sleep(Duration::from_millis(150)).await;
188
189        // Create transport for RPC health check
190        let transport = DefaultTransport::new(
191            format!("http://127.0.0.1:{}/", self.rpc_port),
192            Some((self.config.rpc_username.clone(), self.config.rpc_password.clone())),
193        );
194
195        // Wait for node to be ready
196        let deadline = Instant::now() + Duration::from_secs(10);
197        let mut attempts = 0;
198        let mut last_error: Option<String> = None;
199        while Instant::now() < deadline {
200            if let Some(child) = child_guard.as_mut() {
201                if let Ok(Some(status)) = child.try_wait() {
202                    let error = format!("bitcoind node exited early with status: {}", status);
203                    error!("{}", error);
204                    return Err(TransportError::Rpc(error));
205                }
206            }
207
208            // Try to connect to RPC
209            match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
210                Ok(_) => {
211                    state.is_running = true;
212                    info!("bitcoind node started successfully on port {}", self.rpc_port);
213                    return Ok(());
214                }
215                Err(e) => {
216                    last_error = Some(e.to_string());
217                    debug!("Failed to connect to RPC (attempt {}): {}", attempts, e);
218                }
219            }
220
221            attempts += 1;
222            tokio::time::sleep(Duration::from_millis(200)).await;
223        }
224
225        let error = match last_error {
226            Some(e) => format!(
227                "Timed out waiting for node to start on port {} after {} attempts. Last RPC error: {}",
228                self.rpc_port, attempts, e
229            ),
230            None => format!(
231                "Timed out waiting for node to start on port {} after {} attempts",
232                self.rpc_port, attempts
233            ),
234        };
235        error!("{}", error);
236        return Err(TransportError::Rpc(error));
237    }
238
239    async fn stop(&mut self) -> Result<(), TransportError> {
240        let mut state = self.state.write().await;
241        if !state.is_running {
242            return Ok(());
243        }
244
245        let mut child = self.child.lock().await;
246        if let Some(mut child_process) = child.take() {
247            info!("Stopping bitcoind node...");
248            let _ = child_process.kill().await;
249        }
250
251        state.is_running = false;
252        info!("bitcoind node stopped");
253        Ok(())
254    }
255
256    async fn get_state(&self) -> Result<NodeState, TransportError> {
257        Ok(self.state.read().await.clone())
258    }
259
260    fn rpc_port(&self) -> u16 { self.rpc_port }
261
262    fn rpc_username(&self) -> &str { &self.config.rpc_username }
263
264    fn rpc_password(&self) -> &str { &self.config.rpc_password }
265
266    async fn create_transport(
267        &self,
268    ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError> {
269        use std::sync::Arc;
270
271        use crate::transport::DefaultTransport;
272
273        // Create HTTP transport for Bitcoin Core
274        let rpc_url = format!("http://127.0.0.1:{}", self.rpc_port());
275        let auth = Some((self.rpc_username().to_string(), self.rpc_password().to_string()));
276        let transport = Arc::new(DefaultTransport::new(rpc_url, auth));
277
278        // Wait for node to be ready for RPC with Bitcoin Core specific initialization logic
279        // Bitcoin Core initialization states that require waiting:
280        // -28: RPC in warmup
281        // -4:  RPC in warmup (alternative code)
282        let init_states = ["\"code\":-28", "\"code\":-4"];
283
284        let max_retries = 30;
285        let mut retries = 0;
286
287        loop {
288            match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
289                Ok(_) => break,
290                Err(TransportError::Rpc(e)) => {
291                    // Check if the error matches any known initialization state
292                    let is_init_state = init_states.iter().any(|state| e.contains(state));
293                    if is_init_state && retries < max_retries {
294                        tracing::debug!(
295                            "Waiting for initialization: {} (attempt {}/{})",
296                            e,
297                            retries + 1,
298                            max_retries
299                        );
300                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
301                        retries += 1;
302                        continue;
303                    }
304                    return Err(TransportError::Rpc(e));
305                }
306                Err(e) => return Err(e),
307            }
308        }
309
310        if retries > 0 {
311            tracing::debug!("Node initialization completed after {} attempts", retries);
312        }
313
314        Ok(transport)
315    }
316}