blueprint_chain_setup_tangle/
testnet.rs1#![allow(unused, clippy::missing_errors_doc, clippy::missing_panics_doc)]
3
4use crate::error::Error;
5use blueprint_std::borrow::Cow;
6use blueprint_std::collections::HashMap;
7use blueprint_std::ffi::OsString;
8use blueprint_std::io::{self, BufRead, BufReader, Read};
9use blueprint_std::process::{self, Child, Command};
10use blueprint_std::sync::mpsc;
11use blueprint_std::thread;
12
/// The string `"TANGLE_NODE"`.
///
/// NOTE(review): judging by the name this is presumably the environment variable that points at
/// a Tangle node binary, but nothing in the visible part of this file reads it — confirm
/// against callers.
pub const TANGLE_NODE_ENV: &str = "TANGLE_NODE";
15
16impl std::fmt::Display for Error {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 match self {
19 Error::Io(err) => write!(f, "IO error: {err}"),
20 Error::CouldNotExtractPort(log) => write!(
21 f,
22 "could not extract port from running substrate node's stdout: {log}"
23 ),
24 Error::CouldNotExtractP2pAddress(log) => write!(
25 f,
26 "could not extract p2p address from running substrate node's stdout: {log}"
27 ),
28 Error::CouldNotExtractP2pPort(log) => write!(
29 f,
30 "could not extract p2p port from running substrate node's stdout: {log}"
31 ),
32 }
33 }
34}
35
// Marks `Error` as a standard error type. No methods are overridden, so the trait's defaults
// apply (in particular `source()` returns `None`, even for the `Io` variant).
impl std::error::Error for Error {}
37
/// A string that is either a `'static` borrow or an owned `String`; used below for the names
/// and values of extra command-line flags.
type CowStr = Cow<'static, str>;
39
/// Collects candidate node binaries and extra command-line flags, then spawns a
/// [`SubstrateNode`] from them.
#[derive(Debug, Clone)]
pub struct SubstrateNodeBuilder {
    /// Candidate binary paths; `spawn` tries them in order and uses the first that starts.
    binary_paths: Vec<String>,
    /// Extra flags keyed by name (without the leading `--`). A `Some` value is rendered as
    /// `--key=value`, `None` as a bare `--key`.
    custom_flags: HashMap<CowStr, Option<CowStr>>,
}
45
46impl Default for SubstrateNodeBuilder {
47 fn default() -> Self {
48 SubstrateNodeBuilder::new()
49 }
50}
51
52impl SubstrateNodeBuilder {
53 #[must_use]
55 pub fn new() -> Self {
56 SubstrateNodeBuilder {
57 binary_paths: vec![],
58 custom_flags: HashMap::default(),
59 }
60 }
61
62 pub fn tangle(&mut self) -> &mut Self {
64 self.binary_paths = vec!["tangle".into()];
65 self
66 }
67
68 pub fn binary_paths<I, S>(&mut self, paths: I) -> &mut Self
71 where
72 I: IntoIterator<Item = S>,
73 S: Into<String>,
74 {
75 self.binary_paths = paths.into_iter().map(Into::into).collect();
76 self
77 }
78
79 pub fn add_binary_path<S: Into<String>>(&mut self, path: S) -> &mut Self {
81 self.binary_paths.push(path.into());
82 self
83 }
84
85 pub fn arg(&mut self, s: impl Into<CowStr>) -> &mut Self {
87 self.custom_flags.insert(s.into(), None);
88 self
89 }
90
91 pub fn arg_val(&mut self, key: impl Into<CowStr>, val: impl Into<CowStr>) -> &mut Self {
93 self.custom_flags.insert(key.into(), Some(val.into()));
94 self
95 }
96
97 pub fn spawn(mut self) -> Result<SubstrateNode, Error> {
99 let mut res = Err(io::Error::other("No binary path provided"));
102
103 let path = Command::new("mktemp")
104 .arg("-d")
105 .output()
106 .expect("failed to create base dir");
107 let path = String::from_utf8(path.stdout).expect("bad path");
108 let mut bin_path = OsString::new();
109 for binary_path in &self.binary_paths {
110 let binary_path = &std::path::absolute(binary_path)
111 .expect("bad path")
112 .into_os_string();
113 blueprint_core::info!("Trying to spawn Tangle node binary at {:?}", binary_path);
114 self.custom_flags
115 .insert("base-path".into(), Some(path.clone().into()));
116
117 res = SubstrateNodeBuilder::try_spawn(binary_path, &self.custom_flags);
118 if res.is_ok() {
119 bin_path.clone_from(binary_path);
120 break;
121 }
122 }
123
124 let mut proc = match res {
125 Ok(proc) => proc,
126 Err(e) => return Err(Error::Io(e)),
127 };
128
129 let (init_tx, init_rx) = mpsc::channel();
131 let (log_tx, log_rx) = mpsc::channel();
132
133 let stderr = proc.stderr.take().unwrap();
135 let init_tx_clone = init_tx.clone();
136 let log_tx_clone = log_tx.clone();
137
138 let stderr_handle = thread::spawn(move || {
140 let reader = BufReader::new(stderr);
141 for line in reader.lines().map_while(Result::ok) {
142 blueprint_core::debug!(target: "tangle-node", "node-stderr: {}", line);
143 let _ = init_tx_clone.send(line.clone());
144 let _ = log_tx_clone.send(line);
145 }
146 });
147
148 let stdout = proc.stdout.take().unwrap();
150 let log_tx_stdout = log_tx.clone();
151
152 let stdout_handle = thread::spawn(move || {
154 let reader = BufReader::new(stdout);
155 for line in reader.lines().map_while(Result::ok) {
156 blueprint_core::debug!(target: "tangle-node", "node-stdout: {}", line);
157 let _ = log_tx_stdout.send(line);
158 }
159 });
160
161 let running_node = try_find_substrate_port_from_output(&init_rx);
163
164 let ws_port = running_node.ws_port()?;
165 let p2p_address = running_node.p2p_address()?;
166 let p2p_port = running_node.p2p_port()?;
167
168 Ok(SubstrateNode {
169 binary_path: bin_path,
170 custom_flags: self.custom_flags,
171 proc,
172 ws_port,
173 p2p_address,
174 p2p_port,
175 base_path: path,
176 stdout_handle: Some(stdout_handle),
177 stderr_handle: Some(stderr_handle),
178 log_capture_tx: Some(log_tx),
179 })
180 }
181
182 fn try_spawn(
184 binary_path: &OsString,
185 custom_flags: &HashMap<CowStr, Option<CowStr>>,
186 ) -> Result<Child, std::io::Error> {
187 let mut cmd = Command::new(binary_path);
188
189 cmd.env("RUST_LOG", "info,libp2p_tcp=debug")
190 .stdout(process::Stdio::piped())
191 .stderr(process::Stdio::piped())
192 .arg("--dev")
193 .arg("--port=0");
194
195 for (key, val) in custom_flags {
196 let arg = match val {
197 Some(val) => format!("--{key}={val}"),
198 None => format!("--{key}"),
199 };
200 cmd.arg(arg);
201 }
202
203 blueprint_core::trace!("Spawning Tangle node with command: {cmd:?}");
204 cmd.spawn()
205 }
206}
207
/// Handle to a spawned node process together with what is needed to restart it and to clean
/// up after it. Dropping the handle kills the process and removes `base_path`.
pub struct SubstrateNode {
    /// Absolute path of the binary that was started; reused by `restart`.
    binary_path: OsString,
    /// Extra flags the node was started with (includes the `base-path` entry added on spawn).
    custom_flags: HashMap<CowStr, Option<CowStr>>,
    /// The child process itself.
    proc: process::Child,
    /// RPC port parsed from the node's startup output.
    ws_port: u16,
    /// Node identity parsed from the `Local node identity is:` log line.
    p2p_address: String,
    /// p2p TCP port parsed from the `New listen address:` log line.
    p2p_port: u32,
    /// Directory passed to the node as `--base-path`; removed on drop.
    base_path: String,
    /// Thread draining the child's stdout, if one is running.
    stdout_handle: Option<thread::JoinHandle<()>>,
    /// Thread draining the child's stderr, if one is running.
    stderr_handle: Option<thread::JoinHandle<()>>,
    /// Sender the reader threads' channel was created from; taken (dropped) on restart and drop.
    log_capture_tx: Option<mpsc::Sender<String>>,
}
220
221impl SubstrateNode {
222 #[must_use]
224 pub fn builder() -> SubstrateNodeBuilder {
225 SubstrateNodeBuilder::new()
226 }
227
228 #[must_use]
230 pub fn id(&self) -> u32 {
231 self.proc.id()
232 }
233
234 #[must_use]
236 pub fn ws_port(&self) -> u16 {
237 self.ws_port
238 }
239
240 #[must_use]
242 pub fn p2p_address(&self) -> String {
243 self.p2p_address.clone()
244 }
245
246 #[must_use]
248 pub fn p2p_port(&self) -> u32 {
249 self.p2p_port
250 }
251
252 pub fn kill(&mut self) -> std::io::Result<()> {
254 self.proc.kill()
255 }
256
257 pub fn restart(&mut self) -> Result<(), std::io::Error> {
259 self.kill()?;
261
262 self.log_capture_tx.take();
264 if let Some(handle) = self.stdout_handle.take() {
265 let _ = handle.join();
266 }
267 if let Some(handle) = self.stderr_handle.take() {
268 let _ = handle.join();
269 }
270
271 let (log_tx, _log_rx) = mpsc::channel();
273
274 let mut proc = self.try_spawn()?;
276
277 if let Some(stdout) = proc.stdout.take() {
279 let log_tx_clone = log_tx.clone();
280 let handle = thread::spawn(move || {
281 let reader = BufReader::new(stdout);
282 for line in reader.lines().map_while(Result::ok) {
283 blueprint_core::debug!(target: "tangle-node","node-stdout: {}", line);
284 let _ = log_tx_clone.send(line);
285 }
286 });
287 self.stdout_handle = Some(handle);
288 }
289
290 if let Some(stderr) = proc.stderr.take() {
292 let log_tx_clone = log_tx.clone();
293 let handle = thread::spawn(move || {
294 let reader = BufReader::new(stderr);
295 for line in reader.lines().map_while(Result::ok) {
296 blueprint_core::debug!(target: "tangle-node","node-stderr: {}", line);
297 let _ = log_tx_clone.send(line);
298 }
299 });
300 self.stderr_handle = Some(handle);
301 }
302
303 self.proc = proc;
304 self.log_capture_tx = Some(log_tx);
305
306 Ok(())
307 }
308
309 fn try_spawn(&mut self) -> Result<Child, std::io::Error> {
311 let mut cmd = Command::new(&self.binary_path);
312
313 cmd.env("RUST_LOG", "info,libp2p_tcp=debug")
314 .stdout(process::Stdio::piped())
315 .stderr(process::Stdio::piped())
316 .arg("--dev");
317
318 for (key, val) in &self.custom_flags {
319 let arg = match val {
320 Some(val) => format!("--{key}={val}"),
321 None => format!("--{key}"),
322 };
323 cmd.arg(arg);
324 }
325
326 cmd.arg(format!("--rpc-port={}", self.ws_port));
327 cmd.arg(format!("--port={}", self.p2p_port));
328
329 blueprint_core::debug!("Restarting Tangle node with command: {:?}", cmd);
330 cmd.spawn()
331 }
332
333 fn setup_log_handling(&mut self) {
334 if let Some(stdout) = self.proc.stdout.take() {
335 let log_tx = self.log_capture_tx.clone();
336 let handle = thread::spawn(move || {
337 let reader = BufReader::new(stdout);
338 for line in reader.lines().map_while(Result::ok) {
339 blueprint_core::debug!(target: "tangle-node", "node-stdout: {}", line);
340 if let Some(tx) = &log_tx {
341 let _ = tx.send(line);
342 }
343 }
344 });
345 self.stdout_handle = Some(handle);
346 }
347
348 if let Some(stderr) = self.proc.stderr.take() {
349 let log_tx = self.log_capture_tx.clone();
350 let handle = thread::spawn(move || {
351 let reader = BufReader::new(stderr);
352 for line in reader.lines().map_while(Result::ok) {
353 blueprint_core::debug!(target: "tangle-node", "node-stderr: {}", line);
354 if let Some(tx) = &log_tx {
355 let _ = tx.send(line);
356 }
357 }
358 });
359 self.stderr_handle = Some(handle);
360 }
361 }
362
363 fn cleanup(&self) {
364 let _ = Command::new("rm")
365 .args(["-rf", &self.base_path])
366 .output()
367 .expect("success");
368 }
369}
370
371impl Drop for SubstrateNode {
372 fn drop(&mut self) {
373 self.log_capture_tx.take();
375
376 let _ = self.kill();
378 if let Some(handle) = self.stdout_handle.take() {
379 let _ = handle.join();
380 }
381 if let Some(handle) = self.stderr_handle.take() {
382 let _ = handle.join();
383 }
384
385 self.cleanup();
386 }
387}
388
389fn try_find_substrate_port_from_output(rx: &mpsc::Receiver<String>) -> SubstrateNodeInfo {
392 let mut port = None;
393 let mut p2p_address = None;
394 let mut p2p_port = None;
395 let mut log = String::new();
396
397 let timeout = std::time::Duration::from_secs(30);
398 let start = std::time::Instant::now();
399
400 while start.elapsed() < timeout {
401 let line = match rx.recv_timeout(std::time::Duration::from_millis(100)) {
403 Ok(line) => line,
404 Err(mpsc::RecvTimeoutError::Timeout) => continue,
405 Err(mpsc::RecvTimeoutError::Disconnected) => break,
406 };
407
408 blueprint_core::debug!(target: "tangle-node", "{}", line);
409 log.push_str(&line);
410 log.push('\n');
411
412 let line_port = line
414 .rsplit_once("Listening for new connections on 127.0.0.1:")
416 .or_else(|| line.rsplit_once("Running JSON-RPC WS server: addr=127.0.0.1:"))
418 .or_else(|| line.rsplit_once("Running JSON-RPC server: addr=127.0.0.1:"))
420 .or_else(|| line.rsplit_once("Running JSON-RPC server: addr=0.0.0.0:"))
422 .map(|(_, port_str)| port_str);
423
424 if let Some(ports) = line_port {
425 let port_str: String = ports.chars().take_while(|c| c.is_numeric()).collect();
428
429 let port_num = port_str
431 .parse()
432 .unwrap_or_else(|_| panic!("valid port expected for log line, got '{port_str}'"));
433 port = Some(port_num);
434 }
435
436 let line_address = line
438 .rsplit_once("Local node identity is: ")
439 .map(|(_, address_str)| address_str);
440
441 if let Some(line_address) = line_address {
442 let address = line_address.trim_end_matches(|b: char| b.is_ascii_whitespace());
443 p2p_address = Some(address.into());
444 }
445
446 let p2p_port_line = line
448 .rsplit_once("New listen address: /ip4/127.0.0.1/tcp/")
449 .map(|(_, address_str)| address_str);
450
451 if let Some(line_port) = p2p_port_line {
452 let port_str = line_port.trim_end_matches(|b: char| !b.is_ascii_digit());
454
455 let port_num = port_str
457 .parse()
458 .unwrap_or_else(|_| panic!("valid port expected for log line, got '{port_str}'"));
459 p2p_port = Some(port_num);
460 }
461
462 if port.is_some() && p2p_address.is_some() && p2p_port.is_some() {
463 break;
464 }
465 }
466
467 SubstrateNodeInfo {
468 ws_port: port,
469 p2p_address,
470 p2p_port,
471 log,
472 }
473}
474
/// What could be extracted from a node's startup output, plus the output itself.
#[derive(Debug)]
pub struct SubstrateNodeInfo {
    /// RPC port, if a matching log line was seen.
    ws_port: Option<u16>,
    /// Node identity, if a `Local node identity is:` line was seen.
    p2p_address: Option<String>,
    /// p2p TCP port, if a `New listen address:` line was seen.
    p2p_port: Option<u32>,
    /// Every line read while scanning, newline-separated; embedded in extraction errors.
    log: String,
}
483
484impl SubstrateNodeInfo {
485 pub fn ws_port(&self) -> Result<u16, Error> {
486 self.ws_port
487 .ok_or_else(|| Error::CouldNotExtractPort(self.log.clone()))
488 }
489
490 pub fn p2p_address(&self) -> Result<String, Error> {
491 self.p2p_address
492 .clone()
493 .ok_or_else(|| Error::CouldNotExtractP2pAddress(self.log.clone()))
494 }
495
496 pub fn p2p_port(&self) -> Result<u32, Error> {
497 self.p2p_port
498 .ok_or_else(|| Error::CouldNotExtractP2pPort(self.log.clone()))
499 }
500}
501
/// Options for running a node: the `use_local_tangle` switch plus log-filter settings.
#[derive(Debug, Clone, Default)]
pub struct NodeConfig {
    /// Switch supplied by the caller; this type only stores it.
    pub use_local_tangle: bool,
    /// Global log level, emitted first by [`NodeConfig::to_log_string`] when set.
    pub log_level: Option<String>,
    /// `(target, level)` pairs, emitted as `target=level` in insertion order.
    pub log_targets: Vec<(String, String)>,
}

impl NodeConfig {
    /// Creates a config with the given `use_local_tangle` value, no global log level and no
    /// per-target levels.
    #[must_use]
    pub fn new(use_local_tangle: bool) -> Self {
        Self {
            use_local_tangle,
            ..Self::default()
        }
    }

    /// Sets (or replaces) the global log level.
    #[must_use]
    pub fn with_log_level(self, level: impl Into<String>) -> Self {
        Self {
            log_level: Some(level.into()),
            ..self
        }
    }

    /// Appends a per-target log level; earlier entries are kept.
    #[must_use]
    pub fn with_log_target(mut self, target: impl Into<String>, level: impl Into<String>) -> Self {
        let entry = (target.into(), level.into());
        self.log_targets.push(entry);
        self
    }

    /// Renders the settings as a comma-separated filter string: the global level first (if
    /// any), then every `target=level` pair in insertion order. Empty when nothing is set.
    #[must_use]
    pub fn to_log_string(&self) -> String {
        let global = self.log_level.iter().cloned();
        let per_target = self
            .log_targets
            .iter()
            .map(|(target, level)| format!("{target}={level}"));

        global.chain(per_target).collect::<Vec<_>>().join(",")
    }
}