Skip to main content

blueprint_chain_setup_tangle/
testnet.rs

1// Original: https://github.com/paritytech/subxt/blob/3219659f12a36fe6b7408bf4ac1db184414c6c0c/testing/substrate-runner/src/lib.rs
2#![allow(unused, clippy::missing_errors_doc, clippy::missing_panics_doc)]
3
4use crate::error::Error;
5use blueprint_std::borrow::Cow;
6use blueprint_std::collections::HashMap;
7use blueprint_std::ffi::OsString;
8use blueprint_std::io::{self, BufRead, BufReader, Read};
9use blueprint_std::process::{self, Child, Command};
10use blueprint_std::sync::mpsc;
11use blueprint_std::thread;
12
13/// Environment variable to set the path to the binary.
14pub const TANGLE_NODE_ENV: &str = "TANGLE_NODE";
15
16impl std::fmt::Display for Error {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        match self {
19            Error::Io(err) => write!(f, "IO error: {err}"),
20            Error::CouldNotExtractPort(log) => write!(
21                f,
22                "could not extract port from running substrate node's stdout: {log}"
23            ),
24            Error::CouldNotExtractP2pAddress(log) => write!(
25                f,
26                "could not extract p2p address from running substrate node's stdout: {log}"
27            ),
28            Error::CouldNotExtractP2pPort(log) => write!(
29                f,
30                "could not extract p2p port from running substrate node's stdout: {log}"
31            ),
32        }
33    }
34}
35
36impl std::error::Error for Error {}
37
38type CowStr = Cow<'static, str>;
39
40#[derive(Debug, Clone)]
41pub struct SubstrateNodeBuilder {
42    binary_paths: Vec<String>,
43    custom_flags: HashMap<CowStr, Option<CowStr>>,
44}
45
46impl Default for SubstrateNodeBuilder {
47    fn default() -> Self {
48        SubstrateNodeBuilder::new()
49    }
50}
51
52impl SubstrateNodeBuilder {
53    /// Configure a new Substrate node.
54    #[must_use]
55    pub fn new() -> Self {
56        SubstrateNodeBuilder {
57            binary_paths: vec![],
58            custom_flags: HashMap::default(),
59        }
60    }
61
62    /// Provide "tangle" as binary path.
63    pub fn tangle(&mut self) -> &mut Self {
64        self.binary_paths = vec!["tangle".into()];
65        self
66    }
67
68    /// Set the path to the `substrate` binary; defaults to "substrate-node"
69    /// or "substrate".
70    pub fn binary_paths<I, S>(&mut self, paths: I) -> &mut Self
71    where
72        I: IntoIterator<Item = S>,
73        S: Into<String>,
74    {
75        self.binary_paths = paths.into_iter().map(Into::into).collect();
76        self
77    }
78
79    /// Add a single binary path.
80    pub fn add_binary_path<S: Into<String>>(&mut self, path: S) -> &mut Self {
81        self.binary_paths.push(path.into());
82        self
83    }
84
85    /// Provide a boolean argument like `--alice`
86    pub fn arg(&mut self, s: impl Into<CowStr>) -> &mut Self {
87        self.custom_flags.insert(s.into(), None);
88        self
89    }
90
91    /// Provide an argument with a value.
92    pub fn arg_val(&mut self, key: impl Into<CowStr>, val: impl Into<CowStr>) -> &mut Self {
93        self.custom_flags.insert(key.into(), Some(val.into()));
94        self
95    }
96
97    /// Spawn the node, handing back an object which, when dropped, will stop it.
98    pub fn spawn(mut self) -> Result<SubstrateNode, Error> {
99        // Try to spawn the binary at each path, returning the
100        // first "ok" or last error that we encountered.
101        let mut res = Err(io::Error::other("No binary path provided"));
102
103        let path = Command::new("mktemp")
104            .arg("-d")
105            .output()
106            .expect("failed to create base dir");
107        let path = String::from_utf8(path.stdout).expect("bad path");
108        let mut bin_path = OsString::new();
109        for binary_path in &self.binary_paths {
110            let binary_path = &std::path::absolute(binary_path)
111                .expect("bad path")
112                .into_os_string();
113            blueprint_core::info!("Trying to spawn Tangle node binary at {:?}", binary_path);
114            self.custom_flags
115                .insert("base-path".into(), Some(path.clone().into()));
116
117            res = SubstrateNodeBuilder::try_spawn(binary_path, &self.custom_flags);
118            if res.is_ok() {
119                bin_path.clone_from(binary_path);
120                break;
121            }
122        }
123
124        let mut proc = match res {
125            Ok(proc) => proc,
126            Err(e) => return Err(Error::Io(e)),
127        };
128
129        // Create channels for log capturing
130        let (init_tx, init_rx) = mpsc::channel();
131        let (log_tx, log_rx) = mpsc::channel();
132
133        // Take stderr for port detection and logging
134        let stderr = proc.stderr.take().unwrap();
135        let init_tx_clone = init_tx.clone();
136        let log_tx_clone = log_tx.clone();
137
138        // Spawn thread to handle stderr
139        let stderr_handle = thread::spawn(move || {
140            let reader = BufReader::new(stderr);
141            for line in reader.lines().map_while(Result::ok) {
142                blueprint_core::debug!(target: "tangle-node", "node-stderr: {}", line);
143                let _ = init_tx_clone.send(line.clone());
144                let _ = log_tx_clone.send(line);
145            }
146        });
147
148        // Take stdout for logging
149        let stdout = proc.stdout.take().unwrap();
150        let log_tx_stdout = log_tx.clone();
151
152        // Spawn thread to handle stdout
153        let stdout_handle = thread::spawn(move || {
154            let reader = BufReader::new(stdout);
155            for line in reader.lines().map_while(Result::ok) {
156                blueprint_core::debug!(target: "tangle-node", "node-stdout: {}", line);
157                let _ = log_tx_stdout.send(line);
158            }
159        });
160
161        // Process initialization logs with timeout
162        let running_node = try_find_substrate_port_from_output(&init_rx);
163
164        let ws_port = running_node.ws_port()?;
165        let p2p_address = running_node.p2p_address()?;
166        let p2p_port = running_node.p2p_port()?;
167
168        Ok(SubstrateNode {
169            binary_path: bin_path,
170            custom_flags: self.custom_flags,
171            proc,
172            ws_port,
173            p2p_address,
174            p2p_port,
175            base_path: path,
176            stdout_handle: Some(stdout_handle),
177            stderr_handle: Some(stderr_handle),
178            log_capture_tx: Some(log_tx),
179        })
180    }
181
182    // Attempt to spawn a binary with the path/flags given.
183    fn try_spawn(
184        binary_path: &OsString,
185        custom_flags: &HashMap<CowStr, Option<CowStr>>,
186    ) -> Result<Child, std::io::Error> {
187        let mut cmd = Command::new(binary_path);
188
189        cmd.env("RUST_LOG", "info,libp2p_tcp=debug")
190            .stdout(process::Stdio::piped())
191            .stderr(process::Stdio::piped())
192            .arg("--dev")
193            .arg("--port=0");
194
195        for (key, val) in custom_flags {
196            let arg = match val {
197                Some(val) => format!("--{key}={val}"),
198                None => format!("--{key}"),
199            };
200            cmd.arg(arg);
201        }
202
203        blueprint_core::trace!("Spawning Tangle node with command: {cmd:?}");
204        cmd.spawn()
205    }
206}
207
208pub struct SubstrateNode {
209    binary_path: OsString,
210    custom_flags: HashMap<CowStr, Option<CowStr>>,
211    proc: process::Child,
212    ws_port: u16,
213    p2p_address: String,
214    p2p_port: u32,
215    base_path: String,
216    stdout_handle: Option<thread::JoinHandle<()>>,
217    stderr_handle: Option<thread::JoinHandle<()>>,
218    log_capture_tx: Option<mpsc::Sender<String>>,
219}
220
221impl SubstrateNode {
222    /// Configure and spawn a new [`SubstrateNode`].
223    #[must_use]
224    pub fn builder() -> SubstrateNodeBuilder {
225        SubstrateNodeBuilder::new()
226    }
227
228    /// Return the ID of the running process.
229    #[must_use]
230    pub fn id(&self) -> u32 {
231        self.proc.id()
232    }
233
234    /// Return the port that WS connections are accepted on.
235    #[must_use]
236    pub fn ws_port(&self) -> u16 {
237        self.ws_port
238    }
239
240    /// Return the libp2p address of the running node.
241    #[must_use]
242    pub fn p2p_address(&self) -> String {
243        self.p2p_address.clone()
244    }
245
246    /// Return the libp2p port of the running node.
247    #[must_use]
248    pub fn p2p_port(&self) -> u32 {
249        self.p2p_port
250    }
251
252    /// Kill the process.
253    pub fn kill(&mut self) -> std::io::Result<()> {
254        self.proc.kill()
255    }
256
257    /// restart the node, handing back an object which, when dropped, will stop it.
258    pub fn restart(&mut self) -> Result<(), std::io::Error> {
259        // First kill the existing process
260        self.kill()?;
261
262        // Drop existing log channels and handles
263        self.log_capture_tx.take();
264        if let Some(handle) = self.stdout_handle.take() {
265            let _ = handle.join();
266        }
267        if let Some(handle) = self.stderr_handle.take() {
268            let _ = handle.join();
269        }
270
271        // Create new channels for logging
272        let (log_tx, _log_rx) = mpsc::channel();
273
274        // Spawn new process
275        let mut proc = self.try_spawn()?;
276
277        // Setup new logging for stdout
278        if let Some(stdout) = proc.stdout.take() {
279            let log_tx_clone = log_tx.clone();
280            let handle = thread::spawn(move || {
281                let reader = BufReader::new(stdout);
282                for line in reader.lines().map_while(Result::ok) {
283                    blueprint_core::debug!(target: "tangle-node","node-stdout: {}", line);
284                    let _ = log_tx_clone.send(line);
285                }
286            });
287            self.stdout_handle = Some(handle);
288        }
289
290        // Setup new logging for stderr
291        if let Some(stderr) = proc.stderr.take() {
292            let log_tx_clone = log_tx.clone();
293            let handle = thread::spawn(move || {
294                let reader = BufReader::new(stderr);
295                for line in reader.lines().map_while(Result::ok) {
296                    blueprint_core::debug!(target: "tangle-node","node-stderr: {}", line);
297                    let _ = log_tx_clone.send(line);
298                }
299            });
300            self.stderr_handle = Some(handle);
301        }
302
303        self.proc = proc;
304        self.log_capture_tx = Some(log_tx);
305
306        Ok(())
307    }
308
309    // Attempt to spawn a binary with the path/flags given.
310    fn try_spawn(&mut self) -> Result<Child, std::io::Error> {
311        let mut cmd = Command::new(&self.binary_path);
312
313        cmd.env("RUST_LOG", "info,libp2p_tcp=debug")
314            .stdout(process::Stdio::piped())
315            .stderr(process::Stdio::piped())
316            .arg("--dev");
317
318        for (key, val) in &self.custom_flags {
319            let arg = match val {
320                Some(val) => format!("--{key}={val}"),
321                None => format!("--{key}"),
322            };
323            cmd.arg(arg);
324        }
325
326        cmd.arg(format!("--rpc-port={}", self.ws_port));
327        cmd.arg(format!("--port={}", self.p2p_port));
328
329        blueprint_core::debug!("Restarting Tangle node with command: {:?}", cmd);
330        cmd.spawn()
331    }
332
333    fn setup_log_handling(&mut self) {
334        if let Some(stdout) = self.proc.stdout.take() {
335            let log_tx = self.log_capture_tx.clone();
336            let handle = thread::spawn(move || {
337                let reader = BufReader::new(stdout);
338                for line in reader.lines().map_while(Result::ok) {
339                    blueprint_core::debug!(target: "tangle-node", "node-stdout: {}", line);
340                    if let Some(tx) = &log_tx {
341                        let _ = tx.send(line);
342                    }
343                }
344            });
345            self.stdout_handle = Some(handle);
346        }
347
348        if let Some(stderr) = self.proc.stderr.take() {
349            let log_tx = self.log_capture_tx.clone();
350            let handle = thread::spawn(move || {
351                let reader = BufReader::new(stderr);
352                for line in reader.lines().map_while(Result::ok) {
353                    blueprint_core::debug!(target: "tangle-node", "node-stderr: {}", line);
354                    if let Some(tx) = &log_tx {
355                        let _ = tx.send(line);
356                    }
357                }
358            });
359            self.stderr_handle = Some(handle);
360        }
361    }
362
363    fn cleanup(&self) {
364        let _ = Command::new("rm")
365            .args(["-rf", &self.base_path])
366            .output()
367            .expect("success");
368    }
369}
370
371impl Drop for SubstrateNode {
372    fn drop(&mut self) {
373        // First drop the log capture channel to signal threads to exit
374        self.log_capture_tx.take();
375
376        // Kill the process
377        let _ = self.kill();
378        if let Some(handle) = self.stdout_handle.take() {
379            let _ = handle.join();
380        }
381        if let Some(handle) = self.stderr_handle.take() {
382            let _ = handle.join();
383        }
384
385        self.cleanup();
386    }
387}
388
389// Consume a stderr reader from a spawned substrate command and
390// locate the port number that is logged out to it.
391fn try_find_substrate_port_from_output(rx: &mpsc::Receiver<String>) -> SubstrateNodeInfo {
392    let mut port = None;
393    let mut p2p_address = None;
394    let mut p2p_port = None;
395    let mut log = String::new();
396
397    let timeout = std::time::Duration::from_secs(30);
398    let start = std::time::Instant::now();
399
400    while start.elapsed() < timeout {
401        // Try to receive with a small timeout to avoid blocking forever
402        let line = match rx.recv_timeout(std::time::Duration::from_millis(100)) {
403            Ok(line) => line,
404            Err(mpsc::RecvTimeoutError::Timeout) => continue,
405            Err(mpsc::RecvTimeoutError::Disconnected) => break,
406        };
407
408        blueprint_core::debug!(target: "tangle-node", "{}", line);
409        log.push_str(&line);
410        log.push('\n');
411
412        // Parse the port lines
413        let line_port = line
414            // oldest message:
415            .rsplit_once("Listening for new connections on 127.0.0.1:")
416            // slightly newer message:
417            .or_else(|| line.rsplit_once("Running JSON-RPC WS server: addr=127.0.0.1:"))
418            // newest message (jsonrpsee merging http and ws servers):
419            .or_else(|| line.rsplit_once("Running JSON-RPC server: addr=127.0.0.1:"))
420            // Sometimes the addr is 0.0.0.0
421            .or_else(|| line.rsplit_once("Running JSON-RPC server: addr=0.0.0.0:"))
422            .map(|(_, port_str)| port_str);
423
424        if let Some(ports) = line_port {
425            // If more than one rpc server is started the log will capture multiple ports
426            // such as `addr=127.0.0.1:9944,[::1]:9944`
427            let port_str: String = ports.chars().take_while(|c| c.is_numeric()).collect();
428
429            // expect to have a number here (the chars after '127.0.0.1:') and parse them into a u16.
430            let port_num = port_str
431                .parse()
432                .unwrap_or_else(|_| panic!("valid port expected for log line, got '{port_str}'"));
433            port = Some(port_num);
434        }
435
436        // Parse the p2p address line
437        let line_address = line
438            .rsplit_once("Local node identity is: ")
439            .map(|(_, address_str)| address_str);
440
441        if let Some(line_address) = line_address {
442            let address = line_address.trim_end_matches(|b: char| b.is_ascii_whitespace());
443            p2p_address = Some(address.into());
444        }
445
446        // Parse the p2p port line (present in debug logs)
447        let p2p_port_line = line
448            .rsplit_once("New listen address: /ip4/127.0.0.1/tcp/")
449            .map(|(_, address_str)| address_str);
450
451        if let Some(line_port) = p2p_port_line {
452            // trim non-numeric chars from the end of the port part of the line.
453            let port_str = line_port.trim_end_matches(|b: char| !b.is_ascii_digit());
454
455            // expect to have a number here (the chars after '127.0.0.1:') and parse them into a u16.
456            let port_num = port_str
457                .parse()
458                .unwrap_or_else(|_| panic!("valid port expected for log line, got '{port_str}'"));
459            p2p_port = Some(port_num);
460        }
461
462        if port.is_some() && p2p_address.is_some() && p2p_port.is_some() {
463            break;
464        }
465    }
466
467    SubstrateNodeInfo {
468        ws_port: port,
469        p2p_address,
470        p2p_port,
471        log,
472    }
473}
474
475/// Data extracted from the running node's stdout.
476#[derive(Debug)]
477pub struct SubstrateNodeInfo {
478    ws_port: Option<u16>,
479    p2p_address: Option<String>,
480    p2p_port: Option<u32>,
481    log: String,
482}
483
484impl SubstrateNodeInfo {
485    pub fn ws_port(&self) -> Result<u16, Error> {
486        self.ws_port
487            .ok_or_else(|| Error::CouldNotExtractPort(self.log.clone()))
488    }
489
490    pub fn p2p_address(&self) -> Result<String, Error> {
491        self.p2p_address
492            .clone()
493            .ok_or_else(|| Error::CouldNotExtractP2pAddress(self.log.clone()))
494    }
495
496    pub fn p2p_port(&self) -> Result<u32, Error> {
497        self.p2p_port
498            .ok_or_else(|| Error::CouldNotExtractP2pPort(self.log.clone()))
499    }
500}
501
502#[derive(Debug, Clone, Default)]
503pub struct NodeConfig {
504    pub use_local_tangle: bool,
505    pub log_level: Option<String>,
506    pub log_targets: Vec<(String, String)>,
507}
508
509impl NodeConfig {
510    #[must_use]
511    pub fn new(use_local_tangle: bool) -> Self {
512        Self {
513            use_local_tangle,
514            log_level: None,
515            log_targets: Vec::new(),
516        }
517    }
518
519    #[must_use]
520    pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
521        self.log_level = Some(level.into());
522        self
523    }
524
525    #[must_use]
526    pub fn with_log_target(mut self, target: impl Into<String>, level: impl Into<String>) -> Self {
527        self.log_targets.push((target.into(), level.into()));
528        self
529    }
530
531    #[must_use]
532    pub fn to_log_string(&self) -> String {
533        let mut parts = Vec::new();
534
535        // Add global level if set
536        if let Some(level) = &self.log_level {
537            parts.push(level.clone());
538        }
539
540        // Add target-specific levels
541        for (target, level) in &self.log_targets {
542            parts.push(format!("{target}={level}"));
543        }
544
545        parts.join(",")
546    }
547}