1use crate::add_services::config::PortRange;
10use crate::helpers::{
11 check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14#[cfg(feature = "faucet")]
15use crate::helpers::get_username;
16#[cfg(feature = "faucet")]
17use sn_service_management::FaucetServiceData;
18#[cfg(feature = "faucet")]
19use sn_transfers::get_faucet_data_dir;
20
21use color_eyre::eyre::OptionExt;
22use color_eyre::{eyre::eyre, Result};
23use colored::Colorize;
24use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
25#[cfg(test)]
26use mockall::automock;
27use sn_evm::{EvmNetwork, RewardsAddress};
28use sn_logging::LogFormat;
29use sn_service_management::{
30 control::ServiceControl,
31 rpc::{RpcActions, RpcClient},
32 NodeRegistry, NodeServiceData, ServiceStatus,
33};
34use std::{
35 net::{IpAddr, Ipv4Addr, SocketAddr},
36 path::PathBuf,
37 process::{Command, Stdio},
38 str::FromStr,
39};
40use sysinfo::{Pid, System};
41
/// Abstraction over spawning the processes that make up a local network.
///
/// Under `cfg(test)` the `automock` attribute generates a `MockLauncher`, which lets callers
/// such as `run_node` be exercised without starting real processes.
#[cfg_attr(test, automock)]
pub trait Launcher {
    /// Returns the path of the `safenode` binary used by this launcher.
    fn get_safenode_path(&self) -> PathBuf;
    /// Starts the faucet, pointing it at `genesis_multiaddr`.
    ///
    /// The `LocalSafeLauncher` implementation returns the id of the spawned process.
    #[cfg(feature = "faucet")]
    fn launch_faucet(&self, genesis_multiaddr: &Multiaddr) -> Result<u32>;
    /// Starts a single node with the supplied settings.
    ///
    /// An empty `bootstrap_peers` list makes the `LocalSafeLauncher` implementation start the
    /// node with the `--first` flag.
    #[allow(clippy::too_many_arguments)]
    fn launch_node(
        &self,
        bootstrap_peers: Vec<Multiaddr>,
        log_format: Option<LogFormat>,
        metrics_port: Option<u16>,
        node_port: Option<u16>,
        owner: Option<String>,
        rpc_socket_addr: SocketAddr,
        rewards_address: RewardsAddress,
        evm_network: Option<EvmNetwork>,
    ) -> Result<()>;
    /// Pauses the caller; the `LocalSafeLauncher` implementation treats `delay` as
    /// milliseconds.
    fn wait(&self, delay: u64);
}
61
/// `Launcher` implementation that spawns the binaries found at the configured paths.
#[derive(Default)]
pub struct LocalSafeLauncher {
    /// Path of the faucet binary; only present when the `faucet` feature is enabled.
    #[cfg(feature = "faucet")]
    pub faucet_bin_path: PathBuf,
    /// Path of the `safenode` binary spawned for every node.
    pub safenode_bin_path: PathBuf,
}
68
69impl Launcher for LocalSafeLauncher {
70 fn get_safenode_path(&self) -> PathBuf {
71 self.safenode_bin_path.clone()
72 }
73
74 #[cfg(feature = "faucet")]
75 fn launch_faucet(&self, genesis_multiaddr: &Multiaddr) -> Result<u32> {
76 info!("Launching the faucet server...");
77 debug!("Using genesis_multiaddr: {}", genesis_multiaddr.to_string());
78 let args = vec![
79 "--peer".to_string(),
80 genesis_multiaddr.to_string(),
81 "server".to_string(),
82 ];
83
84 #[cfg(feature = "faucet")]
85 debug!(
86 "Using faucet binary: {}",
87 self.faucet_bin_path.to_string_lossy()
88 );
89
90 debug!("Using args: {}", args.join(" "));
91
92 let child = Command::new(self.faucet_bin_path.clone())
93 .args(args)
94 .stdout(Stdio::inherit())
95 .stderr(Stdio::inherit())
96 .spawn()?;
97 Ok(child.id())
98 }
99
100 fn launch_node(
101 &self,
102 bootstrap_peers: Vec<Multiaddr>,
103 log_format: Option<LogFormat>,
104 metrics_port: Option<u16>,
105 node_port: Option<u16>,
106 owner: Option<String>,
107 rpc_socket_addr: SocketAddr,
108 rewards_address: RewardsAddress,
109 evm_network: Option<EvmNetwork>,
110 ) -> Result<()> {
111 let mut args = Vec::new();
112
113 if let Some(owner) = owner {
114 args.push("--owner".to_string());
115 args.push(owner);
116 }
117
118 if bootstrap_peers.is_empty() {
119 args.push("--first".to_string())
120 } else {
121 for peer in bootstrap_peers {
122 args.push("--peer".to_string());
123 args.push(peer.to_string());
124 }
125 }
126
127 if let Some(log_format) = log_format {
128 args.push("--log-format".to_string());
129 args.push(log_format.as_str().to_string());
130 }
131
132 if let Some(metrics_port) = metrics_port {
133 args.push("--metrics-server-port".to_string());
134 args.push(metrics_port.to_string());
135 }
136
137 if let Some(node_port) = node_port {
138 args.push("--port".to_string());
139 args.push(node_port.to_string());
140 }
141
142 args.push("--local".to_string());
143 args.push("--rpc".to_string());
144 args.push(rpc_socket_addr.to_string());
145
146 args.push("--rewards-address".to_string());
147 args.push(rewards_address.to_string());
148
149 if let Some(network) = evm_network {
150 args.push(format!("evm-{}", network.identifier()));
151
152 if let EvmNetwork::Custom(custom) = network {
153 args.push("--rpc-url".to_string());
154 args.push(custom.rpc_url_http.to_string());
155 args.push("--payment-token-address".to_string());
156 args.push(custom.payment_token_address.to_string());
157 args.push("--data-payments-address".to_string());
158 args.push(custom.data_payments_address.to_string());
159 }
160 }
161
162 Command::new(self.safenode_bin_path.clone())
163 .args(args)
164 .stdout(Stdio::inherit())
165 .stderr(Stdio::inherit())
166 .spawn()
167 .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
168
169 Ok(())
170 }
171
172 fn wait(&self, delay: u64) {
176 std::thread::sleep(std::time::Duration::from_millis(delay));
177 }
178}
179
180pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Result<()> {
181 let mut system = System::new_all();
182 system.refresh_all();
183
184 if let Some(faucet) = &node_registry.faucet {
187 if let Some(process) = system.process(Pid::from(faucet.pid.unwrap() as usize)) {
192 process.kill();
193 debug!("Faucet has been killed");
194 println!("{} Killed faucet", "✓".green());
195 }
196 }
197
198 let faucet_data_path = dirs_next::data_dir()
199 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
200 .join("safe")
201 .join("test_faucet");
202 if faucet_data_path.is_dir() {
203 std::fs::remove_dir_all(faucet_data_path)?;
204 debug!("Removed faucet data directory");
205 }
206 let genesis_data_path = dirs_next::data_dir()
207 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
208 .join("safe")
209 .join("test_genesis");
210 if genesis_data_path.is_dir() {
211 debug!("Removed genesis data directory");
212 std::fs::remove_dir_all(genesis_data_path)?;
213 }
214
215 for node in node_registry.nodes.iter() {
216 println!("{}:", node.service_name);
217 if let Some(pid) = node.pid {
220 if let Some(process) = system.process(Pid::from(pid as usize)) {
224 process.kill();
225 debug!("Killed node: {} ({})", node.service_name, pid);
226 println!(" {} Killed process", "✓".green());
227 }
228 }
229
230 if !keep_directories {
231 if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
234 error!("Failed to remove node data directory: {:?}", e);
235 println!(
236 " {} Failed to remove {}: {e}",
237 "✗".red(),
238 node.data_dir_path.to_string_lossy()
239 );
240 } else {
241 debug!("Removed node data directory: {:?}", node.data_dir_path);
242 println!(
243 " {} Removed {}",
244 "✓".green(),
245 node.data_dir_path.to_string_lossy()
246 );
247 }
248 }
249 }
250
251 Ok(())
252}
253
/// Settings consumed by `run_network` when launching a local network.
pub struct LocalNetworkOptions {
    /// When no `metrics_port` range is supplied, makes `run_network` request an available port
    /// for each node's metrics server; if unset (and no range is given) no metrics port is used.
    pub enable_metrics_server: bool,
    /// Path of the faucet binary launched after the nodes when not joining a network.
    #[cfg(feature = "faucet")]
    pub faucet_bin_path: PathBuf,
    /// When true no genesis node is launched; the new nodes bootstrap from `peers` or, if that
    /// is `None`, from the first listen address recorded in the node registry.
    pub join: bool,
    /// Delay handed to `Launcher::wait` after each node is launched (`LocalSafeLauncher`
    /// treats it as milliseconds).
    pub interval: u64,
    /// Optional port range for the nodes' metrics servers; validated against `node_count`.
    pub metrics_port: Option<PortRange>,
    /// Optional port range for the nodes themselves; validated against `node_count`.
    pub node_port: Option<PortRange>,
    /// Upper bound of the launch loop. When not joining, the genesis node counts towards this
    /// total; when joining, this many nodes are launched.
    pub node_count: u16,
    /// Owner applied to every node when `owner_prefix` is not set.
    pub owner: Option<String>,
    /// When set, each node's owner becomes `<prefix>_<node number>` and `owner` is ignored.
    pub owner_prefix: Option<String>,
    /// Bootstrap peers to use when `join` is true.
    pub peers: Option<Vec<Multiaddr>>,
    /// Optional port range for the nodes' RPC services; when absent an available port is
    /// requested from the service controller for each node.
    pub rpc_port: Option<PortRange>,
    /// Path of the `safenode` binary to launch.
    pub safenode_bin_path: PathBuf,
    /// Skips the post-launch wait and network validation when true.
    pub skip_validation: bool,
    /// Log format forwarded to every launched node.
    pub log_format: Option<LogFormat>,
    /// Rewards address forwarded to every launched node.
    pub rewards_address: RewardsAddress,
    /// EVM network forwarded to every launched node.
    pub evm_network: Option<EvmNetwork>,
}
273
274pub async fn run_network(
275 options: LocalNetworkOptions,
276 node_registry: &mut NodeRegistry,
277 service_control: &dyn ServiceControl,
278) -> Result<()> {
279 info!("Running local network");
280
281 if let Some(port_range) = &options.node_port {
283 port_range.validate(options.node_count)?;
284 check_port_availability(port_range, &node_registry.nodes)?;
285 }
286
287 if let Some(port_range) = &options.metrics_port {
288 port_range.validate(options.node_count)?;
289 check_port_availability(port_range, &node_registry.nodes)?;
290 }
291
292 if let Some(port_range) = &options.rpc_port {
293 port_range.validate(options.node_count)?;
294 check_port_availability(port_range, &node_registry.nodes)?;
295 }
296
297 let launcher = LocalSafeLauncher {
298 safenode_bin_path: options.safenode_bin_path.to_path_buf(),
299 #[cfg(feature = "faucet")]
300 faucet_bin_path: options.faucet_bin_path.to_path_buf(),
301 };
302
303 let mut node_port = get_start_port_if_applicable(options.node_port);
304 let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
305 let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
306
307 let (bootstrap_peers, start) = if options.join {
309 if let Some(peers) = options.peers {
310 (peers, 1)
311 } else {
312 let peer = node_registry
313 .nodes
314 .iter()
315 .find_map(|n| n.listen_addr.clone())
316 .ok_or_eyre("Unable to obtain a peer to connect to")?;
317 (peer, 1)
318 }
319 } else {
320 let rpc_free_port = if let Some(port) = rpc_port {
321 port
322 } else {
323 service_control.get_available_port()?
324 };
325 let metrics_free_port = if let Some(port) = metrics_port {
326 Some(port)
327 } else if options.enable_metrics_server {
328 Some(service_control.get_available_port()?)
329 } else {
330 None
331 };
332 let rpc_socket_addr =
333 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
334 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
335
336 let number = (node_registry.nodes.len() as u16) + 1;
337 let owner = get_node_owner(&options.owner_prefix, &options.owner, &number);
338 let node = run_node(
339 RunNodeOptions {
340 bootstrap_peers: vec![],
341 genesis: true,
342 metrics_port: metrics_free_port,
343 node_port,
344 interval: options.interval,
345 log_format: options.log_format,
346 number,
347 owner,
348 rpc_socket_addr,
349 rewards_address: options.rewards_address,
350 evm_network: options.evm_network.clone(),
351 version: get_bin_version(&launcher.get_safenode_path())?,
352 },
353 &launcher,
354 &rpc_client,
355 )
356 .await?;
357 node_registry.nodes.push(node.clone());
358 let bootstrap_peers = node
359 .listen_addr
360 .ok_or_eyre("The listen address was not set")?;
361 node_port = increment_port_option(node_port);
362 metrics_port = increment_port_option(metrics_port);
363 rpc_port = increment_port_option(rpc_port);
364 (bootstrap_peers, 2)
365 };
366 node_registry.save()?;
367
368 for _ in start..=options.node_count {
369 let rpc_free_port = if let Some(port) = rpc_port {
370 port
371 } else {
372 service_control.get_available_port()?
373 };
374 let metrics_free_port = if let Some(port) = metrics_port {
375 Some(port)
376 } else if options.enable_metrics_server {
377 Some(service_control.get_available_port()?)
378 } else {
379 None
380 };
381 let rpc_socket_addr =
382 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
383 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
384
385 let number = (node_registry.nodes.len() as u16) + 1;
386 let owner = get_node_owner(&options.owner_prefix, &options.owner, &number);
387 let node = run_node(
388 RunNodeOptions {
389 bootstrap_peers: bootstrap_peers.clone(),
390 genesis: false,
391 metrics_port: metrics_free_port,
392 node_port,
393 interval: options.interval,
394 log_format: options.log_format,
395 number,
396 owner,
397 rpc_socket_addr,
398 rewards_address: options.rewards_address,
399 evm_network: options.evm_network.clone(),
400 version: get_bin_version(&launcher.get_safenode_path())?,
401 },
402 &launcher,
403 &rpc_client,
404 )
405 .await?;
406 node_registry.nodes.push(node);
407
408 node_registry.save()?;
413
414 node_port = increment_port_option(node_port);
415 metrics_port = increment_port_option(metrics_port);
416 rpc_port = increment_port_option(rpc_port);
417 }
418
419 if !options.skip_validation {
420 debug!("Waiting for 10 seconds before validating the network...");
421 println!("Waiting for 10 seconds before validating the network...");
422 std::thread::sleep(std::time::Duration::from_secs(10));
423 validate_network(node_registry, bootstrap_peers.clone()).await?;
424 }
425
426 #[cfg(feature = "faucet")]
427 if !options.join {
428 println!("Launching the faucet server...");
429 let pid = launcher.launch_faucet(&bootstrap_peers[0])?;
430 let version = get_bin_version(&options.faucet_bin_path)?;
431 let faucet = FaucetServiceData {
432 faucet_path: options.faucet_bin_path,
433 local: true,
434 log_dir_path: get_faucet_data_dir(),
435 pid: Some(pid),
436 service_name: "faucet".to_string(),
437 status: ServiceStatus::Running,
438 user: get_username()?,
439 version,
440 };
441 node_registry.faucet = Some(faucet);
442 }
443
444 Ok(())
445}
446
/// Settings for launching a single node through `run_node`.
pub struct RunNodeOptions {
    /// Peers the node bootstraps from; forwarded to the launcher.
    pub bootstrap_peers: Vec<Multiaddr>,
    /// Recorded on the resulting `NodeServiceData` to mark the genesis node.
    pub genesis: bool,
    /// Delay handed to `Launcher::wait` after the node is launched.
    pub interval: u64,
    /// Log format forwarded to the launcher and recorded on the service data.
    pub log_format: Option<LogFormat>,
    /// Metrics server port forwarded to the launcher and recorded on the service data.
    pub metrics_port: Option<u16>,
    /// Node port forwarded to the launcher and recorded on the service data.
    pub node_port: Option<u16>,
    /// Node number; used in log output and in the `safenode-local<number>` service name.
    pub number: u16,
    /// Owner forwarded to the launcher and recorded on the service data.
    pub owner: Option<String>,
    /// Address of the node's RPC service.
    pub rpc_socket_addr: SocketAddr,
    /// Rewards address forwarded to the launcher and recorded on the service data.
    pub rewards_address: RewardsAddress,
    /// EVM network forwarded to the launcher; the service data records
    /// `EvmNetwork::ArbitrumOne` when this is `None`.
    pub evm_network: Option<EvmNetwork>,
    /// Version string recorded on the service data.
    pub version: String,
}
461
462pub async fn run_node(
463 run_options: RunNodeOptions,
464 launcher: &dyn Launcher,
465 rpc_client: &dyn RpcActions,
466) -> Result<NodeServiceData> {
467 info!("Launching node {}...", run_options.number);
468 println!("Launching node {}...", run_options.number);
469 launcher.launch_node(
470 run_options.bootstrap_peers.clone(),
471 run_options.log_format,
472 run_options.metrics_port,
473 run_options.node_port,
474 run_options.owner.clone(),
475 run_options.rpc_socket_addr,
476 run_options.rewards_address,
477 run_options.evm_network.clone(),
478 )?;
479 launcher.wait(run_options.interval);
480
481 let node_info = rpc_client.node_info().await?;
482 let peer_id = node_info.peer_id;
483 let network_info = rpc_client.network_info().await?;
484 let connected_peers = Some(network_info.connected_peers);
485 let listen_addrs = network_info
486 .listeners
487 .into_iter()
488 .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
489 .collect();
490
491 Ok(NodeServiceData {
492 auto_restart: false,
493 connected_peers,
494 data_dir_path: node_info.data_path,
495 evm_network: run_options.evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
496 genesis: run_options.genesis,
497 home_network: false,
498 listen_addr: Some(listen_addrs),
499 local: true,
500 log_dir_path: node_info.log_path,
501 log_format: run_options.log_format,
502 max_archived_log_files: None,
503 max_log_files: None,
504 metrics_port: run_options.metrics_port,
505 node_ip: None,
506 node_port: run_options.node_port,
507 number: run_options.number,
508 owner: run_options.owner,
509 peer_id: Some(peer_id),
510 pid: Some(node_info.pid),
511 rewards_address: run_options.rewards_address,
512 reward_balance: None,
513 rpc_socket_addr: run_options.rpc_socket_addr,
514 safenode_path: launcher.get_safenode_path(),
515 status: ServiceStatus::Running,
516 service_name: format!("safenode-local{}", run_options.number),
517 upnp: false,
518 user: None,
519 user_mode: false,
520 version: run_options.version.to_string(),
521 })
522}
523
524async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr>) -> Result<()> {
529 let mut all_peers = node_registry
530 .nodes
531 .iter()
532 .map(|n| n.peer_id.ok_or_eyre("The PeerId was not set"))
533 .collect::<Result<Vec<PeerId>>>()?;
534 let additional_peers = peers
538 .into_iter()
539 .filter_map(|addr| {
540 addr.to_string()
541 .rsplit('/')
542 .next()
543 .and_then(|id_str| PeerId::from_str(id_str).ok())
544 })
545 .collect::<Vec<PeerId>>();
546 all_peers.extend(additional_peers);
547
548 for node in node_registry.nodes.iter() {
549 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
550 let net_info = rpc_client.network_info().await?;
551 let peers = net_info.connected_peers;
552 let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
553 debug!("Node {peer_id} has {} peers", peers.len());
554 println!("Node {peer_id} has {} peers", peers.len());
555
556 let invalid_peers: Vec<PeerId> = peers
559 .iter()
560 .filter(|peer| !all_peers.contains(peer))
561 .cloned()
562 .collect();
563 if !invalid_peers.is_empty() {
564 for invalid_peer in invalid_peers.iter() {
565 println!("Invalid peer found: {}", invalid_peer);
566 }
567 error!("Network validation failed: {invalid_peers:?}");
568 return Err(eyre!("Network validation failed",));
569 }
570 }
571 Ok(())
572}
573
/// Resolves the owner for the node with the given `number`.
///
/// A configured `owner_prefix` wins and yields `<prefix>_<number>`; otherwise the plain `owner`
/// (which may itself be `None`) is returned.
fn get_node_owner(
    owner_prefix: &Option<String>,
    owner: &Option<String>,
    number: &u16,
) -> Option<String> {
    match owner_prefix {
        Some(prefix) => Some(format!("{}_{}", prefix, number)),
        None => owner.clone(),
    }
}
585
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use libp2p_identity::PeerId;
    use mockall::mock;
    use mockall::predicate::*;
    use sn_evm::utils::dummy_address;
    use sn_service_management::{
        error::Result as RpcResult,
        rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
    };
    use std::str::FromStr;

    // Mock of the RPC client so that `run_node` can be tested without a running node.
    mock! {
        pub RpcClient {}
        #[async_trait]
        impl RpcActions for RpcClient {
            async fn node_info(&self) -> RpcResult<NodeInfo>;
            async fn network_info(&self) -> RpcResult<NetworkInfo>;
            async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
            async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
            async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
            async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
            async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
            async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
        }
    }

    /// `run_node` with `genesis: true` and no bootstrap peers should launch exactly one node,
    /// wait for the configured interval, and build the service data from the RPC responses.
    #[tokio::test]
    async fn run_node_should_launch_the_genesis_node() -> Result<()> {
        let mut mock_launcher = MockLauncher::new();
        let mut mock_rpc_client = MockRpcClient::new();
        let rewards_address = dummy_address();

        let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
        let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);
        // The launcher must be called once with an empty peer list and no optional settings.
        mock_launcher
            .expect_launch_node()
            .with(
                eq(vec![]),
                eq(None),
                eq(None),
                eq(None),
                eq(None),
                eq(rpc_socket_addr),
                eq(rewards_address),
                eq(None),
            )
            .times(1)
            .returning(|_, _, _, _, _, _, _, _| Ok(()));
        // The wait must use the `interval` supplied in the run options below.
        mock_launcher
            .expect_wait()
            .with(eq(100))
            .times(1)
            .returning(|_| ());
        mock_launcher
            .expect_get_safenode_path()
            .times(1)
            .returning(|| PathBuf::from("/usr/local/bin/safenode"));

        mock_rpc_client
            .expect_node_info()
            .times(1)
            .returning(move || {
                Ok(NodeInfo {
                    pid: 1000,
                    peer_id,
                    data_path: PathBuf::from(format!("~/.local/share/safe/{peer_id}")),
                    log_path: PathBuf::from(format!("~/.local/share/safe/{peer_id}/logs")),
                    version: "0.100.12".to_string(),
                    uptime: std::time::Duration::from_secs(1),
                    wallet_balance: 0,
                })
            });
        mock_rpc_client
            .expect_network_info()
            .times(1)
            .returning(move || {
                Ok(NetworkInfo {
                    connected_peers: Vec::new(),
                    listeners: Vec::new(),
                })
            });

        let node = run_node(
            RunNodeOptions {
                bootstrap_peers: vec![],
                genesis: true,
                interval: 100,
                log_format: None,
                metrics_port: None,
                node_port: None,
                number: 1,
                owner: None,
                rpc_socket_addr,
                rewards_address,
                evm_network: None,
                version: "0.100.12".to_string(),
            },
            &mock_launcher,
            &mock_rpc_client,
        )
        .await?;

        // The returned service data must mirror the options and the mocked RPC responses.
        assert!(node.genesis);
        assert_eq!(node.version, "0.100.12");
        assert_eq!(node.service_name, "safenode-local1");
        assert_eq!(
            node.data_dir_path,
            PathBuf::from(format!("~/.local/share/safe/{peer_id}"))
        );
        assert_eq!(
            node.log_dir_path,
            PathBuf::from(format!("~/.local/share/safe/{peer_id}/logs"))
        );
        assert_eq!(node.number, 1);
        assert_eq!(node.pid, Some(1000));
        assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
        assert_eq!(node.status, ServiceStatus::Running);
        assert_eq!(node.safenode_path, PathBuf::from("/usr/local/bin/safenode"));

        Ok(())
    }
}