ethos_bitcoind/node/
node_manager.rs1use std::process::Stdio;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use tempfile::TempDir;
11use tokio::io::AsyncBufReadExt;
12use tokio::process::{Child, Command};
13use tokio::sync::{Mutex, RwLock};
14use tracing::{debug, error, info};
15
16use crate::test_config::TestConfig;
17use crate::transport::core::TransportExt;
18use crate::transport::{DefaultTransport, TransportError};
19
20#[derive(Debug, Default, Clone)]
22pub struct NodeState {
23 pub is_running: bool,
25}
26
27#[derive(Debug, Clone)]
29pub enum PortSelection {
30 Fixed(u16),
32 Dynamic,
34 Zero,
36}
37
38#[async_trait]
40pub trait NodeManager: Send + Sync + std::any::Any + std::fmt::Debug {
41 async fn start(&self) -> Result<(), TransportError>;
42 async fn stop(&mut self) -> Result<(), TransportError>;
43 async fn get_state(&self) -> Result<NodeState, TransportError>;
44 fn rpc_port(&self) -> u16;
46 fn rpc_username(&self) -> &str;
48 fn rpc_password(&self) -> &str;
50 async fn create_transport(
52 &self,
53 ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError>;
54}
55
56#[derive(Debug)]
58pub struct BitcoinNodeManager {
59 state: Arc<RwLock<NodeState>>,
61 child: Arc<Mutex<Option<Child>>>,
63 pub rpc_port: u16,
65 config: TestConfig,
67 _datadir: Option<TempDir>,
69}
70
71impl BitcoinNodeManager {
72 pub fn new() -> Result<Self, TransportError> { Self::new_with_config(&TestConfig::default()) }
74
75 pub fn new_with_config(config: &TestConfig) -> Result<Self, TransportError> {
77 let datadir = TempDir::new()?;
78
79 let rpc_port = if config.rpc_port == 0 {
81 {
84 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
85 listener.local_addr()?.port()
86 }
87 } else {
88 config.rpc_port
89 };
90
91 Ok(Self {
92 state: Arc::new(RwLock::new(NodeState::default())),
93 child: Arc::new(Mutex::new(None)),
94 rpc_port,
95 config: config.clone(),
96 _datadir: Some(datadir),
97 })
98 }
99
100 pub fn rpc_port(&self) -> u16 { self.rpc_port }
102
103 pub fn config(&self) -> &TestConfig { &self.config }
105
106 pub fn rpc_username(&self) -> &str { &self.config.rpc_username }
108
109 pub fn rpc_password(&self) -> &str { &self.config.rpc_password }
111}
112
113#[async_trait]
114impl NodeManager for BitcoinNodeManager {
115 async fn start(&self) -> Result<(), TransportError> {
116 let mut state = self.state.write().await;
117 if state.is_running {
118 return Ok(());
119 }
120
121 let datadir = self._datadir.as_ref().unwrap().path();
122 let exe = self
123 .config
124 .bitcoind_path
125 .as_deref()
126 .unwrap_or_else(|| std::path::Path::new("bitcoind"));
127 let mut cmd = Command::new(exe);
128
129 let chain = format!("-chain={}", self.config.as_chain_str());
130 let data_dir = format!("-datadir={}", datadir.display());
131 let rpc_port = format!("-rpcport={}", self.rpc_port);
132 let rpc_bind = format!("-rpcbind=127.0.0.1:{}", self.rpc_port);
133 let rpc_user = format!("-rpcuser={}", self.config.rpc_username);
134 let rpc_password = format!("-rpcpassword={}", self.config.rpc_password);
135
136 let mut args = vec![
137 &chain,
138 "-listen=0",
139 &data_dir,
140 &rpc_port,
141 &rpc_bind,
142 "-rpcallowip=127.0.0.1",
143 "-fallbackfee=0.0002",
144 "-server=1",
145 "-prune=1",
146 &rpc_user,
147 &rpc_password,
148 ];
149
150 for arg in &self.config.extra_args {
151 args.push(arg);
152 }
153
154 cmd.args(&args);
155
156 cmd.stderr(Stdio::piped());
158 cmd.stdout(Stdio::piped());
159
160 let mut child = cmd.spawn()?;
161
162 let stderr = child.stderr.take().unwrap();
164 let stderr_reader = tokio::io::BufReader::new(stderr);
165 tokio::spawn(async move {
166 let mut lines = stderr_reader.lines();
167 while let Ok(Some(line)) = lines.next_line().await {
168 error!("bitcoind stderr: {}", line);
169 }
170 });
171
172 let stdout = child.stdout.take().unwrap();
174 let stdout_reader = tokio::io::BufReader::new(stdout);
175 tokio::spawn(async move {
176 let mut lines = stdout_reader.lines();
177 while let Ok(Some(line)) = lines.next_line().await {
178 info!("bitcoind stdout: {}", line);
179 }
180 });
181
182 let mut child_guard = self.child.lock().await;
184 *child_guard = Some(child);
185
186 info!("Waiting for bitcoind node to initialize...");
187 tokio::time::sleep(Duration::from_millis(150)).await;
188
189 let transport = DefaultTransport::new(
191 format!("http://127.0.0.1:{}/", self.rpc_port),
192 Some((self.config.rpc_username.clone(), self.config.rpc_password.clone())),
193 );
194
195 let deadline = Instant::now() + Duration::from_secs(10);
197 let mut attempts = 0;
198 let mut last_error: Option<String> = None;
199 while Instant::now() < deadline {
200 if let Some(child) = child_guard.as_mut() {
201 if let Ok(Some(status)) = child.try_wait() {
202 let error = format!("bitcoind node exited early with status: {}", status);
203 error!("{}", error);
204 return Err(TransportError::Rpc(error));
205 }
206 }
207
208 match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
210 Ok(_) => {
211 state.is_running = true;
212 info!("bitcoind node started successfully on port {}", self.rpc_port);
213 return Ok(());
214 }
215 Err(e) => {
216 last_error = Some(e.to_string());
217 debug!("Failed to connect to RPC (attempt {}): {}", attempts, e);
218 }
219 }
220
221 attempts += 1;
222 tokio::time::sleep(Duration::from_millis(200)).await;
223 }
224
225 let error = match last_error {
226 Some(e) => format!(
227 "Timed out waiting for node to start on port {} after {} attempts. Last RPC error: {}",
228 self.rpc_port, attempts, e
229 ),
230 None => format!(
231 "Timed out waiting for node to start on port {} after {} attempts",
232 self.rpc_port, attempts
233 ),
234 };
235 error!("{}", error);
236 return Err(TransportError::Rpc(error));
237 }
238
239 async fn stop(&mut self) -> Result<(), TransportError> {
240 let mut state = self.state.write().await;
241 if !state.is_running {
242 return Ok(());
243 }
244
245 let mut child = self.child.lock().await;
246 if let Some(mut child_process) = child.take() {
247 info!("Stopping bitcoind node...");
248 let _ = child_process.kill().await;
249 }
250
251 state.is_running = false;
252 info!("bitcoind node stopped");
253 Ok(())
254 }
255
256 async fn get_state(&self) -> Result<NodeState, TransportError> {
257 Ok(self.state.read().await.clone())
258 }
259
260 fn rpc_port(&self) -> u16 { self.rpc_port }
261
262 fn rpc_username(&self) -> &str { &self.config.rpc_username }
263
264 fn rpc_password(&self) -> &str { &self.config.rpc_password }
265
266 async fn create_transport(
267 &self,
268 ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError> {
269 use std::sync::Arc;
270
271 use crate::transport::DefaultTransport;
272
273 let rpc_url = format!("http://127.0.0.1:{}", self.rpc_port());
275 let auth = Some((self.rpc_username().to_string(), self.rpc_password().to_string()));
276 let transport = Arc::new(DefaultTransport::new(rpc_url, auth));
277
278 let init_states = ["\"code\":-28", "\"code\":-4"];
283
284 let max_retries = 30;
285 let mut retries = 0;
286
287 loop {
288 match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
289 Ok(_) => break,
290 Err(TransportError::Rpc(e)) => {
291 let is_init_state = init_states.iter().any(|state| e.contains(state));
293 if is_init_state && retries < max_retries {
294 tracing::debug!(
295 "Waiting for initialization: {} (attempt {}/{})",
296 e,
297 retries + 1,
298 max_retries
299 );
300 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
301 retries += 1;
302 continue;
303 }
304 return Err(TransportError::Rpc(e));
305 }
306 Err(e) => return Err(e),
307 }
308 }
309
310 if retries > 0 {
311 tracing::debug!("Node initialization completed after {} attempts", retries);
312 }
313
314 Ok(transport)
315 }
316}