1use crate::add_services::config::PortRange;
10use crate::helpers::{
11 check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::NodeRegistryManager;
18use ant_service_management::node::NODE_SERVICE_DATA_SCHEMA_LATEST;
19use ant_service_management::{
20 NodeServiceData, ServiceStatus,
21 control::ServiceControl,
22 rpc::{RpcActions, RpcClient},
23};
24use color_eyre::eyre::OptionExt;
25use color_eyre::{Result, eyre::eyre};
26use colored::Colorize;
27use libp2p::{Multiaddr, PeerId, multiaddr::Protocol};
28#[cfg(test)]
29use mockall::automock;
30use std::{
31 net::{IpAddr, Ipv4Addr, SocketAddr},
32 path::PathBuf,
33 process::{Command, Stdio},
34 str::FromStr,
35};
36use sysinfo::{Pid, System};
37
38#[cfg_attr(test, automock)]
39pub trait Launcher {
40 fn get_antnode_path(&self) -> PathBuf;
41 #[allow(clippy::too_many_arguments)]
42 fn launch_node(
43 &self,
44 first: bool,
45 log_format: Option<LogFormat>,
46 metrics_port: Option<u16>,
47 node_port: Option<u16>,
48 rpc_socket_addr: SocketAddr,
49 rewards_address: RewardsAddress,
50 evm_network: EvmNetwork,
51 ) -> Result<()>;
52 fn wait(&self, delay: u64);
53}
54
55#[derive(Default)]
56pub struct LocalSafeLauncher {
57 pub antnode_bin_path: PathBuf,
58}
59
60impl Launcher for LocalSafeLauncher {
61 fn get_antnode_path(&self) -> PathBuf {
62 self.antnode_bin_path.clone()
63 }
64
65 fn launch_node(
66 &self,
67 first: bool,
68 log_format: Option<LogFormat>,
69 metrics_port: Option<u16>,
70 node_port: Option<u16>,
71 rpc_socket_addr: SocketAddr,
72 rewards_address: RewardsAddress,
73 evm_network: EvmNetwork,
74 ) -> Result<()> {
75 let mut args = Vec::new();
76
77 if first {
78 args.push("--first".to_string())
79 }
80
81 if let Some(log_format) = log_format {
82 args.push("--log-format".to_string());
83 args.push(log_format.as_str().to_string());
84 }
85
86 if let Some(metrics_port) = metrics_port {
87 args.push("--metrics-server-port".to_string());
88 args.push(metrics_port.to_string());
89 }
90
91 if let Some(node_port) = node_port {
92 args.push("--port".to_string());
93 args.push(node_port.to_string());
94 }
95
96 args.push("--local".to_string());
97 args.push("--rpc".to_string());
98 args.push(rpc_socket_addr.to_string());
99
100 args.push("--rewards-address".to_string());
101 args.push(rewards_address.to_string());
102
103 args.push(format!("evm-{}", evm_network.identifier()));
104
105 if let EvmNetwork::Custom(custom) = evm_network {
106 args.push("--rpc-url".to_string());
107 args.push(custom.rpc_url_http.to_string());
108 args.push("--payment-token-address".to_string());
109 args.push(custom.payment_token_address.to_string());
110 args.push("--data-payments-address".to_string());
111 args.push(custom.data_payments_address.to_string());
112 }
113
114 Command::new(self.antnode_bin_path.clone())
115 .args(args)
116 .stdout(Stdio::inherit())
117 .stderr(Stdio::inherit())
118 .spawn()
119 .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
120
121 Ok(())
122 }
123
124 fn wait(&self, delay: u64) {
128 std::thread::sleep(std::time::Duration::from_millis(delay));
129 }
130}
131
132fn kill_evm_testnet_processes(system: &mut System) {
134 for (pid, process) in system.processes() {
136 let process_name = process.name().to_lowercase();
137 if process_name.contains("anvil") || process_name.contains("evm-testnet") {
138 debug!("Killing EVM testnet process: {} ({})", process_name, pid);
139 process.kill();
140 println!(" {} Killed EVM testnet process ({})", "✓".green(), pid);
141 }
142 }
143}
144
145pub async fn kill_network(
146 node_registry: NodeRegistryManager,
147 keep_directories: bool,
148) -> Result<()> {
149 let mut system = System::new_all();
150 system.refresh_all();
151
152 let genesis_data_path = dirs_next::data_dir()
153 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
154 .join("autonomi")
155 .join("test_genesis");
156 if genesis_data_path.is_dir() {
157 debug!("Removed genesis data directory");
158 std::fs::remove_dir_all(genesis_data_path)?;
159 }
160
161 kill_evm_testnet_processes(&mut system);
162
163 for node in node_registry.nodes.read().await.iter() {
164 let node = node.read().await;
165 println!("{}:", node.service_name);
166 if let Some(pid) = node.pid {
169 if let Some(process) = system.process(Pid::from(pid as usize)) {
173 process.kill();
174 debug!("Killed node: {} ({})", node.service_name, pid);
175 println!(" {} Killed process", "✓".green());
176 }
177 }
178
179 if !keep_directories {
180 if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
183 error!("Failed to remove node data directory: {:?}", e);
184 println!(
185 " {} Failed to remove {}: {e}",
186 "✗".red(),
187 node.data_dir_path.to_string_lossy()
188 );
189 } else {
190 debug!("Removed node data directory: {:?}", node.data_dir_path);
191 println!(
192 " {} Removed {}",
193 "✓".green(),
194 node.data_dir_path.to_string_lossy()
195 );
196 }
197 }
198 }
199
200 Ok(())
201}
202
203pub struct LocalNetworkOptions {
204 pub antnode_bin_path: PathBuf,
205 pub enable_metrics_server: bool,
206 pub join: bool,
207 pub interval: u64,
208 pub metrics_port: Option<PortRange>,
209 pub node_port: Option<PortRange>,
210 pub node_count: u16,
211 pub peers: Option<Vec<Multiaddr>>,
212 pub rpc_port: Option<PortRange>,
213 pub skip_validation: bool,
214 pub log_format: Option<LogFormat>,
215 pub rewards_address: RewardsAddress,
216 pub evm_network: EvmNetwork,
217}
218
219pub async fn run_network(
220 options: LocalNetworkOptions,
221 node_registry: NodeRegistryManager,
222 service_control: &dyn ServiceControl,
223) -> Result<()> {
224 info!("Running local network");
225
226 if let Some(port_range) = &options.node_port {
228 port_range.validate(options.node_count)?;
229 check_port_availability(port_range, &node_registry.nodes).await?;
230 }
231
232 if let Some(port_range) = &options.metrics_port {
233 port_range.validate(options.node_count)?;
234 check_port_availability(port_range, &node_registry.nodes).await?;
235 }
236
237 if let Some(port_range) = &options.rpc_port {
238 port_range.validate(options.node_count)?;
239 check_port_availability(port_range, &node_registry.nodes).await?;
240 }
241
242 let launcher = LocalSafeLauncher {
243 antnode_bin_path: options.antnode_bin_path.to_path_buf(),
244 };
245
246 let mut node_port = get_start_port_if_applicable(options.node_port);
247 let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
248 let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
249
250 let (bootstrap_peers, start) = if options.join {
252 if let Some(peers) = options.peers {
253 (peers, 1)
254 } else {
255 let mut peers = Vec::new();
256 for node in node_registry.nodes.read().await.iter() {
257 let node = node.read().await;
258 if let Some(listen_addr) = &node.listen_addr {
259 peers.extend(listen_addr.clone());
260 }
261 }
262 (peers, 1)
263 }
264 } else {
265 let rpc_free_port = if let Some(port) = rpc_port {
266 port
267 } else {
268 service_control.get_available_port()?
269 };
270 let metrics_free_port = if let Some(port) = metrics_port {
271 Some(port)
272 } else if options.enable_metrics_server {
273 Some(service_control.get_available_port()?)
274 } else {
275 None
276 };
277 let rpc_socket_addr =
278 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
279 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
280
281 let number = (node_registry.nodes.read().await.len() as u16) + 1;
282 let node = run_node(
283 RunNodeOptions {
284 first: true,
285 metrics_port: metrics_free_port,
286 node_port,
287 interval: options.interval,
288 log_format: options.log_format,
289 number,
290 rpc_socket_addr,
291 rewards_address: options.rewards_address,
292 evm_network: options.evm_network.clone(),
293 version: get_bin_version(&launcher.get_antnode_path())?,
294 },
295 &launcher,
296 &rpc_client,
297 )
298 .await?;
299 node_registry.push_node(node.clone()).await;
300 let bootstrap_peers = node
301 .listen_addr
302 .ok_or_eyre("The listen address was not set")?;
303 node_port = increment_port_option(node_port);
304 metrics_port = increment_port_option(metrics_port);
305 rpc_port = increment_port_option(rpc_port);
306 (bootstrap_peers, 2)
307 };
308 node_registry.save().await?;
309
310 for _ in start..=options.node_count {
311 let rpc_free_port = if let Some(port) = rpc_port {
312 port
313 } else {
314 service_control.get_available_port()?
315 };
316 let metrics_free_port = if let Some(port) = metrics_port {
317 Some(port)
318 } else if options.enable_metrics_server {
319 Some(service_control.get_available_port()?)
320 } else {
321 None
322 };
323 let rpc_socket_addr =
324 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
325 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
326
327 let number = (node_registry.nodes.read().await.len() as u16) + 1;
328 let node = run_node(
329 RunNodeOptions {
330 first: false,
331 metrics_port: metrics_free_port,
332 node_port,
333 interval: options.interval,
334 log_format: options.log_format,
335 number,
336 rpc_socket_addr,
337 rewards_address: options.rewards_address,
338 evm_network: options.evm_network.clone(),
339 version: get_bin_version(&launcher.get_antnode_path())?,
340 },
341 &launcher,
342 &rpc_client,
343 )
344 .await?;
345 node_registry.push_node(node).await;
346
347 node_registry.save().await?;
352
353 node_port = increment_port_option(node_port);
354 metrics_port = increment_port_option(metrics_port);
355 rpc_port = increment_port_option(rpc_port);
356 }
357
358 if !options.skip_validation {
359 debug!("Waiting for 10 seconds before validating the network...");
360 println!("Waiting for 10 seconds before validating the network...");
361 std::thread::sleep(std::time::Duration::from_secs(10));
362 validate_network(node_registry, bootstrap_peers.clone()).await?;
363 }
364
365 Ok(())
366}
367
368pub struct RunNodeOptions {
369 pub first: bool,
370 pub interval: u64,
371 pub log_format: Option<LogFormat>,
372 pub metrics_port: Option<u16>,
373 pub node_port: Option<u16>,
374 pub number: u16,
375 pub rpc_socket_addr: SocketAddr,
376 pub rewards_address: RewardsAddress,
377 pub evm_network: EvmNetwork,
378 pub version: String,
379}
380
381pub async fn run_node(
382 run_options: RunNodeOptions,
383 launcher: &dyn Launcher,
384 rpc_client: &dyn RpcActions,
385) -> Result<NodeServiceData> {
386 if run_options.number == 2 {
390 launcher.wait(5000);
392 }
393
394 info!("Launching node {}...", run_options.number);
395 println!("Launching node {}...", run_options.number);
396 launcher.launch_node(
397 run_options.first,
398 run_options.log_format,
399 run_options.metrics_port,
400 run_options.node_port,
401 run_options.rpc_socket_addr,
402 run_options.rewards_address,
403 run_options.evm_network.clone(),
404 )?;
405 launcher.wait(run_options.interval);
406
407 let node_info = rpc_client.node_info().await?;
408 let peer_id = node_info.peer_id;
409 let network_info = rpc_client.network_info().await?;
410 let connected_peers = Some(network_info.connected_peers);
411 let listen_addrs = network_info
412 .listeners
413 .into_iter()
414 .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
415 .collect();
416
417 Ok(NodeServiceData {
418 alpha: false,
419 antnode_path: launcher.get_antnode_path(),
420 auto_restart: false,
421 connected_peers,
422 data_dir_path: node_info.data_path,
423 evm_network: run_options.evm_network,
424 relay: false,
425 initial_peers_config: InitialPeersConfig {
426 first: run_options.first,
427 addrs: vec![],
428 network_contacts_url: vec![],
429 local: true,
430 ignore_cache: true,
431 bootstrap_cache_dir: None,
432 },
433 listen_addr: Some(listen_addrs),
434 log_dir_path: node_info.log_path,
435 log_format: run_options.log_format,
436 max_archived_log_files: None,
437 max_log_files: None,
438 metrics_port: run_options.metrics_port,
439 network_id: None,
440 node_ip: None,
441 node_port: run_options.node_port,
442 number: run_options.number,
443 peer_id: Some(peer_id),
444 pid: Some(node_info.pid),
445 rewards_address: run_options.rewards_address,
446 reward_balance: None,
447 rpc_socket_addr: run_options.rpc_socket_addr,
448 schema_version: NODE_SERVICE_DATA_SCHEMA_LATEST,
449 status: ServiceStatus::Running,
450 service_name: format!("antnode-local{}", run_options.number),
451 no_upnp: false,
452 user: None,
453 user_mode: false,
454 version: run_options.version.to_string(),
455 write_older_cache_files: false,
456 })
457}
458
459async fn validate_network(node_registry: NodeRegistryManager, peers: Vec<Multiaddr>) -> Result<()> {
464 let mut all_peers = Vec::new();
465 for node in node_registry.nodes.read().await.iter() {
466 let node = node.read().await;
467 if let Some(peer_id) = &node.peer_id {
468 all_peers.push(*peer_id);
469 } else {
470 return Err(eyre!(
471 "The PeerId was not set for node: {}",
472 node.service_name
473 ));
474 }
475 }
476
477 let additional_peers = peers
481 .into_iter()
482 .filter_map(|addr| {
483 addr.to_string()
484 .rsplit('/')
485 .next()
486 .and_then(|id_str| PeerId::from_str(id_str).ok())
487 })
488 .collect::<Vec<PeerId>>();
489 all_peers.extend(additional_peers);
490
491 for node in node_registry.nodes.read().await.iter() {
492 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
493 let net_info = rpc_client.network_info().await?;
494 let peers = net_info.connected_peers;
495 let peer_id = node
496 .read()
497 .await
498 .peer_id
499 .ok_or_eyre("The PeerId was not set")?;
500 debug!("Node {peer_id} has {} peers", peers.len());
501 println!("Node {peer_id} has {} peers", peers.len());
502
503 let invalid_peers: Vec<PeerId> = peers
506 .iter()
507 .filter(|peer| !all_peers.contains(peer))
508 .cloned()
509 .collect();
510 if !invalid_peers.is_empty() {
511 for invalid_peer in invalid_peers.iter() {
512 println!("Invalid peer found: {invalid_peer}");
513 }
514 error!("Network validation failed: {invalid_peers:?}");
515 return Err(eyre!("Network validation failed",));
516 }
517 }
518 Ok(())
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524 use ant_evm::utils::dummy_address;
525 use ant_service_management::{
526 error::Result as RpcResult,
527 rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
528 };
529 use async_trait::async_trait;
530 use evmlib::CustomNetwork;
531 use libp2p_identity::PeerId;
532 use mockall::mock;
533 use mockall::predicate::*;
534 use std::str::FromStr;
535
536 mock! {
537 pub RpcClient {}
538 #[async_trait]
539 impl RpcActions for RpcClient {
540 async fn node_info(&self) -> RpcResult<NodeInfo>;
541 async fn network_info(&self) -> RpcResult<NetworkInfo>;
542 async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
543 async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
544 async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
545 async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
546 async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
547 async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
548 }
549 }
550
551 #[tokio::test]
552 async fn run_node_should_launch_the_genesis_node() -> Result<()> {
553 let mut mock_launcher = MockLauncher::new();
554 let mut mock_rpc_client = MockRpcClient::new();
555 let rewards_address = dummy_address();
556
557 let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
558 let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);
559 mock_launcher
560 .expect_launch_node()
561 .with(
562 eq(true),
563 eq(None),
564 eq(None),
565 eq(None),
566 eq(rpc_socket_addr),
567 eq(rewards_address),
568 eq(EvmNetwork::Custom(CustomNetwork::new(
569 "http://localhost:61611",
570 "0x5FbDB2315678afecb367f032d93F642f64180aa3",
571 "0x8464135c8F25Da09e49BC8782676a84730C318bC",
572 ))),
573 )
574 .times(1)
575 .returning(|_, _, _, _, _, _, _| Ok(()));
576 mock_launcher
577 .expect_wait()
578 .with(eq(100))
579 .times(1)
580 .returning(|_| ());
581 mock_launcher
582 .expect_get_antnode_path()
583 .times(1)
584 .returning(|| PathBuf::from("/usr/local/bin/antnode"));
585
586 mock_rpc_client
587 .expect_node_info()
588 .times(1)
589 .returning(move || {
590 Ok(NodeInfo {
591 pid: 1000,
592 peer_id,
593 data_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}")),
594 log_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs")),
595 version: "0.100.12".to_string(),
596 uptime: std::time::Duration::from_secs(1), wallet_balance: 0,
598 })
599 });
600 mock_rpc_client
601 .expect_network_info()
602 .times(1)
603 .returning(move || {
604 Ok(NetworkInfo {
605 connected_peers: Vec::new(),
606 listeners: Vec::new(),
607 })
608 });
609
610 let node = run_node(
611 RunNodeOptions {
612 first: true,
613 interval: 100,
614 log_format: None,
615 metrics_port: None,
616 node_port: None,
617 number: 1,
618 rpc_socket_addr,
619 rewards_address,
620 evm_network: EvmNetwork::Custom(CustomNetwork::new(
621 "http://localhost:61611",
622 "0x5FbDB2315678afecb367f032d93F642f64180aa3",
623 "0x8464135c8F25Da09e49BC8782676a84730C318bC",
624 )),
625 version: "0.100.12".to_string(),
626 },
627 &mock_launcher,
628 &mock_rpc_client,
629 )
630 .await?;
631
632 assert!(node.initial_peers_config.first);
633 assert_eq!(node.version, "0.100.12");
634 assert_eq!(node.service_name, "antnode-local1");
635 assert_eq!(
636 node.data_dir_path,
637 PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"))
638 );
639 assert_eq!(
640 node.log_dir_path,
641 PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"))
642 );
643 assert_eq!(node.number, 1);
644 assert_eq!(node.pid, Some(1000));
645 assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
646 assert_eq!(node.status, ServiceStatus::Running);
647 assert_eq!(node.antnode_path, PathBuf::from("/usr/local/bin/antnode"));
648
649 Ok(())
650 }
651}