alloy_node_bindings/nodes/
reth.rs1use crate::{
4 utils::{extract_endpoint, GracefulShutdown},
5 NodeError, NODE_STARTUP_TIMEOUT,
6};
7use alloy_genesis::Genesis;
8use rand::Rng;
9use std::{
10 ffi::OsString,
11 fs::create_dir_all,
12 io::{BufRead, BufReader},
13 path::PathBuf,
14 process::{Child, ChildStdout, Command, Stdio},
15 time::Instant,
16};
17use url::Url;
18
19const API: &str = "eth,net,web3,txpool,trace,rpc,reth,ots,admin,debug";
21
22const RETH: &str = "reth";
24
25const DEFAULT_HTTP_PORT: u16 = 8545;
27
28const DEFAULT_WS_PORT: u16 = 8546;
30
31const DEFAULT_AUTH_PORT: u16 = 8551;
33
34const DEFAULT_P2P_PORT: u16 = 30303;
36
37#[derive(Debug)]
41pub struct RethInstance {
42 pid: Child,
43 host: String,
44 instance: u16,
45 http_port: u16,
46 ws_port: u16,
47 auth_port: Option<u16>,
48 p2p_port: Option<u16>,
49 ipc: Option<PathBuf>,
50 data_dir: Option<PathBuf>,
51 genesis: Option<Genesis>,
52}
53
54impl RethInstance {
55 pub fn host(&self) -> &str {
57 &self.host
58 }
59
60 pub const fn instance(&self) -> u16 {
62 self.instance
63 }
64
65 pub const fn http_port(&self) -> u16 {
67 self.http_port
68 }
69
70 pub const fn ws_port(&self) -> u16 {
72 self.ws_port
73 }
74
75 pub const fn auth_port(&self) -> Option<u16> {
77 self.auth_port
78 }
79
80 pub const fn p2p_port(&self) -> Option<u16> {
83 self.p2p_port
84 }
85
86 #[doc(alias = "http_endpoint")]
88 pub fn endpoint(&self) -> String {
89 format!("http://{}:{}", self.host, self.http_port)
90 }
91
92 pub fn ws_endpoint(&self) -> String {
94 format!("ws://{}:{}", self.host, self.ws_port)
95 }
96
97 pub fn ipc_endpoint(&self) -> String {
99 self.ipc.as_ref().map_or_else(|| "reth.ipc".to_string(), |ipc| ipc.display().to_string())
100 }
101
102 #[doc(alias = "http_endpoint_url")]
104 pub fn endpoint_url(&self) -> Url {
105 Url::parse(&self.endpoint()).unwrap()
106 }
107
108 pub fn ws_endpoint_url(&self) -> Url {
110 Url::parse(&self.ws_endpoint()).unwrap()
111 }
112
113 pub const fn data_dir(&self) -> Option<&PathBuf> {
115 self.data_dir.as_ref()
116 }
117
118 pub const fn genesis(&self) -> Option<&Genesis> {
120 self.genesis.as_ref()
121 }
122
123 pub fn stdout(&mut self) -> Result<ChildStdout, NodeError> {
128 self.pid.stdout.take().ok_or(NodeError::NoStdout)
129 }
130}
131
132impl Drop for RethInstance {
133 fn drop(&mut self) {
134 GracefulShutdown::shutdown(&mut self.pid, 10, "reth");
135 }
136}
137
138#[derive(Clone, Debug)]
157#[must_use = "This Builder struct does nothing unless it is `spawn`ed"]
158pub struct Reth {
159 dev: bool,
160 host: Option<String>,
161 http_port: u16,
162 ws_port: u16,
163 auth_port: u16,
164 p2p_port: u16,
165 block_time: Option<String>,
166 instance: u16,
167 discovery_enabled: bool,
168 program: Option<PathBuf>,
169 ipc_path: Option<PathBuf>,
170 ipc_enabled: bool,
171 data_dir: Option<PathBuf>,
172 chain_or_path: Option<String>,
173 genesis: Option<Genesis>,
174 args: Vec<OsString>,
175 keep_stdout: bool,
176}
177
178impl Default for Reth {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184impl Reth {
185 pub fn new() -> Self {
191 Self {
192 dev: false,
193 host: None,
194 http_port: DEFAULT_HTTP_PORT,
195 ws_port: DEFAULT_WS_PORT,
196 auth_port: DEFAULT_AUTH_PORT,
197 p2p_port: DEFAULT_P2P_PORT,
198 block_time: None,
199 instance: rand::thread_rng().gen_range(1..200),
200 discovery_enabled: true,
201 program: None,
202 ipc_path: None,
203 ipc_enabled: false,
204 data_dir: None,
205 chain_or_path: None,
206 genesis: None,
207 args: Vec::new(),
208 keep_stdout: false,
209 }
210 }
211
212 pub fn at(path: impl Into<PathBuf>) -> Self {
225 Self::new().path(path)
226 }
227
228 pub fn path<T: Into<PathBuf>>(mut self, path: T) -> Self {
233 self.program = Some(path.into());
234 self
235 }
236
237 pub const fn dev(mut self) -> Self {
239 self.dev = true;
240 self
241 }
242
243 pub fn host<T: Into<String>>(mut self, host: T) -> Self {
247 self.host = Some(host.into());
248 self
249 }
250
251 pub const fn http_port(mut self, http_port: u16) -> Self {
254 self.http_port = http_port;
255 self.instance = 0;
256 self
257 }
258
259 pub const fn ws_port(mut self, ws_port: u16) -> Self {
262 self.ws_port = ws_port;
263 self.instance = 0;
264 self
265 }
266
267 pub const fn auth_port(mut self, auth_port: u16) -> Self {
270 self.auth_port = auth_port;
271 self.instance = 0;
272 self
273 }
274
275 pub const fn p2p_port(mut self, p2p_port: u16) -> Self {
278 self.p2p_port = p2p_port;
279 self.instance = 0;
280 self
281 }
282
283 pub fn block_time(mut self, block_time: &str) -> Self {
287 self.block_time = Some(block_time.to_string());
288 self
289 }
290
291 pub const fn disable_discovery(mut self) -> Self {
293 self.discovery_enabled = false;
294 self
295 }
296
297 pub fn chain_or_path(mut self, chain_or_path: &str) -> Self {
302 self.chain_or_path = Some(chain_or_path.to_string());
303 self
304 }
305
306 pub const fn enable_ipc(mut self) -> Self {
308 self.ipc_enabled = true;
309 self
310 }
311
312 pub const fn instance(mut self, instance: u16) -> Self {
315 self.instance = instance;
316 self
317 }
318
319 pub fn ipc_path<T: Into<PathBuf>>(mut self, path: T) -> Self {
323 self.ipc_path = Some(path.into());
324 self.ipc_enabled = true;
325 self
326 }
327
328 pub fn data_dir<T: Into<PathBuf>>(mut self, path: T) -> Self {
330 self.data_dir = Some(path.into());
331 self
332 }
333
334 pub fn genesis(mut self, genesis: Genesis) -> Self {
341 self.genesis = Some(genesis);
342 self
343 }
344
345 pub const fn keep_stdout(mut self) -> Self {
349 self.keep_stdout = true;
350 self
351 }
352
353 pub fn arg<T: Into<OsString>>(mut self, arg: T) -> Self {
357 self.args.push(arg.into());
358 self
359 }
360
361 pub fn args<I, S>(mut self, args: I) -> Self
365 where
366 I: IntoIterator<Item = S>,
367 S: Into<OsString>,
368 {
369 for arg in args {
370 self = self.arg(arg);
371 }
372 self
373 }
374
375 #[track_caller]
381 pub fn spawn(self) -> RethInstance {
382 self.try_spawn().unwrap()
383 }
384
385 pub fn try_spawn(self) -> Result<RethInstance, NodeError> {
387 let bin_path = self
388 .program
389 .as_ref()
390 .map_or_else(|| RETH.as_ref(), |bin| bin.as_os_str())
391 .to_os_string();
392 let mut cmd = Command::new(&bin_path);
393 cmd.stdout(Stdio::piped());
395
396 cmd.arg("node");
398
399 if self.http_port != DEFAULT_HTTP_PORT {
401 cmd.arg("--http.port").arg(self.http_port.to_string());
402 }
403
404 if self.ws_port != DEFAULT_WS_PORT {
405 cmd.arg("--ws.port").arg(self.ws_port.to_string());
406 }
407
408 if self.auth_port != DEFAULT_AUTH_PORT {
409 cmd.arg("--authrpc.port").arg(self.auth_port.to_string());
410 }
411
412 if self.p2p_port != DEFAULT_P2P_PORT {
413 cmd.arg("--discovery.port").arg(self.p2p_port.to_string());
414 }
415
416 if self.dev {
418 cmd.arg("--dev");
425
426 if let Some(block_time) = self.block_time {
428 cmd.arg("--dev.block-time").arg(block_time);
429 }
430 }
431
432 if !self.ipc_enabled {
434 cmd.arg("--ipcdisable");
435 }
436
437 cmd.arg("--http");
439 cmd.arg("--http.api").arg(API);
440
441 if let Some(ref host) = self.host {
442 cmd.arg("--http.addr").arg(host);
443 }
444
445 cmd.arg("--ws");
447 cmd.arg("--ws.api").arg(API);
448
449 if let Some(ref host) = self.host {
450 cmd.arg("--ws.addr").arg(host);
451 }
452
453 if let Some(ipc) = &self.ipc_path {
455 cmd.arg("--ipcpath").arg(ipc);
456 }
457
458 if self.instance > 0 {
463 cmd.arg("--instance").arg(self.instance.to_string());
464 }
465
466 if let Some(data_dir) = &self.data_dir {
467 cmd.arg("--datadir").arg(data_dir);
468
469 if !data_dir.exists() {
471 create_dir_all(data_dir).map_err(NodeError::CreateDirError)?;
472 }
473 }
474
475 if self.discovery_enabled {
476 cmd.arg("--verbosity").arg("-vvv");
478 } else {
479 cmd.arg("--disable-discovery");
480 cmd.arg("--no-persist-peers");
481 }
482
483 if let Some(chain_or_path) = self.chain_or_path {
484 cmd.arg("--chain").arg(chain_or_path);
485 }
486
487 cmd.arg("--color").arg("never");
489
490 cmd.args(self.args);
492
493 let mut child = cmd.spawn().map_err(NodeError::SpawnError)?;
494
495 let stdout = child.stdout.take().ok_or(NodeError::NoStdout)?;
496
497 let start = Instant::now();
498 let mut reader = BufReader::new(stdout);
499
500 let mut http_port = 0;
501 let mut ws_port = 0;
502 let mut auth_port = 0;
503 let mut p2p_port = 0;
504
505 let mut ports_started = false;
506 let mut p2p_started = !self.discovery_enabled;
507
508 loop {
509 if start + NODE_STARTUP_TIMEOUT <= Instant::now() {
510 let _ = child.kill();
511 return Err(NodeError::Timeout);
512 }
513
514 let mut line = String::with_capacity(120);
515 reader.read_line(&mut line).map_err(NodeError::ReadLineError)?;
516
517 if line.contains("RPC HTTP server started") {
518 if let Some(addr) = extract_endpoint("url=", &line) {
519 http_port = addr.port();
520 }
521 }
522
523 if line.contains("RPC WS server started") {
524 if let Some(addr) = extract_endpoint("url=", &line) {
525 ws_port = addr.port();
526 }
527 }
528
529 if line.contains("RPC auth server started") {
530 if let Some(addr) = extract_endpoint("url=", &line) {
531 auth_port = addr.port();
532 }
533 }
534
535 if line.contains("ERROR") {
537 let _ = child.kill();
538 return Err(NodeError::Fatal(line));
539 }
540
541 if http_port != 0 && ws_port != 0 && auth_port != 0 {
542 ports_started = true;
543 }
544
545 if self.discovery_enabled {
546 if line.contains("Updated local ENR") {
547 if let Some(port) = extract_endpoint("IpV4 UDP Socket", &line) {
548 p2p_port = port.port();
549 p2p_started = true;
550 }
551 }
552 } else {
553 p2p_started = true;
554 }
555
556 if ports_started && p2p_started {
558 break;
559 }
560 }
561
562 if self.keep_stdout {
563 child.stdout = Some(reader.into_inner());
565 }
566
567 Ok(RethInstance {
568 pid: child,
569 host: self.host.unwrap_or_else(|| "localhost".to_string()),
570 instance: self.instance,
571 http_port,
572 ws_port,
573 p2p_port: (p2p_port != 0).then_some(p2p_port),
574 ipc: self.ipc_path,
575 data_dir: self.data_dir,
576 auth_port: Some(auth_port),
577 genesis: self.genesis,
578 })
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585
586 #[test]
587 fn can_set_host() {
588 let reth = Reth::new().host("0.0.0.0").dev().try_spawn();
589 if let Ok(reth) = reth {
590 assert_eq!(reth.host(), "0.0.0.0");
591 assert!(reth.endpoint().starts_with("http://0.0.0.0:"));
592 assert!(reth.ws_endpoint().starts_with("ws://0.0.0.0:"));
593 }
594 }
595
596 #[test]
597 fn default_host_is_localhost() {
598 let reth = Reth::new().dev().try_spawn();
599 if let Ok(reth) = reth {
600 assert_eq!(reth.host(), "localhost");
601 assert!(reth.endpoint().starts_with("http://localhost:"));
602 assert!(reth.ws_endpoint().starts_with("ws://localhost:"));
603 }
604 }
605
606 #[test]
607 fn default_matches_new_semantics() {
608 let reth = Reth::default();
609
610 assert!(!reth.dev);
611 assert_eq!(reth.host, None);
612 assert_eq!(reth.http_port, DEFAULT_HTTP_PORT);
613 assert_eq!(reth.ws_port, DEFAULT_WS_PORT);
614 assert_eq!(reth.auth_port, DEFAULT_AUTH_PORT);
615 assert_eq!(reth.p2p_port, DEFAULT_P2P_PORT);
616 assert_eq!(reth.block_time, None);
617 assert!((1..200).contains(&reth.instance));
618 assert!(reth.discovery_enabled);
619 assert_eq!(reth.program, None);
620 assert_eq!(reth.ipc_path, None);
621 assert!(!reth.ipc_enabled);
622 assert_eq!(reth.data_dir, None);
623 assert_eq!(reth.chain_or_path, None);
624 assert_eq!(reth.genesis, None);
625 assert!(reth.args.is_empty());
626 assert!(!reth.keep_stdout);
627 }
628}