Skip to main content

ethos_bitcoind/node/
node_manager.rs

1//! Node module for Bitcoin Core RPC testing
2//!
3//! This module provides utilities for managing Bitcoin Core nodes in test environments.
4
5use std::process::Stdio;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use tempfile::TempDir;
11use tokio::io::AsyncBufReadExt;
12use tokio::process::{Child, Command};
13use tokio::sync::{Mutex, RwLock};
14use tracing::{debug, error, info};
15
16use crate::test_config::TestConfig;
17use crate::transport::core::TransportExt;
18use crate::transport::{DefaultTransport, TransportError};
19
/// JSON fragments of the RPC error codes that are treated as "node is still initializing".
///
/// `-28` is Bitcoin Core's `RPC_IN_WARMUP`. `-4` is retried as well.
/// NOTE(review): upstream defines `-4` as the generic wallet error (`RPC_WALLET_ERROR`), not a
/// warmup code; confirm that retrying on it is intended.
///
/// These are matched as plain substrings of the RPC error text.
const INIT_WAIT_RPC_CODES: [&str; 2] = [r#""code":-28"#, r#""code":-4"#];
/// Upper bound on how many times an "initializing" RPC error is retried.
const INIT_MAX_RETRIES: u32 = 30;
23
/// Snapshot of what the manager currently knows about its node.
///
/// The default value describes a node that is not running.
#[derive(Clone, Debug, Default)]
pub struct NodeState {
    /// `true` while the node is considered up, `false` otherwise.
    pub is_running: bool,
}
30
/// How the RPC port for a node should be chosen.
#[derive(Clone, Debug)]
pub enum PortSelection {
    /// Use exactly this port number.
    Fixed(u16),
    /// Ask the operating system for any available port.
    Dynamic,
    /// Pass port 0 through unchanged (not recommended, may cause the daemon to fail).
    Zero,
}
41
42/// Trait defining the interface for a node manager
43#[async_trait]
44pub trait NodeManager: Send + Sync + std::any::Any + std::fmt::Debug {
45    async fn start(&self) -> Result<(), TransportError>;
46    async fn stop(&mut self) -> Result<(), TransportError>;
47    async fn get_state(&self) -> Result<NodeState, TransportError>;
48    /// Return the RPC port this manager was configured with
49    fn rpc_port(&self) -> u16;
50    /// Return the RPC username this manager was configured with
51    fn rpc_username(&self) -> &str;
52    /// Return the RPC password this manager was configured with
53    fn rpc_password(&self) -> &str;
54    /// Create a transport for communicating with the node
55    async fn create_transport(
56        &self,
57    ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError>;
58}
59
60/// Implementation of the node manager
61#[derive(Debug)]
62pub struct BitcoinNodeManager {
63    /// Shared state of the node
64    state: Arc<RwLock<NodeState>>,
65    /// Child process handle for the daemon
66    child: Arc<Mutex<Option<Child>>>,
67    /// RPC port for communication with the node
68    pub rpc_port: u16,
69    /// Test configuration for the node
70    config: TestConfig,
71    /// Temporary directory for node data (cleaned up on drop)
72    _datadir: Option<TempDir>,
73}
74
75impl BitcoinNodeManager {
76    /// Create a new node manager with default configuration
77    pub fn new() -> Result<Self, TransportError> { Self::new_with_config(&TestConfig::default()) }
78
79    /// Create a new node manager with custom configuration
80    pub fn new_with_config(config: &TestConfig) -> Result<Self, TransportError> {
81        let datadir = TempDir::new()?;
82
83        // Handle automatic port selection
84        let rpc_port = if config.rpc_port == 0 {
85            // Get a random free port by binding to 127.0.0.1:0
86            // The listener is dropped at the end of the block, freeing the port
87            {
88                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
89                listener.local_addr()?.port()
90            }
91        } else {
92            config.rpc_port
93        };
94
95        Ok(Self {
96            state: Arc::new(RwLock::new(NodeState::default())),
97            child: Arc::new(Mutex::new(None)),
98            rpc_port,
99            config: config.clone(),
100            _datadir: Some(datadir),
101        })
102    }
103
104    /// Get the RPC port for this node manager
105    pub fn rpc_port(&self) -> u16 { self.rpc_port }
106
107    /// Gets the test configuration used by this node manager
108    pub fn config(&self) -> &TestConfig { &self.config }
109
110    /// Get the RPC username from the configuration
111    pub fn rpc_username(&self) -> &str { &self.config.rpc_username }
112
113    /// Get the RPC password from the configuration
114    pub fn rpc_password(&self) -> &str { &self.config.rpc_password }
115}
116
117#[async_trait]
118impl NodeManager for BitcoinNodeManager {
119    async fn start(&self) -> Result<(), TransportError> {
120        let mut state = self.state.write().await;
121        if state.is_running {
122            return Ok(());
123        }
124
125        let datadir = self._datadir.as_ref().unwrap().path();
126        let exe = self
127            .config
128            .bitcoind_path
129            .as_deref()
130            .unwrap_or_else(|| std::path::Path::new("bitcoind"));
131        let mut cmd = Command::new(exe);
132
133        let chain = format!("-chain={}", self.config.as_chain_str());
134        let data_dir = format!("-datadir={}", datadir.display());
135        let rpc_port = format!("-rpcport={}", self.rpc_port);
136        let rpc_bind = format!("-rpcbind=127.0.0.1:{}", self.rpc_port);
137        let rpc_user = format!("-rpcuser={}", self.config.rpc_username);
138        let rpc_password = format!("-rpcpassword={}", self.config.rpc_password);
139
140        let mut args = vec![
141            &chain,
142            "-listen=0",
143            &data_dir,
144            &rpc_port,
145            &rpc_bind,
146            "-rpcallowip=127.0.0.1",
147            "-fallbackfee=0.0002",
148            "-server=1",
149            "-prune=1",
150            &rpc_user,
151            &rpc_password,
152        ];
153
154        for arg in &self.config.extra_args {
155            args.push(arg);
156        }
157
158        cmd.args(&args);
159
160        // Capture both stdout and stderr for better error reporting
161        cmd.stderr(Stdio::piped());
162        cmd.stdout(Stdio::piped());
163
164        let mut child = cmd.spawn()?;
165
166        // Read stderr in a separate task
167        let stderr = child.stderr.take().unwrap();
168        let stderr_reader = tokio::io::BufReader::new(stderr);
169        tokio::spawn(async move {
170            let mut lines = stderr_reader.lines();
171            while let Ok(Some(line)) = lines.next_line().await {
172                error!("bitcoind stderr: {}", line);
173            }
174        });
175
176        // Read stdout in a separate task
177        let stdout = child.stdout.take().unwrap();
178        let stdout_reader = tokio::io::BufReader::new(stdout);
179        tokio::spawn(async move {
180            let mut lines = stdout_reader.lines();
181            while let Ok(Some(line)) = lines.next_line().await {
182                info!("bitcoind stdout: {}", line);
183            }
184        });
185
186        // Store the child process
187        let mut child_guard = self.child.lock().await;
188        *child_guard = Some(child);
189
190        info!("Waiting for bitcoind node to initialize...");
191        tokio::time::sleep(Duration::from_millis(150)).await;
192
193        // Create transport for RPC health check
194        let transport = DefaultTransport::new(
195            format!("http://127.0.0.1:{}/", self.rpc_port),
196            Some((self.config.rpc_username.clone(), self.config.rpc_password.clone())),
197        );
198
199        // Wait for node to be ready
200        let deadline = Instant::now() + Duration::from_secs(10);
201        let mut attempts = 0;
202        let mut last_error: Option<String> = None;
203        while Instant::now() < deadline {
204            if let Some(child) = child_guard.as_mut() {
205                if let Ok(Some(status)) = child.try_wait() {
206                    let error = format!("bitcoind node exited early with status: {}", status);
207                    error!("{}", error);
208                    return Err(TransportError::Rpc(error));
209                }
210            }
211
212            // Try to connect to RPC
213            match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
214                Ok(_) => {
215                    state.is_running = true;
216                    info!("bitcoind node started successfully on port {}", self.rpc_port);
217                    return Ok(());
218                }
219                Err(e) => {
220                    last_error = Some(e.to_string());
221                    debug!("Failed to connect to RPC (attempt {}): {}", attempts, e);
222                }
223            }
224
225            attempts += 1;
226            tokio::time::sleep(Duration::from_millis(200)).await;
227        }
228
229        let error = match last_error {
230            Some(e) => format!(
231                "Timed out waiting for node to start on port {} after {} attempts. Last RPC error: {}",
232                self.rpc_port, attempts, e
233            ),
234            None => format!(
235                "Timed out waiting for node to start on port {} after {} attempts",
236                self.rpc_port, attempts
237            ),
238        };
239        error!("{}", error);
240        return Err(TransportError::Rpc(error));
241    }
242
243    async fn stop(&mut self) -> Result<(), TransportError> {
244        let mut state = self.state.write().await;
245        if !state.is_running {
246            return Ok(());
247        }
248
249        let mut child = self.child.lock().await;
250        if let Some(mut child_process) = child.take() {
251            info!("Stopping bitcoind node...");
252            let _ = child_process.kill().await;
253        }
254
255        state.is_running = false;
256        info!("bitcoind node stopped");
257        Ok(())
258    }
259
260    async fn get_state(&self) -> Result<NodeState, TransportError> {
261        Ok(self.state.read().await.clone())
262    }
263
264    fn rpc_port(&self) -> u16 { self.rpc_port }
265
266    fn rpc_username(&self) -> &str { &self.config.rpc_username }
267
268    fn rpc_password(&self) -> &str { &self.config.rpc_password }
269
270    async fn create_transport(
271        &self,
272    ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError> {
273        // Create HTTP transport for Bitcoin Core
274        let rpc_url = format!("http://127.0.0.1:{}", self.rpc_port());
275        let auth = Some((self.rpc_username().to_string(), self.rpc_password().to_string()));
276        let transport = Arc::new(DefaultTransport::new(rpc_url, auth));
277
278        let mut retries = 0;
279
280        loop {
281            match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
282                Ok(_) => break,
283                Err(TransportError::Rpc(e)) => {
284                    let is_init_state = INIT_WAIT_RPC_CODES.iter().any(|state| e.contains(state));
285                    if is_init_state && retries < INIT_MAX_RETRIES {
286                        tracing::debug!(
287                            "Waiting for initialization: {} (attempt {}/{})",
288                            e,
289                            retries + 1,
290                            INIT_MAX_RETRIES
291                        );
292                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
293                        retries += 1;
294                        continue;
295                    }
296                    return Err(TransportError::Rpc(e));
297                }
298                Err(e) => return Err(e),
299            }
300        }
301
302        if retries > 0 {
303            tracing::debug!("Node initialization completed after {} attempts", retries);
304        }
305
306        Ok(transport)
307    }
308}