use crate::{utils::extract_endpoint, NodeError, NODE_STARTUP_TIMEOUT};
use alloy_genesis::Genesis;
use rand::Rng;
use std::{
ffi::OsString,
fs::create_dir,
io::{BufRead, BufReader},
path::PathBuf,
process::{Child, ChildStdout, Command, Stdio},
time::Instant,
};
use url::Url;
/// Comma-separated RPC namespaces passed to both `--http.api` and `--ws.api` when spawning.
const API: &str = "eth,net,web3,txpool,trace,rpc,reth,ots,admin,debug";
/// Executable name used when no explicit program path is configured on the builder.
const RETH: &str = "reth";
/// Default HTTP RPC port; `--http.port` is only passed when the configured port differs.
const DEFAULT_HTTP_PORT: u16 = 8545;
/// Default WS RPC port; `--ws.port` is only passed when the configured port differs.
const DEFAULT_WS_PORT: u16 = 8546;
/// Default auth RPC port; `--authrpc.port` is only passed when the configured port differs.
const DEFAULT_AUTH_PORT: u16 = 8551;
/// Default P2P port; `--discovery.port` is only passed when the configured port differs.
const DEFAULT_P2P_PORT: u16 = 30303;
/// A running `reth` child process together with the settings it was started with.
///
/// Construct it through [`Reth::spawn`] or [`Reth::try_spawn`]. The child process is killed
/// when this value is dropped.
#[derive(Debug)]
pub struct RethInstance {
/// Handle to the spawned child process.
pid: Child,
/// Instance number carried over from the builder (`0` when none was passed to reth).
instance: u16,
/// HTTP RPC port parsed from the node's startup log output.
http_port: u16,
/// WS RPC port parsed from the node's startup log output.
ws_port: u16,
/// Auth RPC port parsed from the node's startup log output.
auth_port: Option<u16>,
/// P2P port parsed from the node's log output; `None` when no port was observed.
p2p_port: Option<u16>,
/// IPC path configured on the builder, if any.
ipc: Option<PathBuf>,
/// Data directory configured on the builder, if any.
data_dir: Option<PathBuf>,
/// Genesis configured on the builder, if any.
genesis: Option<Genesis>,
}
impl RethInstance {
    /// Returns the instance number this node was started with.
    pub const fn instance(&self) -> u16 {
        self.instance
    }

    /// Returns the port of the HTTP RPC server.
    pub const fn http_port(&self) -> u16 {
        self.http_port
    }

    /// Returns the port of the WS RPC server.
    pub const fn ws_port(&self) -> u16 {
        self.ws_port
    }

    /// Returns the port of the auth RPC server, if known.
    pub const fn auth_port(&self) -> Option<u16> {
        self.auth_port
    }

    /// Returns the P2P port, if one was observed during startup.
    pub const fn p2p_port(&self) -> Option<u16> {
        self.p2p_port
    }

    /// Returns the HTTP endpoint of this instance as a `http://localhost:<port>` string.
    #[doc(alias = "http_endpoint")]
    pub fn endpoint(&self) -> String {
        let port = self.http_port;
        format!("http://localhost:{port}")
    }

    /// Returns the WebSocket endpoint of this instance as a `ws://localhost:<port>` string.
    pub fn ws_endpoint(&self) -> String {
        let port = self.ws_port;
        format!("ws://localhost:{port}")
    }

    /// Returns the IPC endpoint: the configured IPC path, or `"reth.ipc"` when none was set.
    pub fn ipc_endpoint(&self) -> String {
        match &self.ipc {
            Some(path) => path.display().to_string(),
            None => "reth.ipc".to_string(),
        }
    }

    /// Returns the HTTP endpoint of this instance as a parsed [`Url`].
    #[doc(alias = "http_endpoint_url")]
    pub fn endpoint_url(&self) -> Url {
        let raw = self.endpoint();
        Url::parse(&raw).unwrap()
    }

    /// Returns the WebSocket endpoint of this instance as a parsed [`Url`].
    pub fn ws_endpoint_url(&self) -> Url {
        let raw = self.ws_endpoint();
        Url::parse(&raw).unwrap()
    }

    /// Returns the data directory configured for this instance, if any.
    pub const fn data_dir(&self) -> Option<&PathBuf> {
        self.data_dir.as_ref()
    }

    /// Returns the genesis configuration stored for this instance, if any.
    pub const fn genesis(&self) -> Option<&Genesis> {
        self.genesis.as_ref()
    }

    /// Takes the stdout handle of the child process.
    ///
    /// The handle is moved out, so a second call fails.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::NoStdout`] when the child currently holds no stdout handle.
    pub fn stdout(&mut self) -> Result<ChildStdout, NodeError> {
        let handle = self.pid.stdout.take();
        handle.ok_or(NodeError::NoStdout)
    }
}
impl Drop for RethInstance {
    /// Kills the child process and reaps it.
    ///
    /// Failures are ignored on purpose: panicking in `drop` while the thread is already
    /// unwinding would abort the whole process, and a failed kill typically means the child
    /// is already gone.
    fn drop(&mut self) {
        if self.pid.kill().is_ok() {
            // Reap the child so it does not linger as a zombie process.
            let _ = self.pid.wait();
        }
    }
}
/// Builder for launching a `reth` node as a child process.
///
/// Configure it with the builder methods, then call `spawn` or `try_spawn`.
///
/// NOTE(review): the derived `Default` zero-initializes every field (ports `0`, discovery
/// disabled, instance `0`), which differs from the values set by `Reth::new` — confirm this
/// is intended.
#[derive(Clone, Debug, Default)]
#[must_use = "This Builder struct does nothing unless it is `spawn`ed"]
pub struct Reth {
/// Whether to pass `--dev`.
dev: bool,
/// HTTP RPC port; passed as `--http.port` when it differs from the default.
http_port: u16,
/// WS RPC port; passed as `--ws.port` when it differs from the default.
ws_port: u16,
/// Auth RPC port; passed as `--authrpc.port` when it differs from the default.
auth_port: u16,
/// P2P port; passed as `--discovery.port` when it differs from the default.
p2p_port: u16,
/// Value for `--dev.block-time`; only used when `dev` is set.
block_time: Option<String>,
/// Value for `--instance`; only passed when greater than zero.
instance: u16,
/// Whether peer discovery stays enabled; when false, discovery-disabling flags are passed.
discovery_enabled: bool,
/// Path to the executable; the bare name `reth` is used when unset.
program: Option<PathBuf>,
/// Value for `--ipcpath`, if any.
ipc_path: Option<PathBuf>,
/// Whether IPC is enabled; when false, `--ipcdisable` is passed.
ipc_enabled: bool,
/// Value for `--datadir`, if any; the directory is created when it does not exist.
data_dir: Option<PathBuf>,
/// Value for `--chain`, if any.
chain_or_path: Option<String>,
/// Genesis configuration handed over to the spawned instance handle.
genesis: Option<Genesis>,
/// Extra arguments appended after all generated arguments.
args: Vec<OsString>,
/// Whether the child's stdout handle is put back on the child after startup.
keep_stdout: bool,
}
impl Reth {
pub fn new() -> Self {
Self {
dev: false,
http_port: DEFAULT_HTTP_PORT,
ws_port: DEFAULT_WS_PORT,
auth_port: DEFAULT_AUTH_PORT,
p2p_port: DEFAULT_P2P_PORT,
block_time: None,
instance: rand::thread_rng().gen_range(1..200),
discovery_enabled: true,
program: None,
ipc_path: None,
ipc_enabled: false,
data_dir: None,
chain_or_path: None,
genesis: None,
args: Vec::new(),
keep_stdout: false,
}
}
pub fn at(path: impl Into<PathBuf>) -> Self {
Self::new().path(path)
}
pub fn path<T: Into<PathBuf>>(mut self, path: T) -> Self {
self.program = Some(path.into());
self
}
pub const fn dev(mut self) -> Self {
self.dev = true;
self
}
pub const fn http_port(mut self, http_port: u16) -> Self {
self.http_port = http_port;
self.instance = 0;
self
}
pub const fn ws_port(mut self, ws_port: u16) -> Self {
self.ws_port = ws_port;
self.instance = 0;
self
}
pub const fn auth_port(mut self, auth_port: u16) -> Self {
self.auth_port = auth_port;
self.instance = 0;
self
}
pub const fn p2p_port(mut self, p2p_port: u16) -> Self {
self.p2p_port = p2p_port;
self.instance = 0;
self
}
pub fn block_time(mut self, block_time: &str) -> Self {
self.block_time = Some(block_time.to_string());
self
}
pub const fn disable_discovery(mut self) -> Self {
self.discovery_enabled = false;
self
}
pub fn chain_or_path(mut self, chain_or_path: &str) -> Self {
self.chain_or_path = Some(chain_or_path.to_string());
self
}
pub const fn enable_ipc(mut self) -> Self {
self.ipc_enabled = true;
self
}
pub const fn instance(mut self, instance: u16) -> Self {
self.instance = instance;
self
}
pub fn ipc_path<T: Into<PathBuf>>(mut self, path: T) -> Self {
self.ipc_path = Some(path.into());
self
}
pub fn data_dir<T: Into<PathBuf>>(mut self, path: T) -> Self {
self.data_dir = Some(path.into());
self
}
pub fn genesis(mut self, genesis: Genesis) -> Self {
self.genesis = Some(genesis);
self
}
pub const fn keep_stdout(mut self) -> Self {
self.keep_stdout = true;
self
}
pub fn arg<T: Into<OsString>>(mut self, arg: T) -> Self {
self.args.push(arg.into());
self
}
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<OsString>,
{
for arg in args {
self = self.arg(arg);
}
self
}
#[track_caller]
pub fn spawn(self) -> RethInstance {
self.try_spawn().unwrap()
}
pub fn try_spawn(self) -> Result<RethInstance, NodeError> {
let bin_path = self
.program
.as_ref()
.map_or_else(|| RETH.as_ref(), |bin| bin.as_os_str())
.to_os_string();
let mut cmd = Command::new(&bin_path);
cmd.stdout(Stdio::piped());
cmd.arg("node");
if self.http_port != DEFAULT_HTTP_PORT {
cmd.arg("--http.port").arg(self.http_port.to_string());
}
if self.ws_port != DEFAULT_WS_PORT {
cmd.arg("--ws.port").arg(self.ws_port.to_string());
}
if self.auth_port != DEFAULT_AUTH_PORT {
cmd.arg("--authrpc.port").arg(self.auth_port.to_string());
}
if self.p2p_port != DEFAULT_P2P_PORT {
cmd.arg("--discovery.port").arg(self.p2p_port.to_string());
}
if self.dev {
cmd.arg("--dev");
if let Some(block_time) = self.block_time {
cmd.arg("--dev.block-time").arg(block_time);
}
}
if !self.ipc_enabled {
cmd.arg("--ipcdisable");
}
cmd.arg("--http");
cmd.arg("--http.api").arg(API);
cmd.arg("--ws");
cmd.arg("--ws.api").arg(API);
if let Some(ipc) = &self.ipc_path {
cmd.arg("--ipcpath").arg(ipc);
}
if self.instance > 0 {
cmd.arg("--instance").arg(self.instance.to_string());
}
if let Some(data_dir) = &self.data_dir {
cmd.arg("--datadir").arg(data_dir);
if !data_dir.exists() {
create_dir(data_dir).map_err(NodeError::CreateDirError)?;
}
}
if self.discovery_enabled {
cmd.arg("--verbosity").arg("-vvv");
} else {
cmd.arg("--disable-discovery");
cmd.arg("--no-persist-peers");
}
if let Some(chain_or_path) = self.chain_or_path {
cmd.arg("--chain").arg(chain_or_path);
}
cmd.arg("--color").arg("never");
cmd.args(self.args);
let mut child = cmd.spawn().map_err(NodeError::SpawnError)?;
let stdout = child.stdout.take().ok_or(NodeError::NoStdout)?;
let start = Instant::now();
let mut reader = BufReader::new(stdout);
let mut http_port = 0;
let mut ws_port = 0;
let mut auth_port = 0;
let mut p2p_port = 0;
let mut ports_started = false;
let mut p2p_started = !self.discovery_enabled;
loop {
if start + NODE_STARTUP_TIMEOUT <= Instant::now() {
let _ = child.kill();
return Err(NodeError::Timeout);
}
let mut line = String::with_capacity(120);
reader.read_line(&mut line).map_err(NodeError::ReadLineError)?;
if line.contains("RPC HTTP server started") {
if let Some(addr) = extract_endpoint("url=", &line) {
http_port = addr.port();
}
}
if line.contains("RPC WS server started") {
if let Some(addr) = extract_endpoint("url=", &line) {
ws_port = addr.port();
}
}
if line.contains("RPC auth server started") {
if let Some(addr) = extract_endpoint("url=", &line) {
auth_port = addr.port();
}
}
if line.contains("ERROR") {
let _ = child.kill();
return Err(NodeError::Fatal(line));
}
if http_port != 0 && ws_port != 0 && auth_port != 0 {
ports_started = true;
}
if self.discovery_enabled {
if line.contains("Updated local ENR") {
if let Some(port) = extract_endpoint("IpV4 UDP Socket", &line) {
p2p_port = port.port();
p2p_started = true;
}
}
} else {
p2p_started = true;
}
if ports_started && p2p_started {
break;
}
}
if self.keep_stdout {
child.stdout = Some(reader.into_inner());
}
Ok(RethInstance {
pid: child,
instance: self.instance,
http_port,
ws_port,
p2p_port: (p2p_port != 0).then_some(p2p_port),
ipc: self.ipc_path,
data_dir: self.data_dir,
auth_port: Some(auth_port),
genesis: self.genesis,
})
}
}