1use crate::add_services::config::PortRange;
10use crate::helpers::{
11 check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::{
18 control::ServiceControl,
19 rpc::{RpcActions, RpcClient},
20 NodeRegistry, NodeServiceData, ServiceStatus,
21};
22use color_eyre::eyre::OptionExt;
23use color_eyre::{eyre::eyre, Result};
24use colored::Colorize;
25use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
26#[cfg(test)]
27use mockall::automock;
28use std::{
29 net::{IpAddr, Ipv4Addr, SocketAddr},
30 path::PathBuf,
31 process::{Command, Stdio},
32 str::FromStr,
33};
34use sysinfo::{Pid, System};
35
36#[cfg_attr(test, automock)]
37pub trait Launcher {
38 fn get_antnode_path(&self) -> PathBuf;
39 #[allow(clippy::too_many_arguments)]
40 fn launch_node(
41 &self,
42 first: bool,
43 log_format: Option<LogFormat>,
44 metrics_port: Option<u16>,
45 node_port: Option<u16>,
46 rpc_socket_addr: SocketAddr,
47 rewards_address: RewardsAddress,
48 evm_network: Option<EvmNetwork>,
49 ) -> Result<()>;
50 fn wait(&self, delay: u64);
51}
52
/// [`Launcher`] implementation that spawns a real `antnode` process on the local machine.
#[derive(Clone, Debug, Default)]
pub struct LocalSafeLauncher {
    /// Path of the `antnode` binary that is executed for every launched node.
    pub antnode_bin_path: PathBuf,
}
57
58impl Launcher for LocalSafeLauncher {
59 fn get_antnode_path(&self) -> PathBuf {
60 self.antnode_bin_path.clone()
61 }
62
63 fn launch_node(
64 &self,
65 first: bool,
66 log_format: Option<LogFormat>,
67 metrics_port: Option<u16>,
68 node_port: Option<u16>,
69 rpc_socket_addr: SocketAddr,
70 rewards_address: RewardsAddress,
71 evm_network: Option<EvmNetwork>,
72 ) -> Result<()> {
73 let mut args = Vec::new();
74
75 if first {
76 args.push("--first".to_string())
77 }
78
79 if let Some(log_format) = log_format {
80 args.push("--log-format".to_string());
81 args.push(log_format.as_str().to_string());
82 }
83
84 if let Some(metrics_port) = metrics_port {
85 args.push("--metrics-server-port".to_string());
86 args.push(metrics_port.to_string());
87 }
88
89 if let Some(node_port) = node_port {
90 args.push("--port".to_string());
91 args.push(node_port.to_string());
92 }
93
94 args.push("--local".to_string());
95 args.push("--rpc".to_string());
96 args.push(rpc_socket_addr.to_string());
97
98 args.push("--rewards-address".to_string());
99 args.push(rewards_address.to_string());
100
101 if let Some(network) = evm_network {
102 args.push(format!("evm-{}", network.identifier()));
103
104 if let EvmNetwork::Custom(custom) = network {
105 args.push("--rpc-url".to_string());
106 args.push(custom.rpc_url_http.to_string());
107 args.push("--payment-token-address".to_string());
108 args.push(custom.payment_token_address.to_string());
109 args.push("--data-payments-address".to_string());
110 args.push(custom.data_payments_address.to_string());
111 }
112 }
113
114 Command::new(self.antnode_bin_path.clone())
115 .args(args)
116 .stdout(Stdio::inherit())
117 .stderr(Stdio::inherit())
118 .spawn()
119 .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
120
121 Ok(())
122 }
123
124 fn wait(&self, delay: u64) {
128 std::thread::sleep(std::time::Duration::from_millis(delay));
129 }
130}
131
132pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Result<()> {
133 let mut system = System::new_all();
134 system.refresh_all();
135
136 if let Some(faucet) = &node_registry.faucet {
139 if let Some(process) = system.process(Pid::from(faucet.pid.unwrap() as usize)) {
144 process.kill();
145 debug!("Faucet has been killed");
146 println!("{} Killed faucet", "✓".green());
147 }
148 }
149
150 let faucet_data_path = dirs_next::data_dir()
151 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
152 .join("autonomi")
153 .join("test_faucet");
154 if faucet_data_path.is_dir() {
155 std::fs::remove_dir_all(faucet_data_path)?;
156 debug!("Removed faucet data directory");
157 }
158 let genesis_data_path = dirs_next::data_dir()
159 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
160 .join("autonomi")
161 .join("test_genesis");
162 if genesis_data_path.is_dir() {
163 debug!("Removed genesis data directory");
164 std::fs::remove_dir_all(genesis_data_path)?;
165 }
166
167 for node in node_registry.nodes.iter() {
168 println!("{}:", node.service_name);
169 if let Some(pid) = node.pid {
172 if let Some(process) = system.process(Pid::from(pid as usize)) {
176 process.kill();
177 debug!("Killed node: {} ({})", node.service_name, pid);
178 println!(" {} Killed process", "✓".green());
179 }
180 }
181
182 if !keep_directories {
183 if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
186 error!("Failed to remove node data directory: {:?}", e);
187 println!(
188 " {} Failed to remove {}: {e}",
189 "✗".red(),
190 node.data_dir_path.to_string_lossy()
191 );
192 } else {
193 debug!("Removed node data directory: {:?}", node.data_dir_path);
194 println!(
195 " {} Removed {}",
196 "✓".green(),
197 node.data_dir_path.to_string_lossy()
198 );
199 }
200 }
201 }
202
203 Ok(())
204}
205
206pub struct LocalNetworkOptions {
207 pub antnode_bin_path: PathBuf,
208 pub enable_metrics_server: bool,
209 pub join: bool,
210 pub interval: u64,
211 pub metrics_port: Option<PortRange>,
212 pub node_port: Option<PortRange>,
213 pub node_count: u16,
214 pub peers: Option<Vec<Multiaddr>>,
215 pub rpc_port: Option<PortRange>,
216 pub skip_validation: bool,
217 pub log_format: Option<LogFormat>,
218 pub rewards_address: RewardsAddress,
219 pub evm_network: Option<EvmNetwork>,
220}
221
222pub async fn run_network(
223 options: LocalNetworkOptions,
224 node_registry: &mut NodeRegistry,
225 service_control: &dyn ServiceControl,
226) -> Result<()> {
227 info!("Running local network");
228
229 if let Some(port_range) = &options.node_port {
231 port_range.validate(options.node_count)?;
232 check_port_availability(port_range, &node_registry.nodes)?;
233 }
234
235 if let Some(port_range) = &options.metrics_port {
236 port_range.validate(options.node_count)?;
237 check_port_availability(port_range, &node_registry.nodes)?;
238 }
239
240 if let Some(port_range) = &options.rpc_port {
241 port_range.validate(options.node_count)?;
242 check_port_availability(port_range, &node_registry.nodes)?;
243 }
244
245 let launcher = LocalSafeLauncher {
246 antnode_bin_path: options.antnode_bin_path.to_path_buf(),
247 };
248
249 let mut node_port = get_start_port_if_applicable(options.node_port);
250 let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
251 let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
252
253 let (bootstrap_peers, start) = if options.join {
255 if let Some(peers) = options.peers {
256 (peers, 1)
257 } else {
258 let peer = node_registry
259 .nodes
260 .iter()
261 .find_map(|n| n.listen_addr.clone())
262 .ok_or_eyre("Unable to obtain a peer to connect to")?;
263 (peer, 1)
264 }
265 } else {
266 let rpc_free_port = if let Some(port) = rpc_port {
267 port
268 } else {
269 service_control.get_available_port()?
270 };
271 let metrics_free_port = if let Some(port) = metrics_port {
272 Some(port)
273 } else if options.enable_metrics_server {
274 Some(service_control.get_available_port()?)
275 } else {
276 None
277 };
278 let rpc_socket_addr =
279 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
280 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
281
282 let number = (node_registry.nodes.len() as u16) + 1;
283 let node = run_node(
284 RunNodeOptions {
285 first: true,
286 metrics_port: metrics_free_port,
287 node_port,
288 interval: options.interval,
289 log_format: options.log_format,
290 number,
291 rpc_socket_addr,
292 rewards_address: options.rewards_address,
293 evm_network: options.evm_network.clone(),
294 version: get_bin_version(&launcher.get_antnode_path())?,
295 },
296 &launcher,
297 &rpc_client,
298 )
299 .await?;
300 node_registry.nodes.push(node.clone());
301 let bootstrap_peers = node
302 .listen_addr
303 .ok_or_eyre("The listen address was not set")?;
304 node_port = increment_port_option(node_port);
305 metrics_port = increment_port_option(metrics_port);
306 rpc_port = increment_port_option(rpc_port);
307 (bootstrap_peers, 2)
308 };
309 node_registry.save()?;
310
311 for _ in start..=options.node_count {
312 let rpc_free_port = if let Some(port) = rpc_port {
313 port
314 } else {
315 service_control.get_available_port()?
316 };
317 let metrics_free_port = if let Some(port) = metrics_port {
318 Some(port)
319 } else if options.enable_metrics_server {
320 Some(service_control.get_available_port()?)
321 } else {
322 None
323 };
324 let rpc_socket_addr =
325 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
326 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
327
328 let number = (node_registry.nodes.len() as u16) + 1;
329 let node = run_node(
330 RunNodeOptions {
331 first: false,
332 metrics_port: metrics_free_port,
333 node_port,
334 interval: options.interval,
335 log_format: options.log_format,
336 number,
337 rpc_socket_addr,
338 rewards_address: options.rewards_address,
339 evm_network: options.evm_network.clone(),
340 version: get_bin_version(&launcher.get_antnode_path())?,
341 },
342 &launcher,
343 &rpc_client,
344 )
345 .await?;
346 node_registry.nodes.push(node);
347
348 node_registry.save()?;
353
354 node_port = increment_port_option(node_port);
355 metrics_port = increment_port_option(metrics_port);
356 rpc_port = increment_port_option(rpc_port);
357 }
358
359 if !options.skip_validation {
360 debug!("Waiting for 10 seconds before validating the network...");
361 println!("Waiting for 10 seconds before validating the network...");
362 std::thread::sleep(std::time::Duration::from_secs(10));
363 validate_network(node_registry, bootstrap_peers.clone()).await?;
364 }
365
366 Ok(())
367}
368
369pub struct RunNodeOptions {
370 pub first: bool,
371 pub interval: u64,
372 pub log_format: Option<LogFormat>,
373 pub metrics_port: Option<u16>,
374 pub node_port: Option<u16>,
375 pub number: u16,
376 pub rpc_socket_addr: SocketAddr,
377 pub rewards_address: RewardsAddress,
378 pub evm_network: Option<EvmNetwork>,
379 pub version: String,
380}
381
382pub async fn run_node(
383 run_options: RunNodeOptions,
384 launcher: &dyn Launcher,
385 rpc_client: &dyn RpcActions,
386) -> Result<NodeServiceData> {
387 info!("Launching node {}...", run_options.number);
388 println!("Launching node {}...", run_options.number);
389 launcher.launch_node(
390 run_options.first,
391 run_options.log_format,
392 run_options.metrics_port,
393 run_options.node_port,
394 run_options.rpc_socket_addr,
395 run_options.rewards_address,
396 run_options.evm_network.clone(),
397 )?;
398 launcher.wait(run_options.interval);
399
400 let node_info = rpc_client.node_info().await?;
401 let peer_id = node_info.peer_id;
402 let network_info = rpc_client.network_info().await?;
403 let connected_peers = Some(network_info.connected_peers);
404 let listen_addrs = network_info
405 .listeners
406 .into_iter()
407 .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
408 .collect();
409
410 Ok(NodeServiceData {
411 antnode_path: launcher.get_antnode_path(),
412 auto_restart: false,
413 connected_peers,
414 data_dir_path: node_info.data_path,
415 evm_network: run_options.evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
416 home_network: false,
417 listen_addr: Some(listen_addrs),
418 log_dir_path: node_info.log_path,
419 log_format: run_options.log_format,
420 max_archived_log_files: None,
421 max_log_files: None,
422 metrics_port: run_options.metrics_port,
423 network_id: None,
424 node_ip: None,
425 node_port: run_options.node_port,
426 number: run_options.number,
427 peer_id: Some(peer_id),
428 peers_args: InitialPeersConfig {
429 first: run_options.first,
430 addrs: vec![],
431 network_contacts_url: vec![],
432 local: true,
433 disable_mainnet_contacts: true,
434 ignore_cache: true,
435 bootstrap_cache_dir: None,
436 },
437 pid: Some(node_info.pid),
438 rewards_address: run_options.rewards_address,
439 reward_balance: None,
440 rpc_socket_addr: run_options.rpc_socket_addr,
441 status: ServiceStatus::Running,
442 service_name: format!("antnode-local{}", run_options.number),
443 upnp: false,
444 user: None,
445 user_mode: false,
446 version: run_options.version.to_string(),
447 })
448}
449
450async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr>) -> Result<()> {
455 let mut all_peers = node_registry
456 .nodes
457 .iter()
458 .map(|n| n.peer_id.ok_or_eyre("The PeerId was not set"))
459 .collect::<Result<Vec<PeerId>>>()?;
460 let additional_peers = peers
464 .into_iter()
465 .filter_map(|addr| {
466 addr.to_string()
467 .rsplit('/')
468 .next()
469 .and_then(|id_str| PeerId::from_str(id_str).ok())
470 })
471 .collect::<Vec<PeerId>>();
472 all_peers.extend(additional_peers);
473
474 for node in node_registry.nodes.iter() {
475 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
476 let net_info = rpc_client.network_info().await?;
477 let peers = net_info.connected_peers;
478 let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
479 debug!("Node {peer_id} has {} peers", peers.len());
480 println!("Node {peer_id} has {} peers", peers.len());
481
482 let invalid_peers: Vec<PeerId> = peers
485 .iter()
486 .filter(|peer| !all_peers.contains(peer))
487 .cloned()
488 .collect();
489 if !invalid_peers.is_empty() {
490 for invalid_peer in invalid_peers.iter() {
491 println!("Invalid peer found: {}", invalid_peer);
492 }
493 error!("Network validation failed: {invalid_peers:?}");
494 return Err(eyre!("Network validation failed",));
495 }
496 }
497 Ok(())
498}
499
#[cfg(test)]
mod tests {
    use super::*;
    use ant_evm::utils::dummy_address;
    use ant_service_management::{
        error::Result as RpcResult,
        rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
    };
    use async_trait::async_trait;
    use libp2p_identity::PeerId;
    use mockall::mock;
    use mockall::predicate::*;
    use std::str::FromStr;

    // Mock of the RPC client so that `run_node` can be tested without a running node.
    mock! {
        pub RpcClient {}
        #[async_trait]
        impl RpcActions for RpcClient {
            async fn node_info(&self) -> RpcResult<NodeInfo>;
            async fn network_info(&self) -> RpcResult<NetworkInfo>;
            async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
            async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
            async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
            async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
            async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
            async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
        }
    }

    /// Launching with `first: true` should call the launcher once with the expected arguments and
    /// build the service data from the values reported over RPC.
    #[tokio::test]
    async fn run_node_should_launch_the_genesis_node() -> Result<()> {
        let rewards_address = dummy_address();
        let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
        let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 13000);
        let expected_data_dir = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"));
        let expected_log_dir = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"));

        let mut mock_launcher = MockLauncher::new();
        mock_launcher
            .expect_launch_node()
            .with(
                eq(true),
                eq(None),
                eq(None),
                eq(None),
                eq(rpc_socket_addr),
                eq(rewards_address),
                eq(None),
            )
            .times(1)
            .returning(|_, _, _, _, _, _, _| Ok(()));
        mock_launcher
            .expect_wait()
            .with(eq(100))
            .times(1)
            .returning(|_| ());
        mock_launcher
            .expect_get_antnode_path()
            .times(1)
            .returning(|| PathBuf::from("/usr/local/bin/antnode"));

        let mut mock_rpc_client = MockRpcClient::new();
        let reported_data_dir = expected_data_dir.clone();
        let reported_log_dir = expected_log_dir.clone();
        mock_rpc_client
            .expect_node_info()
            .times(1)
            .returning(move || {
                Ok(NodeInfo {
                    pid: 1000,
                    peer_id,
                    data_path: reported_data_dir.clone(),
                    log_path: reported_log_dir.clone(),
                    version: "0.100.12".to_string(),
                    uptime: std::time::Duration::from_secs(1),
                    wallet_balance: 0,
                })
            });
        mock_rpc_client
            .expect_network_info()
            .times(1)
            .returning(move || {
                Ok(NetworkInfo {
                    connected_peers: Vec::new(),
                    listeners: Vec::new(),
                })
            });

        let run_options = RunNodeOptions {
            first: true,
            interval: 100,
            log_format: None,
            metrics_port: None,
            node_port: None,
            number: 1,
            rpc_socket_addr,
            rewards_address,
            evm_network: None,
            version: "0.100.12".to_string(),
        };
        let node = run_node(run_options, &mock_launcher, &mock_rpc_client).await?;

        assert!(node.peers_args.first);
        assert_eq!(node.version, "0.100.12");
        assert_eq!(node.service_name, "antnode-local1");
        assert_eq!(node.data_dir_path, expected_data_dir);
        assert_eq!(node.log_dir_path, expected_log_dir);
        assert_eq!(node.number, 1);
        assert_eq!(node.pid, Some(1000));
        assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
        assert_eq!(node.status, ServiceStatus::Running);
        assert_eq!(node.antnode_path, PathBuf::from("/usr/local/bin/antnode"));

        Ok(())
    }
}