1use core::error;
43use core::fmt;
44use core::net::Ipv4Addr;
45use core::net::SocketAddr;
46use corepc_client::bitcoin::BlockHash;
47use std::io::BufRead;
48use std::io::BufReader;
49use std::io::Read;
50use std::net::TcpListener;
51use std::path::PathBuf;
52use std::thread;
53use std::time::Duration;
54use std::time::Instant;
55use tempfile::TempDir;
56use tracing::debug;
57use tracing::trace;
58
59pub use serde_json;
60
61#[allow(unused)]
62pub(crate) use bitcoind::BitcoinD;
63#[allow(unused)]
64pub(crate) use utreexod::UtreexoD;
65
66pub mod bitcoind;
67pub mod electrsd;
68pub mod utreexod;
69
/// IPv4 loopback address (`127.0.0.1`); `get_available_port` binds to it
/// when asking the OS for a free port.
const IPV4_LOCALHOST: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

/// Retry budget for node construction.
///
/// NOTE(review): not used in this part of the file; presumably the number of
/// attempts made before `Error::ExhaustedNodeBuildingRetries` is returned.
pub const NODE_BUILDING_MAX_RETRIES: u8 = 5;

/// Pause related to node construction (500 ms).
///
/// NOTE(review): not used in this part of the file; presumably the delay
/// between two node-building attempts. Confirm against the node modules.
pub const NODE_BUILDING_INTERVAL: Duration = Duration::from_millis(500);

/// Default value returned by [`Node::poll_interval`]: the sleep between two
/// polls in the `wait_for_*` helpers (100 ms).
pub const POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Default value returned by [`Node::wait_timeout`]: how long the
/// `wait_for_*` helpers poll before giving up (10 s).
pub const WAIT_TIMEOUT: Duration = Duration::from_secs(10);

/// Sleep between two peer checks in [`connect`] (150 ms). [`connect`] also
/// waits four times this long before re-confirming a detected connection.
pub const CONNECTION_INTERVAL: Duration = Duration::from_millis(150);

/// Overall deadline for [`connect`] (5 s); once it elapses
/// `Error::ConnectionTimeout` is returned.
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
90
/// Common interface of the node wrappers driven by the helpers in this
/// module ([`connect`], [`connect_and_sync`], the `wait_for_*` functions).
pub trait Node {
    /// Name of the node implementation; the `wait_for_*` helpers put it in
    /// their log messages.
    fn get_name() -> &'static str;

    /// Name of the node's binary.
    ///
    /// NOTE(review): presumably the executable file name used to locate the
    /// binary; the callers are not visible here.
    fn get_bin_name() -> &'static str;

    /// Current chain tip of the node; the `wait_for_height*` helpers compare
    /// it against the target height.
    fn get_chain_tip(&self) -> Result<u32, Error>;

    /// Current filter tip of the node; `wait_for_filter_height` compares it
    /// against the target filter height.
    fn get_filter_tip(&self) -> Result<u32, Error>;

    /// Hash of the block at `height`.
    fn get_block_hash(&self, height: u32) -> Result<BlockHash, Error>;

    /// Invokes `method` with `args` on the node and returns the raw JSON
    /// value.
    ///
    /// NOTE(review): presumably a JSON-RPC call; the transport is up to the
    /// implementor.
    fn call(&self, method: &str, args: &[serde_json::Value]) -> Result<serde_json::Value, Error>;

    /// Socket address of the node's P2P endpoint; [`connect`] passes it to
    /// `add_peer` / `has_peer` on the other node.
    fn get_p2p_socket(&self) -> SocketAddr;

    /// Whether the node currently reports `socket` as one of its peers.
    fn has_peer(&self, socket: SocketAddr) -> Result<bool, Error>;

    /// Asks the node to add `socket` as a peer.
    fn add_peer(&self, socket: SocketAddr) -> Result<(), Error>;

    /// Number of peers the node reports.
    fn get_peer_count(&self) -> Result<u32, Error>;

    /// Sleep between two polls in the `wait_for_*` helpers.
    ///
    /// Defaults to [`POLL_INTERVAL`]; implementors may override it.
    fn poll_interval() -> Duration {
        POLL_INTERVAL
    }

    /// How long the `wait_for_*` helpers keep polling before giving up.
    ///
    /// Defaults to [`WAIT_TIMEOUT`]; implementors may override it.
    fn wait_timeout() -> Duration {
        WAIT_TIMEOUT
    }
}
147
148pub fn connect<A: Node, B: Node>(a: &A, b: &B) -> Result<(), Error> {
150 let socket_a = a.get_p2p_socket();
151 let socket_b = b.get_p2p_socket();
152
153 debug!("Connecting socket={} to socket={}", socket_a, socket_b);
154
155 a.add_peer(socket_b)?;
156
157 let is_connected =
158 || -> Result<bool, Error> { Ok(a.has_peer(socket_b)? || b.has_peer(socket_a)?) };
159
160 let start = Instant::now();
165 while start.elapsed() < CONNECTION_TIMEOUT {
166 if is_connected()? {
167 thread::sleep(CONNECTION_INTERVAL * 4);
170 if is_connected()? {
171 debug!("Connected socket={} to socket={}", socket_a, socket_b);
172 return Ok(());
173 }
174 }
175 thread::sleep(CONNECTION_INTERVAL);
176 }
177
178 Err(Error::ConnectionTimeout(CONNECTION_TIMEOUT))
179}
180
181pub fn connect_and_sync<A: Node, B: Node>(a: &A, b: &B) -> Result<(), Error> {
183 connect(a, b)?;
184
185 let height_a = a.get_chain_tip()?;
186 let height_b = b.get_chain_tip()?;
187
188 let max_height = std::cmp::max(height_a, height_b);
189 wait_for_height(a, max_height)?;
190 wait_for_height(b, max_height)?;
191
192 Ok(())
193}
194
195pub fn wait_for_height<N: Node>(node: &N, height: u32) -> Result<(), Error> {
199 debug!("Waiting for {} to reach height={}", N::get_name(), height);
200
201 let start = Instant::now();
202 while start.elapsed() < N::wait_timeout() {
203 if node.get_chain_tip().unwrap_or(0) >= height {
204 return Ok(());
205 }
206 thread::sleep(N::poll_interval());
207 }
208
209 let curr_height = node.get_chain_tip().unwrap_or(0);
210 Err(Error::ChainSyncTimeOut((
211 height,
212 curr_height,
213 N::wait_timeout(),
214 )))
215}
216
217pub fn wait_for_height_with_timeout<N: Node>(
221 node: &N,
222 height: u32,
223 timeout: Duration,
224) -> Result<(), Error> {
225 debug!(
226 "Waiting for {} to reach height={} (timeout={:?})",
227 N::get_name(),
228 height,
229 timeout
230 );
231
232 let start = Instant::now();
233 while start.elapsed() < timeout {
234 if node.get_chain_tip().unwrap_or(0) >= height {
235 return Ok(());
236 }
237 thread::sleep(N::poll_interval());
238 }
239
240 let curr_height = node.get_chain_tip().unwrap_or(0);
241 Err(Error::ChainSyncTimeOut((height, curr_height, timeout)))
242}
243
244pub fn wait_for_filter_height<N: Node>(node: &N, filter_height: u32) -> Result<(), Error> {
248 debug!(
249 "Waiting for {} to reach filter height={}",
250 N::get_name(),
251 filter_height
252 );
253
254 let start = Instant::now();
255 while start.elapsed() < N::wait_timeout() {
256 if node.get_filter_tip().unwrap_or(0) >= filter_height {
257 return Ok(());
258 }
259 thread::sleep(N::poll_interval());
260 }
261
262 let curr_filter_height = node.get_filter_tip().unwrap_or(0);
263 Err(Error::ChainSyncTimeOut((
264 filter_height,
265 curr_filter_height,
266 N::wait_timeout(),
267 )))
268}
269
270pub(crate) fn pipe_to_tracing<R: Read + Send + 'static>(reader: R, source: &'static str) {
277 thread::spawn(move || {
278 let mut lines = BufReader::new(reader).lines();
279 while let Some(Ok(line)) = lines.next() {
280 if !line.trim().is_empty() {
282 trace!("{source}: {line}");
283 }
284 }
285 });
286}
287
288#[inline]
292pub fn get_available_port() -> u16 {
293 TcpListener::bind((IPV4_LOCALHOST, 0))
294 .unwrap()
295 .local_addr()
296 .unwrap()
297 .port()
298}
299
/// A directory that is either a plain path or a `tempfile::TempDir`.
///
/// NOTE(review): judging by the name this is a node's data directory; the
/// users of this type are not visible here.
#[derive(Debug)]
pub enum DataDir {
    /// A plain path; this type does nothing to create or remove it.
    Persistent(PathBuf),
    /// An owned `tempfile::TempDir`. Per `tempfile`'s documentation the
    /// directory is deleted when this value is dropped.
    Temporary(TempDir),
}
313
314impl DataDir {
315 pub fn path(&self) -> PathBuf {
317 match self {
318 Self::Persistent(path) => path.to_owned(),
319 Self::Temporary(tmp_dir) => tmp_dir.path().to_path_buf(),
320 }
321 }
322}
323
/// Errors produced by this crate. The human-readable text for each variant
/// lives in the `fmt::Display` implementation.
#[derive(Debug)]
pub enum Error {
    /// The configured path of the `bin_name` binary is not absolute.
    BinaryPathNotAbsolute { bin_name: String, path: String },

    /// The configured path of the `bin_name` binary is not a file.
    BinaryPathNotFile { bin_name: String, path: String },

    /// The binary was not found; holds `(binary name, expected location)`.
    BinaryNotFound((String, PathBuf)),

    /// Spawning the node process failed.
    FailedToSpawn(std::io::Error),

    /// The node could not be instantiated; holds the number of attempts made.
    ExhaustedNodeBuildingRetries(u8),

    /// Stopping the node over JSON-RPC failed.
    FailedToStop(corepc_client::client_sync::Error),

    /// Generic I/O error.
    Io(std::io::Error),

    /// Generic JSON-RPC error.
    JsonRpc(corepc_client::client_sync::Error),

    /// Timed out waiting for a peer connection; holds
    /// `(local socket, remote socket)`.
    PeerConnectionTimeout((SocketAddr, SocketAddr)),

    /// Both `tempdir` and `workdir` were specified; exactly one is allowed.
    BothDirsSpecified,

    /// `BitcoinD` did not answer JSON-RPC calls.
    UnresponsiveBitcoinD(corepc_client::client_sync::Error),

    /// `UtreexoD` did not answer JSON-RPC calls.
    UnresponsiveUtreexoD(corepc_client::client_sync::Error),

    /// `ElectrsD` did not answer Electrum requests.
    UnresponsiveElectrsD(electrum_client::Error),

    /// Timed out waiting for `ElectrsD` to index something; holds
    /// `(description of what was being indexed, timeout)`.
    ElectrsDIndexTimeout((String, Duration)),

    /// Timed out waiting for the cookie file at this path to be generated.
    CookieFileTimeout(PathBuf),

    /// Timed out waiting for the JSON-RPC client to become ready.
    RpcClientSetupTimeout,

    /// The JSON-RPC server returned an unexpected response; holds a
    /// description of it.
    UnexpectedResponse(String),

    /// Timed out waiting for a node to reach a height; holds
    /// `(target height, last observed height, timeout)`. Also used by
    /// `wait_for_filter_height`, in which case both heights are filter heights.
    ChainSyncTimeOut((u32, u32, Duration)),

    /// Timed out waiting for two nodes to connect to each other; holds the
    /// timeout that elapsed.
    ConnectionTimeout(Duration),
}
383
384#[rustfmt::skip]
385impl fmt::Display for Error {
386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387 use Error::*;
388
389 match self {
390 BinaryPathNotAbsolute { bin_name, path } => write!(f, "The `{}` binary path is not absolute (path={})", bin_name, path),
391 BinaryPathNotFile { bin_name, path } => write!(f, "The `{}` binary path is not a file (path={})", bin_name, path),
392 BinaryNotFound((bin_name, path)) => write!(f, "The `{}` binary was not found at the expected location={}", bin_name, path.display()),
393 FailedToSpawn(err) => write!(f, "Failed to spawn a process for the node: {err:?}"),
394 ExhaustedNodeBuildingRetries(retries) => write!(f, "Failed to instantiate the node after {} attempts", retries),
395 FailedToStop(err) => write!(f, "Failed to stop the node over JSON-RPC: {err:?}"),
396 Io(err) => write!(f, "I/O Error: {err:?}"),
397 JsonRpc(err) => write!(f, "JSON-RPC Error: {err:?}"),
398 PeerConnectionTimeout((local_socket, remote_socket)) => write!(f, "Timed out whilst waiting for connection between local={local_socket} and remote={remote_socket}"),
399 BothDirsSpecified => write!(f, "Both `tempdir` and `workdir` were specified. You must choose one and only one"),
400 UnresponsiveBitcoinD(err) => write!(f, "`BitcoinD` is unresponsive to JSON-RPC calls: {err:?}"),
401 UnresponsiveUtreexoD(err) => write!(f, "`UtreexoD` is unresponsive to JSON-RPC calls: {err:?}"),
402 UnresponsiveElectrsD(err) => write!(f, "`ElectrsD` is unresponsive to Electrum requests: {err:?}"),
403 ElectrsDIndexTimeout((description, timeout)) => write!(f, "Timed out after {} seconds whilst waiting for `ElectrsD` to index {description}", timeout.as_secs()),
404 CookieFileTimeout(cookie_path) => write!(f, "Timed out whilst waiting for the cookie={} to be generated", cookie_path.display()),
405 RpcClientSetupTimeout => write!(f, "Timed out whilst waiting for the JSON-RPC client to be ready"),
406 UnexpectedResponse(err) => write!(f, "Received an unexpected response from the JSON-RPC server: {err:?}"),
407 ChainSyncTimeOut((target_height, current_height, timeout)) => write!(
408 f,
409 "Timed out after {} seconds whilst waiting for the node's chain to synchronize to height={} (current height={})",
410 target_height, current_height, timeout.as_secs()
411 ),
412 ConnectionTimeout(timeout) => write!(
413 f,
414 "Timed out after {} seconds whilst waiting for the nodes to connect to each other",
415 timeout.as_secs()
416 ),
417 }
418 }
419}
420
// Makes `Error` a standard error type. The impl is empty, so all provided
// methods keep their defaults (in particular, `source()` returns `None`
// even for variants that wrap another error).
impl error::Error for Error {}