ethos_bitcoind/node/
node_manager.rs1use std::process::Stdio;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use tempfile::TempDir;
11use tokio::io::AsyncBufReadExt;
12use tokio::process::{Child, Command};
13use tokio::sync::{Mutex, RwLock};
14use tracing::{debug, error, info};
15
16use crate::test_config::TestConfig;
17use crate::transport::core::TransportExt;
18use crate::transport::{DefaultTransport, TransportError};
19
20const INIT_WAIT_RPC_CODES: [&str; 2] = ["\"code\":-28", "\"code\":-4"];
22const INIT_MAX_RETRIES: u32 = 30;
23
24#[derive(Debug, Default, Clone)]
26pub struct NodeState {
27 pub is_running: bool,
29}
30
31#[derive(Debug, Clone)]
33pub enum PortSelection {
34 Fixed(u16),
36 Dynamic,
38 Zero,
40}
41
42#[async_trait]
44pub trait NodeManager: Send + Sync + std::any::Any + std::fmt::Debug {
45 async fn start(&self) -> Result<(), TransportError>;
46 async fn stop(&mut self) -> Result<(), TransportError>;
47 async fn get_state(&self) -> Result<NodeState, TransportError>;
48 fn rpc_port(&self) -> u16;
50 fn rpc_username(&self) -> &str;
52 fn rpc_password(&self) -> &str;
54 async fn create_transport(
56 &self,
57 ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError>;
58}
59
60#[derive(Debug)]
62pub struct BitcoinNodeManager {
63 state: Arc<RwLock<NodeState>>,
65 child: Arc<Mutex<Option<Child>>>,
67 pub rpc_port: u16,
69 config: TestConfig,
71 _datadir: Option<TempDir>,
73}
74
75impl BitcoinNodeManager {
76 pub fn new() -> Result<Self, TransportError> { Self::new_with_config(&TestConfig::default()) }
78
79 pub fn new_with_config(config: &TestConfig) -> Result<Self, TransportError> {
81 let datadir = TempDir::new()?;
82
83 let rpc_port = if config.rpc_port == 0 {
85 {
88 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
89 listener.local_addr()?.port()
90 }
91 } else {
92 config.rpc_port
93 };
94
95 Ok(Self {
96 state: Arc::new(RwLock::new(NodeState::default())),
97 child: Arc::new(Mutex::new(None)),
98 rpc_port,
99 config: config.clone(),
100 _datadir: Some(datadir),
101 })
102 }
103
104 pub fn rpc_port(&self) -> u16 { self.rpc_port }
106
107 pub fn config(&self) -> &TestConfig { &self.config }
109
110 pub fn rpc_username(&self) -> &str { &self.config.rpc_username }
112
113 pub fn rpc_password(&self) -> &str { &self.config.rpc_password }
115}
116
117#[async_trait]
118impl NodeManager for BitcoinNodeManager {
119 async fn start(&self) -> Result<(), TransportError> {
121 let mut state = self.state.write().await;
122 if state.is_running {
123 return Ok(());
124 }
125
126 let datadir = self._datadir.as_ref().expect("datadir is set at construction").path();
127 let exe = self
128 .config
129 .bitcoind_path
130 .as_deref()
131 .unwrap_or_else(|| std::path::Path::new("bitcoind"));
132 let mut cmd = Command::new(exe);
133
134 let chain_str = self
135 .config
136 .as_chain_str()
137 .map_err(|_| TransportError::Rpc("Unsupported network".into()))?;
138 let chain = format!("-chain={}", chain_str);
139 let data_dir = format!("-datadir={}", datadir.display());
140 let rpc_port = format!("-rpcport={}", self.rpc_port);
141 let rpc_bind = format!("-rpcbind=127.0.0.1:{}", self.rpc_port);
142 let rpc_user = format!("-rpcuser={}", self.config.rpc_username);
143 let rpc_password = format!("-rpcpassword={}", self.config.rpc_password);
144
145 let mut args = vec![
146 &chain,
147 "-listen=0",
148 &data_dir,
149 &rpc_port,
150 &rpc_bind,
151 "-rpcallowip=127.0.0.1",
152 "-fallbackfee=0.0002",
153 "-server=1",
154 "-prune=1",
155 &rpc_user,
156 &rpc_password,
157 ];
158
159 for arg in &self.config.extra_args {
160 args.push(arg);
161 }
162
163 cmd.args(&args);
164
165 cmd.stderr(Stdio::piped());
167 cmd.stdout(Stdio::piped());
168
169 let mut child = cmd.spawn()?;
170
171 let stderr = child.stderr.take().unwrap();
173 let stderr_reader = tokio::io::BufReader::new(stderr);
174 tokio::spawn(async move {
175 let mut lines = stderr_reader.lines();
176 while let Ok(Some(line)) = lines.next_line().await {
177 error!("bitcoind stderr: {}", line);
178 }
179 });
180
181 let stdout = child.stdout.take().unwrap();
183 let stdout_reader = tokio::io::BufReader::new(stdout);
184 tokio::spawn(async move {
185 let mut lines = stdout_reader.lines();
186 while let Ok(Some(line)) = lines.next_line().await {
187 info!("bitcoind stdout: {}", line);
188 }
189 });
190
191 let mut child_guard = self.child.lock().await;
193 *child_guard = Some(child);
194
195 info!("Waiting for bitcoind node to initialize...");
196 tokio::time::sleep(Duration::from_millis(150)).await;
197
198 let transport = DefaultTransport::new(
200 format!("http://127.0.0.1:{}/", self.rpc_port),
201 Some((self.config.rpc_username.clone(), self.config.rpc_password.clone())),
202 );
203
204 let deadline = Instant::now() + Duration::from_secs(10);
206 let mut attempts = 0;
207 let mut last_error: Option<String> = None;
208 while Instant::now() < deadline {
209 if let Some(child) = child_guard.as_mut() {
210 if let Ok(Some(status)) = child.try_wait() {
211 let error = format!("bitcoind node exited early with status: {}", status);
212 error!("{}", error);
213 return Err(TransportError::Rpc(error));
214 }
215 }
216
217 match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
219 Ok(_) => {
220 state.is_running = true;
221 info!("bitcoind node started successfully on port {}", self.rpc_port);
222 return Ok(());
223 }
224 Err(e) => {
225 last_error = Some(e.to_string());
226 debug!("Failed to connect to RPC (attempt {}): {}", attempts, e);
227 }
228 }
229
230 attempts += 1;
231 tokio::time::sleep(Duration::from_millis(200)).await;
232 }
233
234 let error = match last_error {
235 Some(e) => format!(
236 "Timed out waiting for node to start on port {} after {} attempts. Last RPC error: {}",
237 self.rpc_port, attempts, e
238 ),
239 None => format!(
240 "Timed out waiting for node to start on port {} after {} attempts",
241 self.rpc_port, attempts
242 ),
243 };
244 error!("{}", error);
245 return Err(TransportError::Rpc(error));
246 }
247
248 async fn stop(&mut self) -> Result<(), TransportError> {
249 let mut state = self.state.write().await;
250 if !state.is_running {
251 return Ok(());
252 }
253
254 let mut child = self.child.lock().await;
255 if let Some(mut child_process) = child.take() {
256 info!("Stopping bitcoind node...");
257 let _ = child_process.kill().await;
258 }
259
260 state.is_running = false;
261 info!("bitcoind node stopped");
262 Ok(())
263 }
264
265 async fn get_state(&self) -> Result<NodeState, TransportError> {
266 Ok(self.state.read().await.clone())
267 }
268
269 fn rpc_port(&self) -> u16 { self.rpc_port }
270
271 fn rpc_username(&self) -> &str { &self.config.rpc_username }
272
273 fn rpc_password(&self) -> &str { &self.config.rpc_password }
274
275 async fn create_transport(
276 &self,
277 ) -> Result<std::sync::Arc<crate::transport::DefaultTransport>, TransportError> {
278 let rpc_url = format!("http://127.0.0.1:{}", self.rpc_port());
280 let auth = Some((self.rpc_username().to_string(), self.rpc_password().to_string()));
281 let transport = Arc::new(DefaultTransport::new(rpc_url, auth));
282
283 let mut retries = 0;
284
285 loop {
286 match transport.call::<serde_json::Value>("getnetworkinfo", &[]).await {
287 Ok(_) => break,
288 Err(TransportError::Rpc(e)) => {
289 let is_init_state = INIT_WAIT_RPC_CODES.iter().any(|state| e.contains(state));
290 if is_init_state && retries < INIT_MAX_RETRIES {
291 tracing::debug!(
292 "Waiting for initialization: {} (attempt {}/{})",
293 e,
294 retries + 1,
295 INIT_MAX_RETRIES
296 );
297 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
298 retries += 1;
299 continue;
300 }
301 return Err(TransportError::Rpc(e));
302 }
303 Err(e) => return Err(e),
304 }
305 }
306
307 if retries > 0 {
308 tracing::debug!("Node initialization completed after {} attempts", retries);
309 }
310
311 Ok(transport)
312 }
313}