1use crate::add_services::config::PortRange;
10use crate::helpers::{
11 check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::NodeRegistryManager;
18use ant_service_management::node::NODE_SERVICE_DATA_SCHEMA_LATEST;
19use ant_service_management::{
20 NodeServiceData, ServiceStatus,
21 control::ServiceControl,
22 rpc::{RpcActions, RpcClient},
23};
24use color_eyre::eyre::OptionExt;
25use color_eyre::{Result, eyre::eyre};
26use colored::Colorize;
27use libp2p::{Multiaddr, PeerId, multiaddr::Protocol};
28#[cfg(test)]
29use mockall::automock;
30use std::{
31 net::{IpAddr, Ipv4Addr, SocketAddr},
32 path::PathBuf,
33 process::{Command, Stdio},
34 str::FromStr,
35};
36use sysinfo::{Pid, System};
37
38#[cfg_attr(test, automock)]
39pub trait Launcher {
40 fn get_antnode_path(&self) -> PathBuf;
41 #[allow(clippy::too_many_arguments)]
42 fn launch_node(
43 &self,
44 first: bool,
45 log_format: Option<LogFormat>,
46 metrics_port: Option<u16>,
47 node_port: Option<u16>,
48 rpc_socket_addr: SocketAddr,
49 rewards_address: RewardsAddress,
50 evm_network: EvmNetwork,
51 ) -> Result<()>;
52 fn wait(&self, delay: u64);
53}
54
/// [`Launcher`] that spawns a real `antnode` binary as a child process.
#[derive(Default)]
pub struct LocalSafeLauncher {
    /// Path of the `antnode` executable that is spawned for every node.
    pub antnode_bin_path: PathBuf,
}
59
60impl Launcher for LocalSafeLauncher {
61 fn get_antnode_path(&self) -> PathBuf {
62 self.antnode_bin_path.clone()
63 }
64
65 fn launch_node(
66 &self,
67 first: bool,
68 log_format: Option<LogFormat>,
69 metrics_port: Option<u16>,
70 node_port: Option<u16>,
71 rpc_socket_addr: SocketAddr,
72 rewards_address: RewardsAddress,
73 evm_network: EvmNetwork,
74 ) -> Result<()> {
75 let mut args = Vec::new();
76
77 if first {
78 args.push("--first".to_string())
79 }
80
81 if let Some(log_format) = log_format {
82 args.push("--log-format".to_string());
83 args.push(log_format.as_str().to_string());
84 }
85
86 if let Some(metrics_port) = metrics_port {
87 args.push("--metrics-server-port".to_string());
88 args.push(metrics_port.to_string());
89 }
90
91 if let Some(node_port) = node_port {
92 args.push("--port".to_string());
93 args.push(node_port.to_string());
94 }
95
96 args.push("--local".to_string());
97 args.push("--rpc".to_string());
98 args.push(rpc_socket_addr.to_string());
99
100 args.push("--rewards-address".to_string());
101 args.push(rewards_address.to_string());
102
103 args.push(format!("evm-{}", evm_network.identifier()));
104
105 if let EvmNetwork::Custom(custom) = evm_network {
106 args.push("--rpc-url".to_string());
107 args.push(custom.rpc_url_http.to_string());
108 args.push("--payment-token-address".to_string());
109 args.push(custom.payment_token_address.to_string());
110 args.push("--data-payments-address".to_string());
111 args.push(custom.data_payments_address.to_string());
112
113 if let Some(merkle_addr) = custom.merkle_payments_address {
114 args.push("--merkle-payments-address".to_string());
115 args.push(merkle_addr.to_string());
116 }
117 }
118
119 Command::new(self.antnode_bin_path.clone())
120 .args(args)
121 .stdout(Stdio::inherit())
122 .stderr(Stdio::inherit())
123 .spawn()
124 .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
125
126 Ok(())
127 }
128
129 fn wait(&self, delay: u64) {
133 std::thread::sleep(std::time::Duration::from_millis(delay));
134 }
135}
136
137fn kill_evm_testnet_processes(system: &mut System) {
139 for (pid, process) in system.processes() {
141 let process_name = process.name().to_lowercase();
142 if process_name.contains("anvil") || process_name.contains("evm-testnet") {
143 debug!("Killing EVM testnet process: {} ({})", process_name, pid);
144 process.kill();
145 println!(" {} Killed EVM testnet process ({})", "✓".green(), pid);
146 }
147 }
148}
149
150pub async fn kill_network(
151 node_registry: NodeRegistryManager,
152 keep_directories: bool,
153) -> Result<()> {
154 let mut system = System::new_all();
155 system.refresh_all();
156
157 let genesis_data_path = dirs_next::data_dir()
158 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
159 .join("autonomi")
160 .join("test_genesis");
161 if genesis_data_path.is_dir() {
162 debug!("Removed genesis data directory");
163 std::fs::remove_dir_all(genesis_data_path)?;
164 }
165
166 kill_evm_testnet_processes(&mut system);
167
168 for node in node_registry.nodes.read().await.iter() {
169 let node = node.read().await;
170 println!("{}:", node.service_name);
171 if let Some(pid) = node.pid {
174 if let Some(process) = system.process(Pid::from(pid as usize)) {
178 process.kill();
179 debug!("Killed node: {} ({})", node.service_name, pid);
180 println!(" {} Killed process", "✓".green());
181 }
182 }
183
184 if !keep_directories {
185 if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
188 error!("Failed to remove node data directory: {:?}", e);
189 println!(
190 " {} Failed to remove {}: {e}",
191 "✗".red(),
192 node.data_dir_path.to_string_lossy()
193 );
194 } else {
195 debug!("Removed node data directory: {:?}", node.data_dir_path);
196 println!(
197 " {} Removed {}",
198 "✓".green(),
199 node.data_dir_path.to_string_lossy()
200 );
201 }
202 }
203 }
204
205 Ok(())
206}
207
208pub struct LocalNetworkOptions {
209 pub antnode_bin_path: PathBuf,
210 pub enable_metrics_server: bool,
211 pub join: bool,
212 pub interval: u64,
213 pub metrics_port: Option<PortRange>,
214 pub node_port: Option<PortRange>,
215 pub node_count: u16,
216 pub peers: Option<Vec<Multiaddr>>,
217 pub rpc_port: Option<PortRange>,
218 pub skip_validation: bool,
219 pub log_format: Option<LogFormat>,
220 pub rewards_address: RewardsAddress,
221 pub evm_network: EvmNetwork,
222}
223
224pub async fn run_network(
225 options: LocalNetworkOptions,
226 node_registry: NodeRegistryManager,
227 service_control: &dyn ServiceControl,
228) -> Result<()> {
229 info!("Running local network");
230
231 if let Some(port_range) = &options.node_port {
233 port_range.validate(options.node_count)?;
234 check_port_availability(port_range, &node_registry.nodes).await?;
235 }
236
237 if let Some(port_range) = &options.metrics_port {
238 port_range.validate(options.node_count)?;
239 check_port_availability(port_range, &node_registry.nodes).await?;
240 }
241
242 if let Some(port_range) = &options.rpc_port {
243 port_range.validate(options.node_count)?;
244 check_port_availability(port_range, &node_registry.nodes).await?;
245 }
246
247 let launcher = LocalSafeLauncher {
248 antnode_bin_path: options.antnode_bin_path.to_path_buf(),
249 };
250
251 let mut node_port = get_start_port_if_applicable(options.node_port);
252 let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
253 let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
254
255 let (bootstrap_peers, start) = if options.join {
257 if let Some(peers) = options.peers {
258 (peers, 1)
259 } else {
260 let mut peers = Vec::new();
261 for node in node_registry.nodes.read().await.iter() {
262 let node = node.read().await;
263 if let Some(listen_addr) = &node.listen_addr {
264 peers.extend(listen_addr.clone());
265 }
266 }
267 (peers, 1)
268 }
269 } else {
270 let rpc_free_port = if let Some(port) = rpc_port {
271 port
272 } else {
273 service_control.get_available_port()?
274 };
275 let metrics_free_port = if let Some(port) = metrics_port {
276 Some(port)
277 } else if options.enable_metrics_server {
278 Some(service_control.get_available_port()?)
279 } else {
280 None
281 };
282 let rpc_socket_addr =
283 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
284 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
285
286 let number = (node_registry.nodes.read().await.len() as u16) + 1;
287 let node = run_node(
288 RunNodeOptions {
289 first: true,
290 metrics_port: metrics_free_port,
291 node_port,
292 interval: options.interval,
293 log_format: options.log_format,
294 number,
295 rpc_socket_addr,
296 rewards_address: options.rewards_address,
297 evm_network: options.evm_network.clone(),
298 version: get_bin_version(&launcher.get_antnode_path())?,
299 },
300 &launcher,
301 &rpc_client,
302 )
303 .await?;
304 node_registry.push_node(node.clone()).await;
305 let bootstrap_peers = node
306 .listen_addr
307 .ok_or_eyre("The listen address was not set")?;
308 node_port = increment_port_option(node_port);
309 metrics_port = increment_port_option(metrics_port);
310 rpc_port = increment_port_option(rpc_port);
311 (bootstrap_peers, 2)
312 };
313 node_registry.save().await?;
314
315 for _ in start..=options.node_count {
316 let rpc_free_port = if let Some(port) = rpc_port {
317 port
318 } else {
319 service_control.get_available_port()?
320 };
321 let metrics_free_port = if let Some(port) = metrics_port {
322 Some(port)
323 } else if options.enable_metrics_server {
324 Some(service_control.get_available_port()?)
325 } else {
326 None
327 };
328 let rpc_socket_addr =
329 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
330 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
331
332 let number = (node_registry.nodes.read().await.len() as u16) + 1;
333 let node = run_node(
334 RunNodeOptions {
335 first: false,
336 metrics_port: metrics_free_port,
337 node_port,
338 interval: options.interval,
339 log_format: options.log_format,
340 number,
341 rpc_socket_addr,
342 rewards_address: options.rewards_address,
343 evm_network: options.evm_network.clone(),
344 version: get_bin_version(&launcher.get_antnode_path())?,
345 },
346 &launcher,
347 &rpc_client,
348 )
349 .await?;
350 node_registry.push_node(node).await;
351
352 node_registry.save().await?;
357
358 node_port = increment_port_option(node_port);
359 metrics_port = increment_port_option(metrics_port);
360 rpc_port = increment_port_option(rpc_port);
361 }
362
363 if !options.skip_validation {
364 debug!("Waiting for 10 seconds before validating the network...");
365 println!("Waiting for 10 seconds before validating the network...");
366 std::thread::sleep(std::time::Duration::from_secs(10));
367 validate_network(node_registry, bootstrap_peers.clone()).await?;
368 }
369
370 Ok(())
371}
372
373pub struct RunNodeOptions {
374 pub first: bool,
375 pub interval: u64,
376 pub log_format: Option<LogFormat>,
377 pub metrics_port: Option<u16>,
378 pub node_port: Option<u16>,
379 pub number: u16,
380 pub rpc_socket_addr: SocketAddr,
381 pub rewards_address: RewardsAddress,
382 pub evm_network: EvmNetwork,
383 pub version: String,
384}
385
386pub async fn run_node(
387 run_options: RunNodeOptions,
388 launcher: &dyn Launcher,
389 rpc_client: &dyn RpcActions,
390) -> Result<NodeServiceData> {
391 if run_options.number == 2 {
395 launcher.wait(5000);
397 }
398
399 info!("Launching node {}...", run_options.number);
400 println!("Launching node {}...", run_options.number);
401 launcher.launch_node(
402 run_options.first,
403 run_options.log_format,
404 run_options.metrics_port,
405 run_options.node_port,
406 run_options.rpc_socket_addr,
407 run_options.rewards_address,
408 run_options.evm_network.clone(),
409 )?;
410 launcher.wait(run_options.interval);
411
412 let node_info = rpc_client.node_info().await?;
413 let peer_id = node_info.peer_id;
414 let network_info = rpc_client.network_info().await?;
415 let connected_peers = Some(network_info.connected_peers);
416 let listen_addrs = network_info
417 .listeners
418 .into_iter()
419 .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
420 .collect();
421
422 Ok(NodeServiceData {
423 alpha: false,
424 antnode_path: launcher.get_antnode_path(),
425 auto_restart: false,
426 connected_peers,
427 data_dir_path: node_info.data_path,
428 evm_network: run_options.evm_network,
429 relay: false,
430 initial_peers_config: InitialPeersConfig {
431 first: run_options.first,
432 addrs: vec![],
433 network_contacts_url: vec![],
434 local: true,
435 ignore_cache: true,
436 bootstrap_cache_dir: None,
437 },
438 listen_addr: Some(listen_addrs),
439 log_dir_path: node_info.log_path,
440 log_format: run_options.log_format,
441 max_archived_log_files: None,
442 max_log_files: None,
443 metrics_port: run_options.metrics_port,
444 network_id: None,
445 node_ip: None,
446 node_port: run_options.node_port,
447 number: run_options.number,
448 peer_id: Some(peer_id),
449 pid: Some(node_info.pid),
450 rewards_address: run_options.rewards_address,
451 reward_balance: None,
452 rpc_socket_addr: run_options.rpc_socket_addr,
453 schema_version: NODE_SERVICE_DATA_SCHEMA_LATEST,
454 status: ServiceStatus::Running,
455 service_name: format!("antnode-local{}", run_options.number),
456 no_upnp: false,
457 user: None,
458 user_mode: false,
459 version: run_options.version.to_string(),
460 write_older_cache_files: false,
461 })
462}
463
464async fn validate_network(node_registry: NodeRegistryManager, peers: Vec<Multiaddr>) -> Result<()> {
469 let mut all_peers = Vec::new();
470 for node in node_registry.nodes.read().await.iter() {
471 let node = node.read().await;
472 if let Some(peer_id) = &node.peer_id {
473 all_peers.push(*peer_id);
474 } else {
475 return Err(eyre!(
476 "The PeerId was not set for node: {}",
477 node.service_name
478 ));
479 }
480 }
481
482 let additional_peers = peers
486 .into_iter()
487 .filter_map(|addr| {
488 addr.to_string()
489 .rsplit('/')
490 .next()
491 .and_then(|id_str| PeerId::from_str(id_str).ok())
492 })
493 .collect::<Vec<PeerId>>();
494 all_peers.extend(additional_peers);
495
496 for node in node_registry.nodes.read().await.iter() {
497 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
498 let net_info = rpc_client.network_info().await?;
499 let peers = net_info.connected_peers;
500 let peer_id = node
501 .read()
502 .await
503 .peer_id
504 .ok_or_eyre("The PeerId was not set")?;
505 debug!("Node {peer_id} has {} peers", peers.len());
506 println!("Node {peer_id} has {} peers", peers.len());
507
508 let invalid_peers: Vec<PeerId> = peers
511 .iter()
512 .filter(|peer| !all_peers.contains(peer))
513 .cloned()
514 .collect();
515 if !invalid_peers.is_empty() {
516 for invalid_peer in invalid_peers.iter() {
517 println!("Invalid peer found: {invalid_peer}");
518 }
519 error!("Network validation failed: {invalid_peers:?}");
520 return Err(eyre!("Network validation failed",));
521 }
522 }
523 Ok(())
524}
525
#[cfg(test)]
mod tests {
    use super::*;
    use ant_evm::utils::dummy_address;
    use ant_service_management::{
        error::Result as RpcResult,
        rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
    };
    use async_trait::async_trait;
    use evmlib::CustomNetwork;
    use libp2p_identity::PeerId;
    use mockall::mock;
    use mockall::predicate::*;
    use std::str::FromStr;

    // Stand-in for the real RPC client so no node has to be running.
    mock! {
        pub RpcClient {}
        #[async_trait]
        impl RpcActions for RpcClient {
            async fn node_info(&self) -> RpcResult<NodeInfo>;
            async fn network_info(&self) -> RpcResult<NetworkInfo>;
            async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
            async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
            async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
            async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
            async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
            async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
        }
    }

    /// `run_node` for node number 1 must launch with `first == true`, wait only for the
    /// configured interval, and build the service data from the RPC responses.
    #[tokio::test]
    async fn run_node_should_launch_the_genesis_node() -> Result<()> {
        // Shared fixtures.
        let rewards_address = dummy_address();
        let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
        let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);
        let evm_network = EvmNetwork::Custom(CustomNetwork::new(
            "http://localhost:61611",
            "0x5FbDB2315678afecb367f032d93F642f64180aa3",
            "0x8464135c8F25Da09e49BC8782676a84730C318bC",
            Some("0x663F3ad617193148711d28f5334eE4Ed07016602"),
        ));
        let data_path = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"));
        let log_path = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"));

        // Launcher expectations: one launch, one interval wait, one path lookup.
        let mut mock_launcher = MockLauncher::new();
        mock_launcher
            .expect_launch_node()
            .with(
                eq(true),
                eq(None),
                eq(None),
                eq(None),
                eq(rpc_socket_addr),
                eq(rewards_address),
                eq(evm_network.clone()),
            )
            .times(1)
            .returning(|_, _, _, _, _, _, _| Ok(()));
        mock_launcher
            .expect_wait()
            .with(eq(100))
            .times(1)
            .returning(|_| ());
        mock_launcher
            .expect_get_antnode_path()
            .times(1)
            .returning(|| PathBuf::from("/usr/local/bin/antnode"));

        // RPC expectations: each query is made exactly once.
        let mut mock_rpc_client = MockRpcClient::new();
        let (reported_data_path, reported_log_path) = (data_path.clone(), log_path.clone());
        mock_rpc_client
            .expect_node_info()
            .times(1)
            .returning(move || {
                Ok(NodeInfo {
                    pid: 1000,
                    peer_id,
                    data_path: reported_data_path.clone(),
                    log_path: reported_log_path.clone(),
                    version: "0.100.12".to_string(),
                    uptime: std::time::Duration::from_secs(1),
                    wallet_balance: 0,
                })
            });
        mock_rpc_client
            .expect_network_info()
            .times(1)
            .returning(move || {
                Ok(NetworkInfo {
                    connected_peers: Vec::new(),
                    listeners: Vec::new(),
                })
            });

        let node = run_node(
            RunNodeOptions {
                first: true,
                interval: 100,
                log_format: None,
                metrics_port: None,
                node_port: None,
                number: 1,
                rpc_socket_addr,
                rewards_address,
                evm_network,
                version: "0.100.12".to_string(),
            },
            &mock_launcher,
            &mock_rpc_client,
        )
        .await?;

        assert!(node.initial_peers_config.first);
        assert_eq!(node.version, "0.100.12");
        assert_eq!(node.service_name, "antnode-local1");
        assert_eq!(node.data_dir_path, data_path);
        assert_eq!(node.log_dir_path, log_path);
        assert_eq!(node.number, 1);
        assert_eq!(node.pid, Some(1000));
        assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
        assert_eq!(node.status, ServiceStatus::Running);
        assert_eq!(node.antnode_path, PathBuf::from("/usr/local/bin/antnode"));

        Ok(())
    }
}