alloy_node_bindings/nodes/
reth.rs1use crate::{utils::extract_endpoint, NodeError, NODE_STARTUP_TIMEOUT};
4use alloy_genesis::Genesis;
5use rand::Rng;
6use std::{
7 ffi::OsString,
8 fs::create_dir,
9 io::{BufRead, BufReader},
10 path::PathBuf,
11 process::{Child, ChildStdout, Command, Stdio},
12 time::Instant,
13};
14use url::Url;
15
16const API: &str = "eth,net,web3,txpool,trace,rpc,reth,ots,admin,debug";
18
19const RETH: &str = "reth";
21
22const DEFAULT_HTTP_PORT: u16 = 8545;
24
25const DEFAULT_WS_PORT: u16 = 8546;
27
28const DEFAULT_AUTH_PORT: u16 = 8551;
30
31const DEFAULT_P2P_PORT: u16 = 30303;
33
34#[derive(Debug)]
38pub struct RethInstance {
39 pid: Child,
40 host: String,
41 instance: u16,
42 http_port: u16,
43 ws_port: u16,
44 auth_port: Option<u16>,
45 p2p_port: Option<u16>,
46 ipc: Option<PathBuf>,
47 data_dir: Option<PathBuf>,
48 genesis: Option<Genesis>,
49}
50
51impl RethInstance {
52 pub fn host(&self) -> &str {
54 &self.host
55 }
56
57 pub const fn instance(&self) -> u16 {
59 self.instance
60 }
61
62 pub const fn http_port(&self) -> u16 {
64 self.http_port
65 }
66
67 pub const fn ws_port(&self) -> u16 {
69 self.ws_port
70 }
71
72 pub const fn auth_port(&self) -> Option<u16> {
74 self.auth_port
75 }
76
77 pub const fn p2p_port(&self) -> Option<u16> {
80 self.p2p_port
81 }
82
83 #[doc(alias = "http_endpoint")]
85 pub fn endpoint(&self) -> String {
86 format!("http://{}:{}", self.host, self.http_port)
87 }
88
89 pub fn ws_endpoint(&self) -> String {
91 format!("ws://{}:{}", self.host, self.ws_port)
92 }
93
94 pub fn ipc_endpoint(&self) -> String {
96 self.ipc.clone().map_or_else(|| "reth.ipc".to_string(), |ipc| ipc.display().to_string())
97 }
98
99 #[doc(alias = "http_endpoint_url")]
101 pub fn endpoint_url(&self) -> Url {
102 Url::parse(&self.endpoint()).unwrap()
103 }
104
105 pub fn ws_endpoint_url(&self) -> Url {
107 Url::parse(&self.ws_endpoint()).unwrap()
108 }
109
110 pub const fn data_dir(&self) -> Option<&PathBuf> {
112 self.data_dir.as_ref()
113 }
114
115 pub const fn genesis(&self) -> Option<&Genesis> {
117 self.genesis.as_ref()
118 }
119
120 pub fn stdout(&mut self) -> Result<ChildStdout, NodeError> {
125 self.pid.stdout.take().ok_or(NodeError::NoStdout)
126 }
127}
128
129impl Drop for RethInstance {
130 fn drop(&mut self) {
131 self.pid.kill().expect("could not kill reth");
132 }
133}
134
135#[derive(Clone, Debug, Default)]
154#[must_use = "This Builder struct does nothing unless it is `spawn`ed"]
155pub struct Reth {
156 dev: bool,
157 host: Option<String>,
158 http_port: u16,
159 ws_port: u16,
160 auth_port: u16,
161 p2p_port: u16,
162 block_time: Option<String>,
163 instance: u16,
164 discovery_enabled: bool,
165 program: Option<PathBuf>,
166 ipc_path: Option<PathBuf>,
167 ipc_enabled: bool,
168 data_dir: Option<PathBuf>,
169 chain_or_path: Option<String>,
170 genesis: Option<Genesis>,
171 args: Vec<OsString>,
172 keep_stdout: bool,
173}
174
175impl Reth {
176 pub fn new() -> Self {
182 Self {
183 dev: false,
184 host: None,
185 http_port: DEFAULT_HTTP_PORT,
186 ws_port: DEFAULT_WS_PORT,
187 auth_port: DEFAULT_AUTH_PORT,
188 p2p_port: DEFAULT_P2P_PORT,
189 block_time: None,
190 instance: rand::thread_rng().gen_range(1..200),
191 discovery_enabled: true,
192 program: None,
193 ipc_path: None,
194 ipc_enabled: false,
195 data_dir: None,
196 chain_or_path: None,
197 genesis: None,
198 args: Vec::new(),
199 keep_stdout: false,
200 }
201 }
202
203 pub fn at(path: impl Into<PathBuf>) -> Self {
216 Self::new().path(path)
217 }
218
219 pub fn path<T: Into<PathBuf>>(mut self, path: T) -> Self {
224 self.program = Some(path.into());
225 self
226 }
227
228 pub const fn dev(mut self) -> Self {
230 self.dev = true;
231 self
232 }
233
234 pub fn host<T: Into<String>>(mut self, host: T) -> Self {
238 self.host = Some(host.into());
239 self
240 }
241
242 pub const fn http_port(mut self, http_port: u16) -> Self {
245 self.http_port = http_port;
246 self.instance = 0;
247 self
248 }
249
250 pub const fn ws_port(mut self, ws_port: u16) -> Self {
253 self.ws_port = ws_port;
254 self.instance = 0;
255 self
256 }
257
258 pub const fn auth_port(mut self, auth_port: u16) -> Self {
261 self.auth_port = auth_port;
262 self.instance = 0;
263 self
264 }
265
266 pub const fn p2p_port(mut self, p2p_port: u16) -> Self {
269 self.p2p_port = p2p_port;
270 self.instance = 0;
271 self
272 }
273
274 pub fn block_time(mut self, block_time: &str) -> Self {
278 self.block_time = Some(block_time.to_string());
279 self
280 }
281
282 pub const fn disable_discovery(mut self) -> Self {
284 self.discovery_enabled = false;
285 self
286 }
287
288 pub fn chain_or_path(mut self, chain_or_path: &str) -> Self {
291 self.chain_or_path = Some(chain_or_path.to_string());
292 self
293 }
294
295 pub const fn enable_ipc(mut self) -> Self {
297 self.ipc_enabled = true;
298 self
299 }
300
301 pub const fn instance(mut self, instance: u16) -> Self {
304 self.instance = instance;
305 self
306 }
307
308 pub fn ipc_path<T: Into<PathBuf>>(mut self, path: T) -> Self {
310 self.ipc_path = Some(path.into());
311 self
312 }
313
314 pub fn data_dir<T: Into<PathBuf>>(mut self, path: T) -> Self {
316 self.data_dir = Some(path.into());
317 self
318 }
319
320 pub fn genesis(mut self, genesis: Genesis) -> Self {
327 self.genesis = Some(genesis);
328 self
329 }
330
331 pub const fn keep_stdout(mut self) -> Self {
335 self.keep_stdout = true;
336 self
337 }
338
339 pub fn arg<T: Into<OsString>>(mut self, arg: T) -> Self {
343 self.args.push(arg.into());
344 self
345 }
346
347 pub fn args<I, S>(mut self, args: I) -> Self
351 where
352 I: IntoIterator<Item = S>,
353 S: Into<OsString>,
354 {
355 for arg in args {
356 self = self.arg(arg);
357 }
358 self
359 }
360
361 #[track_caller]
367 pub fn spawn(self) -> RethInstance {
368 self.try_spawn().unwrap()
369 }
370
371 pub fn try_spawn(self) -> Result<RethInstance, NodeError> {
373 let bin_path = self
374 .program
375 .as_ref()
376 .map_or_else(|| RETH.as_ref(), |bin| bin.as_os_str())
377 .to_os_string();
378 let mut cmd = Command::new(&bin_path);
379 cmd.stdout(Stdio::piped());
381
382 cmd.arg("node");
384
385 if self.http_port != DEFAULT_HTTP_PORT {
387 cmd.arg("--http.port").arg(self.http_port.to_string());
388 }
389
390 if self.ws_port != DEFAULT_WS_PORT {
391 cmd.arg("--ws.port").arg(self.ws_port.to_string());
392 }
393
394 if self.auth_port != DEFAULT_AUTH_PORT {
395 cmd.arg("--authrpc.port").arg(self.auth_port.to_string());
396 }
397
398 if self.p2p_port != DEFAULT_P2P_PORT {
399 cmd.arg("--discovery.port").arg(self.p2p_port.to_string());
400 }
401
402 if self.dev {
404 cmd.arg("--dev");
411
412 if let Some(block_time) = self.block_time {
414 cmd.arg("--dev.block-time").arg(block_time);
415 }
416 }
417
418 if !self.ipc_enabled {
420 cmd.arg("--ipcdisable");
421 }
422
423 cmd.arg("--http");
425 cmd.arg("--http.api").arg(API);
426
427 if let Some(ref host) = self.host {
428 cmd.arg("--http.addr").arg(host);
429 }
430
431 cmd.arg("--ws");
433 cmd.arg("--ws.api").arg(API);
434
435 if let Some(ref host) = self.host {
436 cmd.arg("--ws.addr").arg(host);
437 }
438
439 if let Some(ipc) = &self.ipc_path {
441 cmd.arg("--ipcpath").arg(ipc);
442 }
443
444 if self.instance > 0 {
449 cmd.arg("--instance").arg(self.instance.to_string());
450 }
451
452 if let Some(data_dir) = &self.data_dir {
453 cmd.arg("--datadir").arg(data_dir);
454
455 if !data_dir.exists() {
457 create_dir(data_dir).map_err(NodeError::CreateDirError)?;
458 }
459 }
460
461 if self.discovery_enabled {
462 cmd.arg("--verbosity").arg("-vvv");
464 } else {
465 cmd.arg("--disable-discovery");
466 cmd.arg("--no-persist-peers");
467 }
468
469 if let Some(chain_or_path) = self.chain_or_path {
470 cmd.arg("--chain").arg(chain_or_path);
471 }
472
473 cmd.arg("--color").arg("never");
475
476 cmd.args(self.args);
478
479 let mut child = cmd.spawn().map_err(NodeError::SpawnError)?;
480
481 let stdout = child.stdout.take().ok_or(NodeError::NoStdout)?;
482
483 let start = Instant::now();
484 let mut reader = BufReader::new(stdout);
485
486 let mut http_port = 0;
487 let mut ws_port = 0;
488 let mut auth_port = 0;
489 let mut p2p_port = 0;
490
491 let mut ports_started = false;
492 let mut p2p_started = !self.discovery_enabled;
493
494 loop {
495 if start + NODE_STARTUP_TIMEOUT <= Instant::now() {
496 let _ = child.kill();
497 return Err(NodeError::Timeout);
498 }
499
500 let mut line = String::with_capacity(120);
501 reader.read_line(&mut line).map_err(NodeError::ReadLineError)?;
502
503 if line.contains("RPC HTTP server started") {
504 if let Some(addr) = extract_endpoint("url=", &line) {
505 http_port = addr.port();
506 }
507 }
508
509 if line.contains("RPC WS server started") {
510 if let Some(addr) = extract_endpoint("url=", &line) {
511 ws_port = addr.port();
512 }
513 }
514
515 if line.contains("RPC auth server started") {
516 if let Some(addr) = extract_endpoint("url=", &line) {
517 auth_port = addr.port();
518 }
519 }
520
521 if line.contains("ERROR") {
523 let _ = child.kill();
524 return Err(NodeError::Fatal(line));
525 }
526
527 if http_port != 0 && ws_port != 0 && auth_port != 0 {
528 ports_started = true;
529 }
530
531 if self.discovery_enabled {
532 if line.contains("Updated local ENR") {
533 if let Some(port) = extract_endpoint("IpV4 UDP Socket", &line) {
534 p2p_port = port.port();
535 p2p_started = true;
536 }
537 }
538 } else {
539 p2p_started = true;
540 }
541
542 if ports_started && p2p_started {
544 break;
545 }
546 }
547
548 if self.keep_stdout {
549 child.stdout = Some(reader.into_inner());
551 }
552
553 Ok(RethInstance {
554 pid: child,
555 host: self.host.unwrap_or_else(|| "localhost".to_string()),
556 instance: self.instance,
557 http_port,
558 ws_port,
559 p2p_port: (p2p_port != 0).then_some(p2p_port),
560 ipc: self.ipc_path,
561 data_dir: self.data_dir,
562 auth_port: Some(auth_port),
563 genesis: self.genesis,
564 })
565 }
566}
567
568#[cfg(test)]
569mod tests {
570 use super::*;
571
572 #[test]
573 fn can_set_host() {
574 let reth = Reth::new().host("0.0.0.0").dev().try_spawn();
575 if let Ok(reth) = reth {
576 assert_eq!(reth.host(), "0.0.0.0");
577 assert!(reth.endpoint().starts_with("http://0.0.0.0:"));
578 assert!(reth.ws_endpoint().starts_with("ws://0.0.0.0:"));
579 }
580 }
581
582 #[test]
583 fn default_host_is_localhost() {
584 let reth = Reth::new().dev().try_spawn();
585 if let Ok(reth) = reth {
586 assert_eq!(reth.host(), "localhost");
587 assert!(reth.endpoint().starts_with("http://localhost:"));
588 assert!(reth.ws_endpoint().starts_with("ws://localhost:"));
589 }
590 }
591}