use std::{
ffi::{OsStr, OsString},
net::TcpListener,
process,
sync::atomic::{AtomicU16, Ordering},
thread, time,
};
use sp_keyring::AccountKeyring;
use subxt::{Config, OnlineClient};
/// Handle to a spawned node process together with a client connected to it.
///
/// The child process is killed when this value is dropped.
pub struct TestNodeProcess<R>
where
    R: Config,
{
    /// Handle of the spawned node's operating-system process.
    proc: process::Child,
    /// Client that was connected to the node at `ws_url`.
    client: OnlineClient<R>,
    /// URL the client was connected to, of the form `ws://127.0.0.1:<port>`.
    pub ws_url: String,
}
/// Kills the node process when the handle goes out of scope.
impl<R: Config> Drop for TestNodeProcess<R> {
    fn drop(&mut self) {
        // Best effort: `kill` logs its own failures and `drop` has no way to
        // report an error, so the result is deliberately discarded.
        self.kill().ok();
    }
}
impl<R> TestNodeProcess<R>
where
    R: Config,
{
    /// Creates a [`TestNodeProcessBuilder`] for the node binary `program`.
    ///
    /// All arguments are forwarded unchanged to [`TestNodeProcessBuilder::new`].
    pub fn build<S>(program: S, chain_type: String, force_authoring: bool) -> TestNodeProcessBuilder
    where
        S: AsRef<OsStr> + Clone,
    {
        TestNodeProcessBuilder::new(program, chain_type, force_authoring)
    }

    /// Kills the node process and waits for it to exit.
    ///
    /// Calling this more than once is fine: if the child has already exited,
    /// nothing is done and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns a description of the failure if the process could not be killed
    /// or if waiting for it to exit failed. The error is also logged.
    pub fn kill(&mut self) -> Result<(), String> {
        let pid = self.proc.id();

        // The child may already be gone (it exited on its own, or `kill` ran
        // before, e.g. explicitly and then again from `Drop`).
        if matches!(self.proc.try_wait(), Ok(Some(_))) {
            return Ok(());
        }

        tracing::info!("Killing node process {}", pid);
        if let Err(err) = self.proc.kill() {
            let err = format!("Error killing node process {}: {err}", pid);
            tracing::error!("{}", err);
            return Err(err);
        }

        // Reap the child so it does not linger as a zombie process until the
        // test binary itself exits.
        if let Err(err) = self.proc.wait() {
            let err = format!("Error waiting for node process {} to exit: {err}", pid);
            tracing::error!("{}", err);
            return Err(err);
        }
        Ok(())
    }

    /// Returns the client connected to the spawned node.
    pub fn client(&self) -> &OnlineClient<R> {
        &self.client
    }
}
/// Collects the settings used to launch a node process via `spawn`.
pub struct TestNodeProcessBuilder {
    /// Path of the node executable to run.
    node_path: OsString,
    /// First command-line argument handed to the node.
    chain_type: String,
    /// Optional account; when set, its lower-cased name is passed as a
    /// `--<name>` flag.
    authority: Option<AccountKeyring>,
    /// Whether `--force-authoring` is passed to the node.
    force_authoring: bool,
    /// Whether to probe for free ports instead of relying on the default
    /// RPC port `9944`.
    scan_port_range: bool,
}
impl TestNodeProcessBuilder {
pub fn new<P>(node_path: P, chain_type: String, force_authoring: bool) -> TestNodeProcessBuilder
where
P: AsRef<OsStr>,
{
Self {
node_path: node_path.as_ref().into(),
authority: None,
scan_port_range: false,
chain_type,
force_authoring,
}
}
pub fn with_authority(&mut self, account: AccountKeyring) -> &mut Self {
self.authority = Some(account);
self
}
pub fn scan_for_open_ports(&mut self) -> &mut Self {
self.scan_port_range = true;
self
}
pub async fn spawn<R>(&self) -> Result<TestNodeProcess<R>, String>
where
R: Config,
{
let mut cmd = process::Command::new(&self.node_path);
cmd.env("RUST_LOG", "error").arg(&self.chain_type).arg("--tmp");
if self.force_authoring {
cmd.arg("--force-authoring");
}
if let Some(authority) = self.authority {
let authority = format!("{authority:?}");
let arg = format!("--{}", authority.as_str().to_lowercase());
cmd.arg(arg);
}
let ws_port = if self.scan_port_range {
let (p2p_port, _http_port, ws_port) = next_open_port()
.ok_or_else(|| "No available ports in the given port range".to_owned())?;
cmd.arg(format!("--port={p2p_port}"));
cmd.arg(format!("--rpc-port={ws_port}"));
tracing::info!("ws port: {ws_port}");
ws_port
} else {
9944
};
let ws_url = format!("ws://127.0.0.1:{ws_port}");
let mut proc = cmd.spawn().map_err(|e| {
format!("Error spawning substrate node '{}': {e}", self.node_path.to_string_lossy())
})?;
const MAX_ATTEMPTS: u32 = 6;
let mut attempts = 1;
let mut wait_secs = 1;
let client = loop {
thread::sleep(time::Duration::from_secs(wait_secs));
tracing::info!(
"Connecting to contracts enabled node, attempt {}/{}",
attempts,
MAX_ATTEMPTS
);
let result = OnlineClient::<R>::from_url(ws_url.clone()).await;
match result {
Ok(client) => break Ok(client),
Err(err) => {
if attempts < MAX_ATTEMPTS {
attempts += 1;
wait_secs *= 2; continue;
}
break Err(err);
},
}
};
match client {
Ok(client) => Ok(TestNodeProcess { proc, client, ws_url }),
Err(err) => {
let err = format!(
"Failed to connect to node rpc at {ws_url} after {attempts} attempts: {err}"
);
tracing::error!("{}", err);
proc.kill()
.map_err(|e| format!("Error killing substrate process '{}': {e}", proc.id()))?;
Err(err)
},
}
}
}
/// First port (inclusive) of the range probed for free ports.
const START_PORT: u16 = 9900;
/// End (exclusive) of the probed port range.
const END_PORT: u16 = 10000;
/// Maximum number of candidates examined by one `next_open_port` call.
/// This is larger than the range, so a single call may lap the range
/// several times.
const MAX_PORTS: u16 = 1000;
/// Process-wide cursor holding the next candidate port, shared by all callers.
static PORT: AtomicU16 = AtomicU16::new(START_PORT);

/// Atomically takes the next candidate port from `PORT` and advances the
/// cursor, wrapping from the end of the range back to `START_PORT`.
///
/// The read and the advance happen in one atomic update, so concurrent
/// callers can never push the cursor outside `[START_PORT, END_PORT)`.
fn next_port_candidate() -> u16 {
    /// Maps any out-of-range value back to the start of the range.
    fn in_range(port: u16) -> u16 {
        if (START_PORT..END_PORT).contains(&port) {
            port
        } else {
            START_PORT
        }
    }

    let previous = match PORT.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
        let candidate = in_range(current);
        // `candidate < END_PORT`, so `candidate + 1` cannot overflow.
        Some(if candidate + 1 < END_PORT { candidate + 1 } else { START_PORT })
    }) {
        // The closure always returns `Some`, so only `Ok` occurs in practice.
        Ok(previous) | Err(previous) => previous,
    };
    in_range(previous)
}

/// Finds three distinct ports in `[START_PORT, END_PORT)` that could be bound
/// on `0.0.0.0` at the time of the check.
///
/// Returns `None` if three such ports were not found after examining
/// `MAX_PORTS` candidates. The probe listeners are closed again immediately,
/// so another process may still grab a returned port before it is used.
fn next_open_port() -> Option<(u16, u16, u16)> {
    let mut ports = Vec::with_capacity(3);
    for _ in 0..MAX_PORTS {
        let candidate = next_port_candidate();
        // The cursor may lap the range within one call; never hand out the
        // same port twice.
        if ports.contains(&candidate) {
            continue;
        }
        if TcpListener::bind(("0.0.0.0", candidate)).is_ok() {
            ports.push(candidate);
            if let &[first, second, third] = ports.as_slice() {
                return Some((first, second, third));
            }
        }
    }
    None
}