1use crate::add_services::config::PortRange;
10use crate::helpers::{
11 check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::node::NODE_SERVICE_DATA_SCHEMA_LATEST;
18use ant_service_management::{
19 control::ServiceControl,
20 rpc::{RpcActions, RpcClient},
21 NodeRegistry, NodeServiceData, ServiceStatus,
22};
23use color_eyre::eyre::OptionExt;
24use color_eyre::{eyre::eyre, Result};
25use colored::Colorize;
26use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
27#[cfg(test)]
28use mockall::automock;
29use std::{
30 net::{IpAddr, Ipv4Addr, SocketAddr},
31 path::PathBuf,
32 process::{Command, Stdio},
33 str::FromStr,
34};
35use sysinfo::{Pid, System};
36
37#[cfg_attr(test, automock)]
38pub trait Launcher {
39 fn get_antnode_path(&self) -> PathBuf;
40 #[allow(clippy::too_many_arguments)]
41 fn launch_node(
42 &self,
43 first: bool,
44 log_format: Option<LogFormat>,
45 metrics_port: Option<u16>,
46 node_port: Option<u16>,
47 rpc_socket_addr: SocketAddr,
48 rewards_address: RewardsAddress,
49 evm_network: Option<EvmNetwork>,
50 ) -> Result<()>;
51 fn wait(&self, delay: u64);
52}
53
54#[derive(Default)]
55pub struct LocalSafeLauncher {
56 pub antnode_bin_path: PathBuf,
57}
58
59impl Launcher for LocalSafeLauncher {
60 fn get_antnode_path(&self) -> PathBuf {
61 self.antnode_bin_path.clone()
62 }
63
64 fn launch_node(
65 &self,
66 first: bool,
67 log_format: Option<LogFormat>,
68 metrics_port: Option<u16>,
69 node_port: Option<u16>,
70 rpc_socket_addr: SocketAddr,
71 rewards_address: RewardsAddress,
72 evm_network: Option<EvmNetwork>,
73 ) -> Result<()> {
74 let mut args = Vec::new();
75
76 if first {
77 args.push("--first".to_string())
78 }
79
80 if let Some(log_format) = log_format {
81 args.push("--log-format".to_string());
82 args.push(log_format.as_str().to_string());
83 }
84
85 if let Some(metrics_port) = metrics_port {
86 args.push("--metrics-server-port".to_string());
87 args.push(metrics_port.to_string());
88 }
89
90 if let Some(node_port) = node_port {
91 args.push("--port".to_string());
92 args.push(node_port.to_string());
93 }
94
95 args.push("--local".to_string());
96 args.push("--rpc".to_string());
97 args.push(rpc_socket_addr.to_string());
98
99 args.push("--rewards-address".to_string());
100 args.push(rewards_address.to_string());
101
102 if let Some(network) = evm_network {
103 args.push(format!("evm-{}", network.identifier()));
104
105 if let EvmNetwork::Custom(custom) = network {
106 args.push("--rpc-url".to_string());
107 args.push(custom.rpc_url_http.to_string());
108 args.push("--payment-token-address".to_string());
109 args.push(custom.payment_token_address.to_string());
110 args.push("--data-payments-address".to_string());
111 args.push(custom.data_payments_address.to_string());
112 }
113 }
114
115 Command::new(self.antnode_bin_path.clone())
116 .args(args)
117 .stdout(Stdio::inherit())
118 .stderr(Stdio::inherit())
119 .spawn()
120 .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
121
122 Ok(())
123 }
124
125 fn wait(&self, delay: u64) {
129 std::thread::sleep(std::time::Duration::from_millis(delay));
130 }
131}
132
133pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Result<()> {
134 let mut system = System::new_all();
135 system.refresh_all();
136
137 if let Some(faucet) = &node_registry.faucet {
140 if let Some(process) = system.process(Pid::from(faucet.pid.unwrap() as usize)) {
145 process.kill();
146 debug!("Faucet has been killed");
147 println!("{} Killed faucet", "✓".green());
148 }
149 }
150
151 let faucet_data_path = dirs_next::data_dir()
152 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
153 .join("autonomi")
154 .join("test_faucet");
155 if faucet_data_path.is_dir() {
156 std::fs::remove_dir_all(faucet_data_path)?;
157 debug!("Removed faucet data directory");
158 }
159 let genesis_data_path = dirs_next::data_dir()
160 .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
161 .join("autonomi")
162 .join("test_genesis");
163 if genesis_data_path.is_dir() {
164 debug!("Removed genesis data directory");
165 std::fs::remove_dir_all(genesis_data_path)?;
166 }
167
168 for node in node_registry.nodes.iter() {
169 println!("{}:", node.service_name);
170 if let Some(pid) = node.pid {
173 if let Some(process) = system.process(Pid::from(pid as usize)) {
177 process.kill();
178 debug!("Killed node: {} ({})", node.service_name, pid);
179 println!(" {} Killed process", "✓".green());
180 }
181 }
182
183 if !keep_directories {
184 if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
187 error!("Failed to remove node data directory: {:?}", e);
188 println!(
189 " {} Failed to remove {}: {e}",
190 "✗".red(),
191 node.data_dir_path.to_string_lossy()
192 );
193 } else {
194 debug!("Removed node data directory: {:?}", node.data_dir_path);
195 println!(
196 " {} Removed {}",
197 "✓".green(),
198 node.data_dir_path.to_string_lossy()
199 );
200 }
201 }
202 }
203
204 Ok(())
205}
206
207pub struct LocalNetworkOptions {
208 pub antnode_bin_path: PathBuf,
209 pub enable_metrics_server: bool,
210 pub join: bool,
211 pub interval: u64,
212 pub metrics_port: Option<PortRange>,
213 pub node_port: Option<PortRange>,
214 pub node_count: u16,
215 pub peers: Option<Vec<Multiaddr>>,
216 pub rpc_port: Option<PortRange>,
217 pub skip_validation: bool,
218 pub log_format: Option<LogFormat>,
219 pub rewards_address: RewardsAddress,
220 pub evm_network: Option<EvmNetwork>,
221}
222
223pub async fn run_network(
224 options: LocalNetworkOptions,
225 node_registry: &mut NodeRegistry,
226 service_control: &dyn ServiceControl,
227) -> Result<()> {
228 info!("Running local network");
229
230 if let Some(port_range) = &options.node_port {
232 port_range.validate(options.node_count)?;
233 check_port_availability(port_range, &node_registry.nodes)?;
234 }
235
236 if let Some(port_range) = &options.metrics_port {
237 port_range.validate(options.node_count)?;
238 check_port_availability(port_range, &node_registry.nodes)?;
239 }
240
241 if let Some(port_range) = &options.rpc_port {
242 port_range.validate(options.node_count)?;
243 check_port_availability(port_range, &node_registry.nodes)?;
244 }
245
246 let launcher = LocalSafeLauncher {
247 antnode_bin_path: options.antnode_bin_path.to_path_buf(),
248 };
249
250 let mut node_port = get_start_port_if_applicable(options.node_port);
251 let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
252 let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
253
254 let (bootstrap_peers, start) = if options.join {
256 if let Some(peers) = options.peers {
257 (peers, 1)
258 } else {
259 let peer = node_registry
260 .nodes
261 .iter()
262 .find_map(|n| n.listen_addr.clone())
263 .ok_or_eyre("Unable to obtain a peer to connect to")?;
264 (peer, 1)
265 }
266 } else {
267 let rpc_free_port = if let Some(port) = rpc_port {
268 port
269 } else {
270 service_control.get_available_port()?
271 };
272 let metrics_free_port = if let Some(port) = metrics_port {
273 Some(port)
274 } else if options.enable_metrics_server {
275 Some(service_control.get_available_port()?)
276 } else {
277 None
278 };
279 let rpc_socket_addr =
280 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
281 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
282
283 let number = (node_registry.nodes.len() as u16) + 1;
284 let node = run_node(
285 RunNodeOptions {
286 first: true,
287 metrics_port: metrics_free_port,
288 node_port,
289 interval: options.interval,
290 log_format: options.log_format,
291 number,
292 rpc_socket_addr,
293 rewards_address: options.rewards_address,
294 evm_network: options.evm_network.clone(),
295 version: get_bin_version(&launcher.get_antnode_path())?,
296 },
297 &launcher,
298 &rpc_client,
299 )
300 .await?;
301 node_registry.nodes.push(node.clone());
302 let bootstrap_peers = node
303 .listen_addr
304 .ok_or_eyre("The listen address was not set")?;
305 node_port = increment_port_option(node_port);
306 metrics_port = increment_port_option(metrics_port);
307 rpc_port = increment_port_option(rpc_port);
308 (bootstrap_peers, 2)
309 };
310 node_registry.save()?;
311
312 for _ in start..=options.node_count {
313 let rpc_free_port = if let Some(port) = rpc_port {
314 port
315 } else {
316 service_control.get_available_port()?
317 };
318 let metrics_free_port = if let Some(port) = metrics_port {
319 Some(port)
320 } else if options.enable_metrics_server {
321 Some(service_control.get_available_port()?)
322 } else {
323 None
324 };
325 let rpc_socket_addr =
326 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
327 let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
328
329 let number = (node_registry.nodes.len() as u16) + 1;
330 let node = run_node(
331 RunNodeOptions {
332 first: false,
333 metrics_port: metrics_free_port,
334 node_port,
335 interval: options.interval,
336 log_format: options.log_format,
337 number,
338 rpc_socket_addr,
339 rewards_address: options.rewards_address,
340 evm_network: options.evm_network.clone(),
341 version: get_bin_version(&launcher.get_antnode_path())?,
342 },
343 &launcher,
344 &rpc_client,
345 )
346 .await?;
347 node_registry.nodes.push(node);
348
349 node_registry.save()?;
354
355 node_port = increment_port_option(node_port);
356 metrics_port = increment_port_option(metrics_port);
357 rpc_port = increment_port_option(rpc_port);
358 }
359
360 if !options.skip_validation {
361 debug!("Waiting for 10 seconds before validating the network...");
362 println!("Waiting for 10 seconds before validating the network...");
363 std::thread::sleep(std::time::Duration::from_secs(10));
364 validate_network(node_registry, bootstrap_peers.clone()).await?;
365 }
366
367 Ok(())
368}
369
370pub struct RunNodeOptions {
371 pub first: bool,
372 pub interval: u64,
373 pub log_format: Option<LogFormat>,
374 pub metrics_port: Option<u16>,
375 pub node_port: Option<u16>,
376 pub number: u16,
377 pub rpc_socket_addr: SocketAddr,
378 pub rewards_address: RewardsAddress,
379 pub evm_network: Option<EvmNetwork>,
380 pub version: String,
381}
382
383pub async fn run_node(
384 run_options: RunNodeOptions,
385 launcher: &dyn Launcher,
386 rpc_client: &dyn RpcActions,
387) -> Result<NodeServiceData> {
388 info!("Launching node {}...", run_options.number);
389 println!("Launching node {}...", run_options.number);
390 launcher.launch_node(
391 run_options.first,
392 run_options.log_format,
393 run_options.metrics_port,
394 run_options.node_port,
395 run_options.rpc_socket_addr,
396 run_options.rewards_address,
397 run_options.evm_network.clone(),
398 )?;
399 launcher.wait(run_options.interval);
400
401 let node_info = rpc_client.node_info().await?;
402 let peer_id = node_info.peer_id;
403 let network_info = rpc_client.network_info().await?;
404 let connected_peers = Some(network_info.connected_peers);
405 let listen_addrs = network_info
406 .listeners
407 .into_iter()
408 .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
409 .collect();
410
411 Ok(NodeServiceData {
412 alpha: false,
413 antnode_path: launcher.get_antnode_path(),
414 auto_restart: false,
415 connected_peers,
416 data_dir_path: node_info.data_path,
417 evm_network: run_options.evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
418 relay: false,
419 initial_peers_config: InitialPeersConfig {
420 first: run_options.first,
421 addrs: vec![],
422 network_contacts_url: vec![],
423 local: true,
424 ignore_cache: true,
425 bootstrap_cache_dir: None,
426 },
427 listen_addr: Some(listen_addrs),
428 log_dir_path: node_info.log_path,
429 log_format: run_options.log_format,
430 max_archived_log_files: None,
431 max_log_files: None,
432 metrics_port: run_options.metrics_port,
433 network_id: None,
434 node_ip: None,
435 node_port: run_options.node_port,
436 number: run_options.number,
437 peer_id: Some(peer_id),
438 pid: Some(node_info.pid),
439 rewards_address: run_options.rewards_address,
440 reward_balance: None,
441 rpc_socket_addr: run_options.rpc_socket_addr,
442 schema_version: NODE_SERVICE_DATA_SCHEMA_LATEST,
443 status: ServiceStatus::Running,
444 service_name: format!("antnode-local{}", run_options.number),
445 no_upnp: false,
446 user: None,
447 user_mode: false,
448 version: run_options.version.to_string(),
449 })
450}
451
452async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr>) -> Result<()> {
457 let mut all_peers = node_registry
458 .nodes
459 .iter()
460 .map(|n| n.peer_id.ok_or_eyre("The PeerId was not set"))
461 .collect::<Result<Vec<PeerId>>>()?;
462 let additional_peers = peers
466 .into_iter()
467 .filter_map(|addr| {
468 addr.to_string()
469 .rsplit('/')
470 .next()
471 .and_then(|id_str| PeerId::from_str(id_str).ok())
472 })
473 .collect::<Vec<PeerId>>();
474 all_peers.extend(additional_peers);
475
476 for node in node_registry.nodes.iter() {
477 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
478 let net_info = rpc_client.network_info().await?;
479 let peers = net_info.connected_peers;
480 let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
481 debug!("Node {peer_id} has {} peers", peers.len());
482 println!("Node {peer_id} has {} peers", peers.len());
483
484 let invalid_peers: Vec<PeerId> = peers
487 .iter()
488 .filter(|peer| !all_peers.contains(peer))
489 .cloned()
490 .collect();
491 if !invalid_peers.is_empty() {
492 for invalid_peer in invalid_peers.iter() {
493 println!("Invalid peer found: {}", invalid_peer);
494 }
495 error!("Network validation failed: {invalid_peers:?}");
496 return Err(eyre!("Network validation failed",));
497 }
498 }
499 Ok(())
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505 use ant_evm::utils::dummy_address;
506 use ant_service_management::{
507 error::Result as RpcResult,
508 rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
509 };
510 use async_trait::async_trait;
511 use libp2p_identity::PeerId;
512 use mockall::mock;
513 use mockall::predicate::*;
514 use std::str::FromStr;
515
516 mock! {
517 pub RpcClient {}
518 #[async_trait]
519 impl RpcActions for RpcClient {
520 async fn node_info(&self) -> RpcResult<NodeInfo>;
521 async fn network_info(&self) -> RpcResult<NetworkInfo>;
522 async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
523 async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
524 async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
525 async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
526 async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
527 async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
528 }
529 }
530
531 #[tokio::test]
532 async fn run_node_should_launch_the_genesis_node() -> Result<()> {
533 let mut mock_launcher = MockLauncher::new();
534 let mut mock_rpc_client = MockRpcClient::new();
535 let rewards_address = dummy_address();
536
537 let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
538 let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);
539 mock_launcher
540 .expect_launch_node()
541 .with(
542 eq(true),
543 eq(None),
544 eq(None),
545 eq(None),
546 eq(rpc_socket_addr),
547 eq(rewards_address),
548 eq(None),
549 )
550 .times(1)
551 .returning(|_, _, _, _, _, _, _| Ok(()));
552 mock_launcher
553 .expect_wait()
554 .with(eq(100))
555 .times(1)
556 .returning(|_| ());
557 mock_launcher
558 .expect_get_antnode_path()
559 .times(1)
560 .returning(|| PathBuf::from("/usr/local/bin/antnode"));
561
562 mock_rpc_client
563 .expect_node_info()
564 .times(1)
565 .returning(move || {
566 Ok(NodeInfo {
567 pid: 1000,
568 peer_id,
569 data_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}")),
570 log_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs")),
571 version: "0.100.12".to_string(),
572 uptime: std::time::Duration::from_secs(1), wallet_balance: 0,
574 })
575 });
576 mock_rpc_client
577 .expect_network_info()
578 .times(1)
579 .returning(move || {
580 Ok(NetworkInfo {
581 connected_peers: Vec::new(),
582 listeners: Vec::new(),
583 })
584 });
585
586 let node = run_node(
587 RunNodeOptions {
588 first: true,
589 interval: 100,
590 log_format: None,
591 metrics_port: None,
592 node_port: None,
593 number: 1,
594 rpc_socket_addr,
595 rewards_address,
596 evm_network: None,
597 version: "0.100.12".to_string(),
598 },
599 &mock_launcher,
600 &mock_rpc_client,
601 )
602 .await?;
603
604 assert!(node.initial_peers_config.first);
605 assert_eq!(node.version, "0.100.12");
606 assert_eq!(node.service_name, "antnode-local1");
607 assert_eq!(
608 node.data_dir_path,
609 PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"))
610 );
611 assert_eq!(
612 node.log_dir_path,
613 PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"))
614 );
615 assert_eq!(node.number, 1);
616 assert_eq!(node.pid, Some(1000));
617 assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
618 assert_eq!(node.status, ServiceStatus::Running);
619 assert_eq!(node.antnode_path, PathBuf::from("/usr/local/bin/antnode"));
620
621 Ok(())
622 }
623}