ethos_bitcoind/node/
node_manager.rs1use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use tempfile::TempDir;
10use tokio::process::{Child, Command};
11use tokio::sync::{Mutex, RwLock};
12use std::process::Stdio;
13
14use crate::test_config::TestConfig;
15use crate::transport::{TransportError, core::TransportExt};
16
17use tracing::{info, debug, error};
18
19use tokio::io::AsyncBufReadExt;
20use std::time::Instant;
21
/// Snapshot of a managed node's lifecycle state.
///
/// The `Default` value reports the node as not running.
#[derive(Clone, Debug, Default)]
pub struct NodeState {
    /// `true` while the node is considered up (set by a successful start,
    /// cleared again by stop).
    pub is_running: bool,
}
28
/// Strategy for choosing the port a node listens on.
///
/// NOTE(review): nothing in this part of the file consumes this enum, so the
/// per-variant descriptions below are inferred from the names only — confirm
/// against the call sites.
#[derive(Clone, Debug)]
pub enum PortSelection {
    /// Use exactly the given port number.
    Fixed(u16),
    /// Presumably: pick a free port at runtime.
    Dynamic,
    /// Presumably: pass port `0` through unchanged.
    Zero,
}
39
40#[async_trait]
42pub trait NodeManager: Send + Sync + std::any::Any + std::fmt::Debug {
43 async fn start(&self) -> Result<(), TransportError>;
44 async fn stop(&mut self) -> Result<(), TransportError>;
45 async fn get_state(&self) -> Result<NodeState, TransportError>;
46 fn rpc_port(&self) -> u16;
48 fn rpc_username(&self) -> &str;
50 fn rpc_password(&self) -> &str;
52 async fn create_transport(&self) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError>;
54}
55
56#[derive(Debug)]
58pub struct BitcoinNodeManager {
59 state: Arc<RwLock<NodeState>>,
61 child: Arc<Mutex<Option<Child>>>,
63 pub rpc_port: u16,
65 config: TestConfig,
67 _datadir: Option<TempDir>,
69}
70
71impl BitcoinNodeManager {
72 pub fn new() -> Result<Self, TransportError> {
74 Self::new_with_config(&TestConfig::default())
75 }
76
77 pub fn new_with_config(config: &TestConfig) -> Result<Self, TransportError> {
79 let datadir = TempDir::new()?;
80
81 let rpc_port = if config.rpc_port == 0 {
83 {
86 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
87 listener.local_addr()?.port()
88 }
89 } else {
90 config.rpc_port
91 };
92
93 Ok(Self {
94 state: Arc::new(RwLock::new(NodeState::default())),
95 child: Arc::new(Mutex::new(None)),
96 rpc_port,
97 config: config.clone(),
98 _datadir: Some(datadir),
99 })
100
101 }
102
103 pub fn rpc_port(&self) -> u16 { self.rpc_port }
105
106 pub fn rpc_username(&self) -> &str { &self.config.rpc_username }
108
109 pub fn rpc_password(&self) -> &str { &self.config.rpc_password }
111
112}
113
114#[async_trait]
115impl NodeManager for BitcoinNodeManager {
116
117 async fn start(&self) -> Result<(), TransportError> {
118 let mut state = self.state.write().await;
119 if state.is_running {
120 return Ok(());
121 }
122
123 let datadir = self._datadir.as_ref().unwrap().path();
124 let mut cmd = Command::new("bitcoind");
125
126 let chain = format!("-chain={}", self.config.as_chain_str());
127 let data_dir = format!("-datadir={}", datadir.display());
128 let rpc_port = format!("-rpcport={}", self.rpc_port);
129 let rpc_bind = format!("-rpcbind=127.0.0.1:{}", self.rpc_port);
130 let rpc_user = format!("-rpcuser={}", self.config.rpc_username);
131 let rpc_password = format!("-rpcpassword={}", self.config.rpc_password);
132
133 let mut args = vec![
134 &chain,
135 "-listen=0",
136 &data_dir,
137 &rpc_port,
138 &rpc_bind,
139 "-rpcallowip=127.0.0.1",
140 "-fallbackfee=0.0002",
141 "-server=1",
142 "-prune=1",
143 &rpc_user,
144 &rpc_password,
145 ];
146
147 for arg in &self.config.extra_args {
148 args.push(arg);
149 }
150
151 cmd.args(&args);
152
153 cmd.stderr(Stdio::piped());
155 cmd.stdout(Stdio::piped());
156
157 let mut child = cmd.spawn()?;
158
159 let stderr = child.stderr.take().unwrap();
161 let stderr_reader = tokio::io::BufReader::new(stderr);
162 tokio::spawn(async move {
163 let mut lines = stderr_reader.lines();
164 while let Ok(Some(line)) = lines.next_line().await {
165 error!("bitcoind stderr: {}", line);
166 }
167 });
168
169 let stdout = child.stdout.take().unwrap();
171 let stdout_reader = tokio::io::BufReader::new(stdout);
172 tokio::spawn(async move {
173 let mut lines = stdout_reader.lines();
174 while let Ok(Some(line)) = lines.next_line().await {
175 info!("bitcoind stdout: {}", line);
176 }
177 });
178
179 let mut child_guard = self.child.lock().await;
181 *child_guard = Some(child);
182
183 info!("Waiting for bitcoind node to initialize...");
184 tokio::time::sleep(Duration::from_millis(150)).await;
185
186 let deadline = Instant::now() + Duration::from_secs(10);
188 let mut attempts = 0;
189 while Instant::now() < deadline {
190 if let Some(child) = child_guard.as_mut() {
191 if let Ok(Some(status)) = child.try_wait() {
192 let error = format!("bitcoind node exited early with status: {}", status);
193 error!("{}", error);
194 return Err(TransportError::Rpc(error));
195 }
196 }
197
198 let client = reqwest::Client::new();
200 match client
201 .post(format!("http://127.0.0.1:{}/", self.rpc_port))
202 .basic_auth(&self.config.rpc_username, Some(&self.config.rpc_password))
203 .json(&serde_json::json!({
204 "jsonrpc": "2.0",
205 "method": "getnetworkinfo",
206 "params": [],
207 "id": 1
208 }))
209 .send()
210 .await
211 {
212 Ok(response) =>
213 if response.status().is_success() {
214 state.is_running = true;
215 info!("bitcoind node started successfully on port {}", self.rpc_port);
216 return Ok(());
217 } else {
218 debug!(
219 "RPC request failed with status {} (attempt {})",
220 response.status(),
221 attempts
222 );
223 },
224 Err(e) => {
225 debug!("Failed to connect to RPC (attempt {}): {}", attempts, e);
226 }
227 }
228
229 attempts += 1;
230 tokio::time::sleep(Duration::from_millis(200)).await;
231 }
232
233 let error = format!(
234 "Timed out waiting for bitcoind node to start on port {} after {} attempts",
235 self.rpc_port, attempts
236 );
237 error!("{}", error);
238 return Err(TransportError::Rpc(error));
239
240 }
241
242 async fn stop(&mut self) -> Result<(), TransportError> {
243 let mut state = self.state.write().await;
244 if !state.is_running {
245 return Ok(());
246 }
247
248 let mut child = self.child.lock().await;
249 if let Some(mut child_process) = child.take() {
250 info!("Stopping bitcoind node...");
251 let _ = child_process.kill().await;
252 }
253
254 state.is_running = false;
255 info!("bitcoind node stopped");
256 Ok(())
257 }
258
259 async fn get_state(&self) -> Result<NodeState, TransportError> {
260 Ok(self.state.read().await.clone())
261 }
262
263 fn rpc_port(&self) -> u16 { self.rpc_port }
264
265 fn rpc_username(&self) -> &str { &self.config.rpc_username }
266
267 fn rpc_password(&self) -> &str { &self.config.rpc_password }
268
269 async fn create_transport(&self) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError> {
270 use std::sync::Arc;
271 use crate::transport::DefaultTransport;
272
273 let rpc_url = format!("http://127.0.0.1:{}", self.rpc_port());
275 let auth = Some((self.rpc_username().to_string(), self.rpc_password().to_string()));
276 let transport = Arc::new(DefaultTransport::new(rpc_url, auth));
277
278 let init_states = [
283 "\"code\":-28",
284 "\"code\":-4",
285 ];
286
287 let max_retries = 30;
288 let mut retries = 0;
289
290 loop {
291 match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
292 Ok(_) => break,
293 Err(TransportError::Rpc(e)) => {
294 let is_init_state = init_states.iter().any(|state| e.contains(state));
296 if is_init_state && retries < max_retries {
297 tracing::debug!("Waiting for initialization: {} (attempt {}/{})", e, retries + 1, max_retries);
298 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
299 retries += 1;
300 continue;
301 }
302 return Err(TransportError::Rpc(e));
303 }
304 Err(e) => return Err(e),
305 }
306 }
307
308 if retries > 0 {
309 tracing::debug!("Node initialization completed after {} attempts", retries);
310 }
311
312 Ok(transport)
313 }
314
315}
316
#[cfg(test)]
mod tests {
    use super::*;

    /// Extra command-line arguments supplied through the configuration must
    /// be retained by the manager built from it.
    #[test]
    fn test_extra_args() {
        let mut config = crate::test_config::TestConfig::default();
        config.extra_args = vec!["-debug=1".to_string()];

        let node_manager = BitcoinNodeManager::new_with_config(&config)
            .expect("Failed to create node manager with extra args");

        let first_arg = &node_manager.config.extra_args[0];
        assert_eq!(first_arg, "-debug=1");
    }
}