ant_node_manager/
local.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::add_services::config::PortRange;
10use crate::helpers::{
11    check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::NodeRegistryManager;
18use ant_service_management::node::NODE_SERVICE_DATA_SCHEMA_LATEST;
19use ant_service_management::{
20    NodeServiceData, ServiceStatus,
21    control::ServiceControl,
22    rpc::{RpcActions, RpcClient},
23};
24use color_eyre::eyre::OptionExt;
25use color_eyre::{Result, eyre::eyre};
26use colored::Colorize;
27use libp2p::{Multiaddr, PeerId, multiaddr::Protocol};
28#[cfg(test)]
29use mockall::automock;
30use std::{
31    net::{IpAddr, Ipv4Addr, SocketAddr},
32    path::PathBuf,
33    process::{Command, Stdio},
34    str::FromStr,
35};
36use sysinfo::{Pid, System};
37
38#[cfg_attr(test, automock)]
39pub trait Launcher {
40    fn get_antnode_path(&self) -> PathBuf;
41    #[allow(clippy::too_many_arguments)]
42    fn launch_node(
43        &self,
44        first: bool,
45        log_format: Option<LogFormat>,
46        metrics_port: Option<u16>,
47        node_port: Option<u16>,
48        rpc_socket_addr: SocketAddr,
49        rewards_address: RewardsAddress,
50        evm_network: EvmNetwork,
51    ) -> Result<()>;
52    fn wait(&self, delay: u64);
53}
54
55#[derive(Default)]
56pub struct LocalSafeLauncher {
57    pub antnode_bin_path: PathBuf,
58}
59
60impl Launcher for LocalSafeLauncher {
61    fn get_antnode_path(&self) -> PathBuf {
62        self.antnode_bin_path.clone()
63    }
64
65    fn launch_node(
66        &self,
67        first: bool,
68        log_format: Option<LogFormat>,
69        metrics_port: Option<u16>,
70        node_port: Option<u16>,
71        rpc_socket_addr: SocketAddr,
72        rewards_address: RewardsAddress,
73        evm_network: EvmNetwork,
74    ) -> Result<()> {
75        let mut args = Vec::new();
76
77        if first {
78            args.push("--first".to_string())
79        }
80
81        if let Some(log_format) = log_format {
82            args.push("--log-format".to_string());
83            args.push(log_format.as_str().to_string());
84        }
85
86        if let Some(metrics_port) = metrics_port {
87            args.push("--metrics-server-port".to_string());
88            args.push(metrics_port.to_string());
89        }
90
91        if let Some(node_port) = node_port {
92            args.push("--port".to_string());
93            args.push(node_port.to_string());
94        }
95
96        args.push("--local".to_string());
97        args.push("--rpc".to_string());
98        args.push(rpc_socket_addr.to_string());
99
100        args.push("--rewards-address".to_string());
101        args.push(rewards_address.to_string());
102
103        args.push(format!("evm-{}", evm_network.identifier()));
104
105        if let EvmNetwork::Custom(custom) = evm_network {
106            args.push("--rpc-url".to_string());
107            args.push(custom.rpc_url_http.to_string());
108            args.push("--payment-token-address".to_string());
109            args.push(custom.payment_token_address.to_string());
110            args.push("--data-payments-address".to_string());
111            args.push(custom.data_payments_address.to_string());
112        }
113
114        Command::new(self.antnode_bin_path.clone())
115            .args(args)
116            .stdout(Stdio::inherit())
117            .stderr(Stdio::inherit())
118            .spawn()
119            .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
120
121        Ok(())
122    }
123
124    /// Provide a delay for the service to start or stop.
125    ///
126    /// This is wrapped mainly just for unit testing.
127    fn wait(&self, delay: u64) {
128        std::thread::sleep(std::time::Duration::from_millis(delay));
129    }
130}
131
132/// Kill any running EVM testnet processes
133fn kill_evm_testnet_processes(system: &mut System) {
134    // Look for anvil processes (which are used by evm-testnet)
135    for (pid, process) in system.processes() {
136        let process_name = process.name().to_lowercase();
137        if process_name.contains("anvil") || process_name.contains("evm-testnet") {
138            debug!("Killing EVM testnet process: {} ({})", process_name, pid);
139            process.kill();
140            println!("  {} Killed EVM testnet process ({})", "✓".green(), pid);
141        }
142    }
143}
144
145pub async fn kill_network(
146    node_registry: NodeRegistryManager,
147    keep_directories: bool,
148) -> Result<()> {
149    let mut system = System::new_all();
150    system.refresh_all();
151
152    let genesis_data_path = dirs_next::data_dir()
153        .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
154        .join("autonomi")
155        .join("test_genesis");
156    if genesis_data_path.is_dir() {
157        debug!("Removed genesis data directory");
158        std::fs::remove_dir_all(genesis_data_path)?;
159    }
160
161    kill_evm_testnet_processes(&mut system);
162
163    for node in node_registry.nodes.read().await.iter() {
164        let node = node.read().await;
165        println!("{}:", node.service_name);
166        // If the PID is not set it means the `status` command ran and determined the node was
167        // already dead anyway, so we don't need to do anything.
168        if let Some(pid) = node.pid {
169            // It could be possible that None would be returned here, if the process had already
170            // died, but the `status` command had not ran. In that case, we don't need to do
171            // anything anyway.
172            if let Some(process) = system.process(Pid::from(pid as usize)) {
173                process.kill();
174                debug!("Killed node: {} ({})", node.service_name, pid);
175                println!("  {} Killed process", "✓".green());
176            }
177        }
178
179        if !keep_directories {
180            // At this point we don't allow path overrides, so deleting the data directory will clear
181            // the log directory also.
182            if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
183                error!("Failed to remove node data directory: {:?}", e);
184                println!(
185                    "  {} Failed to remove {}: {e}",
186                    "✗".red(),
187                    node.data_dir_path.to_string_lossy()
188                );
189            } else {
190                debug!("Removed node data directory: {:?}", node.data_dir_path);
191                println!(
192                    "  {} Removed {}",
193                    "✓".green(),
194                    node.data_dir_path.to_string_lossy()
195                );
196            }
197        }
198    }
199
200    Ok(())
201}
202
203pub struct LocalNetworkOptions {
204    pub antnode_bin_path: PathBuf,
205    pub enable_metrics_server: bool,
206    pub join: bool,
207    pub interval: u64,
208    pub metrics_port: Option<PortRange>,
209    pub node_port: Option<PortRange>,
210    pub node_count: u16,
211    pub peers: Option<Vec<Multiaddr>>,
212    pub rpc_port: Option<PortRange>,
213    pub skip_validation: bool,
214    pub log_format: Option<LogFormat>,
215    pub rewards_address: RewardsAddress,
216    pub evm_network: EvmNetwork,
217}
218
219pub async fn run_network(
220    options: LocalNetworkOptions,
221    node_registry: NodeRegistryManager,
222    service_control: &dyn ServiceControl,
223) -> Result<()> {
224    info!("Running local network");
225
226    // Check port availability when joining a local network.
227    if let Some(port_range) = &options.node_port {
228        port_range.validate(options.node_count)?;
229        check_port_availability(port_range, &node_registry.nodes).await?;
230    }
231
232    if let Some(port_range) = &options.metrics_port {
233        port_range.validate(options.node_count)?;
234        check_port_availability(port_range, &node_registry.nodes).await?;
235    }
236
237    if let Some(port_range) = &options.rpc_port {
238        port_range.validate(options.node_count)?;
239        check_port_availability(port_range, &node_registry.nodes).await?;
240    }
241
242    let launcher = LocalSafeLauncher {
243        antnode_bin_path: options.antnode_bin_path.to_path_buf(),
244    };
245
246    let mut node_port = get_start_port_if_applicable(options.node_port);
247    let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
248    let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
249
250    // Start the bootstrap node if it doesnt exist.
251    let (bootstrap_peers, start) = if options.join {
252        if let Some(peers) = options.peers {
253            (peers, 1)
254        } else {
255            let mut peers = Vec::new();
256            for node in node_registry.nodes.read().await.iter() {
257                let node = node.read().await;
258                if let Some(listen_addr) = &node.listen_addr {
259                    peers.extend(listen_addr.clone());
260                }
261            }
262            (peers, 1)
263        }
264    } else {
265        let rpc_free_port = if let Some(port) = rpc_port {
266            port
267        } else {
268            service_control.get_available_port()?
269        };
270        let metrics_free_port = if let Some(port) = metrics_port {
271            Some(port)
272        } else if options.enable_metrics_server {
273            Some(service_control.get_available_port()?)
274        } else {
275            None
276        };
277        let rpc_socket_addr =
278            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
279        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
280
281        let number = (node_registry.nodes.read().await.len() as u16) + 1;
282        let node = run_node(
283            RunNodeOptions {
284                first: true,
285                metrics_port: metrics_free_port,
286                node_port,
287                interval: options.interval,
288                log_format: options.log_format,
289                number,
290                rpc_socket_addr,
291                rewards_address: options.rewards_address,
292                evm_network: options.evm_network.clone(),
293                version: get_bin_version(&launcher.get_antnode_path())?,
294            },
295            &launcher,
296            &rpc_client,
297        )
298        .await?;
299        node_registry.push_node(node.clone()).await;
300        let bootstrap_peers = node
301            .listen_addr
302            .ok_or_eyre("The listen address was not set")?;
303        node_port = increment_port_option(node_port);
304        metrics_port = increment_port_option(metrics_port);
305        rpc_port = increment_port_option(rpc_port);
306        (bootstrap_peers, 2)
307    };
308    node_registry.save().await?;
309
310    for _ in start..=options.node_count {
311        let rpc_free_port = if let Some(port) = rpc_port {
312            port
313        } else {
314            service_control.get_available_port()?
315        };
316        let metrics_free_port = if let Some(port) = metrics_port {
317            Some(port)
318        } else if options.enable_metrics_server {
319            Some(service_control.get_available_port()?)
320        } else {
321            None
322        };
323        let rpc_socket_addr =
324            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
325        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
326
327        let number = (node_registry.nodes.read().await.len() as u16) + 1;
328        let node = run_node(
329            RunNodeOptions {
330                first: false,
331                metrics_port: metrics_free_port,
332                node_port,
333                interval: options.interval,
334                log_format: options.log_format,
335                number,
336                rpc_socket_addr,
337                rewards_address: options.rewards_address,
338                evm_network: options.evm_network.clone(),
339                version: get_bin_version(&launcher.get_antnode_path())?,
340            },
341            &launcher,
342            &rpc_client,
343        )
344        .await?;
345        node_registry.push_node(node).await;
346
347        // We save the node registry for each launch because it's possible any node can fail to
348        // launch, or maybe the validation will fail. In the error case, we will want to use the
349        // `kill` command for the nodes that we did spin up. The `kill` command works on the basis
350        // of what's in the node registry.
351        node_registry.save().await?;
352
353        node_port = increment_port_option(node_port);
354        metrics_port = increment_port_option(metrics_port);
355        rpc_port = increment_port_option(rpc_port);
356    }
357
358    if !options.skip_validation {
359        debug!("Waiting for 10 seconds before validating the network...");
360        println!("Waiting for 10 seconds before validating the network...");
361        std::thread::sleep(std::time::Duration::from_secs(10));
362        validate_network(node_registry, bootstrap_peers.clone()).await?;
363    }
364
365    Ok(())
366}
367
368pub struct RunNodeOptions {
369    pub first: bool,
370    pub interval: u64,
371    pub log_format: Option<LogFormat>,
372    pub metrics_port: Option<u16>,
373    pub node_port: Option<u16>,
374    pub number: u16,
375    pub rpc_socket_addr: SocketAddr,
376    pub rewards_address: RewardsAddress,
377    pub evm_network: EvmNetwork,
378    pub version: String,
379}
380
381pub async fn run_node(
382    run_options: RunNodeOptions,
383    launcher: &dyn Launcher,
384    rpc_client: &dyn RpcActions,
385) -> Result<NodeServiceData> {
386    // For local network, after the first node got launched,
387    // it takes little bit time to setup the contact_service.
388    // Hence need an extra wait before launching the second node.
389    if run_options.number == 2 {
390        // in milliseconds
391        launcher.wait(5000);
392    }
393
394    info!("Launching node {}...", run_options.number);
395    println!("Launching node {}...", run_options.number);
396    launcher.launch_node(
397        run_options.first,
398        run_options.log_format,
399        run_options.metrics_port,
400        run_options.node_port,
401        run_options.rpc_socket_addr,
402        run_options.rewards_address,
403        run_options.evm_network.clone(),
404    )?;
405    launcher.wait(run_options.interval);
406
407    let node_info = rpc_client.node_info().await?;
408    let peer_id = node_info.peer_id;
409    let network_info = rpc_client.network_info().await?;
410    let connected_peers = Some(network_info.connected_peers);
411    let listen_addrs = network_info
412        .listeners
413        .into_iter()
414        .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
415        .collect();
416
417    Ok(NodeServiceData {
418        alpha: false,
419        antnode_path: launcher.get_antnode_path(),
420        auto_restart: false,
421        connected_peers,
422        data_dir_path: node_info.data_path,
423        evm_network: run_options.evm_network,
424        relay: false,
425        initial_peers_config: InitialPeersConfig {
426            first: run_options.first,
427            addrs: vec![],
428            network_contacts_url: vec![],
429            local: true,
430            ignore_cache: true,
431            bootstrap_cache_dir: None,
432        },
433        listen_addr: Some(listen_addrs),
434        log_dir_path: node_info.log_path,
435        log_format: run_options.log_format,
436        max_archived_log_files: None,
437        max_log_files: None,
438        metrics_port: run_options.metrics_port,
439        network_id: None,
440        node_ip: None,
441        node_port: run_options.node_port,
442        number: run_options.number,
443        peer_id: Some(peer_id),
444        pid: Some(node_info.pid),
445        rewards_address: run_options.rewards_address,
446        reward_balance: None,
447        rpc_socket_addr: run_options.rpc_socket_addr,
448        schema_version: NODE_SERVICE_DATA_SCHEMA_LATEST,
449        status: ServiceStatus::Running,
450        service_name: format!("antnode-local{}", run_options.number),
451        no_upnp: false,
452        user: None,
453        user_mode: false,
454        version: run_options.version.to_string(),
455        write_older_cache_files: false,
456    })
457}
458
459//
460// Private Helpers
461//
462
463async fn validate_network(node_registry: NodeRegistryManager, peers: Vec<Multiaddr>) -> Result<()> {
464    let mut all_peers = Vec::new();
465    for node in node_registry.nodes.read().await.iter() {
466        let node = node.read().await;
467        if let Some(peer_id) = &node.peer_id {
468            all_peers.push(*peer_id);
469        } else {
470            return Err(eyre!(
471                "The PeerId was not set for node: {}",
472                node.service_name
473            ));
474        }
475    }
476
477    // The additional peers are peers being managed outwith the node manager. This only applies
478    // when we've joined a network not being managed by the node manager. Otherwise, this list will
479    // be empty.
480    let additional_peers = peers
481        .into_iter()
482        .filter_map(|addr| {
483            addr.to_string()
484                .rsplit('/')
485                .next()
486                .and_then(|id_str| PeerId::from_str(id_str).ok())
487        })
488        .collect::<Vec<PeerId>>();
489    all_peers.extend(additional_peers);
490
491    for node in node_registry.nodes.read().await.iter() {
492        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
493        let net_info = rpc_client.network_info().await?;
494        let peers = net_info.connected_peers;
495        let peer_id = node
496            .read()
497            .await
498            .peer_id
499            .ok_or_eyre("The PeerId was not set")?;
500        debug!("Node {peer_id} has {} peers", peers.len());
501        println!("Node {peer_id} has {} peers", peers.len());
502
503        // Look for peers that are not supposed to be present in the network. This can happen if
504        // the node has connected to peers on other networks.
505        let invalid_peers: Vec<PeerId> = peers
506            .iter()
507            .filter(|peer| !all_peers.contains(peer))
508            .cloned()
509            .collect();
510        if !invalid_peers.is_empty() {
511            for invalid_peer in invalid_peers.iter() {
512                println!("Invalid peer found: {invalid_peer}");
513            }
514            error!("Network validation failed: {invalid_peers:?}");
515            return Err(eyre!("Network validation failed",));
516        }
517    }
518    Ok(())
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use ant_evm::utils::dummy_address;
    use ant_service_management::{
        error::Result as RpcResult,
        rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
    };
    use async_trait::async_trait;
    use evmlib::CustomNetwork;
    use libp2p_identity::PeerId;
    use mockall::mock;
    use mockall::predicate::*;
    use std::str::FromStr;

    // Mock of the RPC client so `run_node` can be exercised without a running node.
    mock! {
        pub RpcClient {}
        #[async_trait]
        impl RpcActions for RpcClient {
            async fn node_info(&self) -> RpcResult<NodeInfo>;
            async fn network_info(&self) -> RpcResult<NetworkInfo>;
            async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
            async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
            async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
            async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
            async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
            async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
        }
    }

    /// Custom EVM network used both as the launch option and as the expected launcher argument.
    fn custom_evm_network() -> EvmNetwork {
        EvmNetwork::Custom(CustomNetwork::new(
            "http://localhost:61611",
            "0x5FbDB2315678afecb367f032d93F642f64180aa3",
            "0x8464135c8F25Da09e49BC8782676a84730C318bC",
        ))
    }

    /// Launching node number 1 as the genesis node should call the launcher exactly once with
    /// the given options and return service data built from the mocked RPC answers.
    #[tokio::test]
    async fn run_node_should_launch_the_genesis_node() -> Result<()> {
        // Fixtures shared between the mocks and the assertions.
        let rewards_address = dummy_address();
        let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
        let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);
        let expected_data_path = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"));
        let expected_log_path = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"));

        // Launcher expectations: one launch, one post-launch wait, one path lookup.
        let mut mock_launcher = MockLauncher::new();
        mock_launcher
            .expect_launch_node()
            .with(
                eq(true),
                eq(None),
                eq(None),
                eq(None),
                eq(rpc_socket_addr),
                eq(rewards_address),
                eq(custom_evm_network()),
            )
            .times(1)
            .returning(|_, _, _, _, _, _, _| Ok(()));
        mock_launcher
            .expect_wait()
            .with(eq(100))
            .times(1)
            .returning(|_| ());
        mock_launcher
            .expect_get_antnode_path()
            .times(1)
            .returning(|| PathBuf::from("/usr/local/bin/antnode"));

        // RPC expectations: each query is made exactly once.
        let mut mock_rpc_client = MockRpcClient::new();
        let (reported_data_path, reported_log_path) =
            (expected_data_path.clone(), expected_log_path.clone());
        mock_rpc_client
            .expect_node_info()
            .times(1)
            .returning(move || {
                Ok(NodeInfo {
                    pid: 1000,
                    peer_id,
                    data_path: reported_data_path.clone(),
                    log_path: reported_log_path.clone(),
                    version: "0.100.12".to_string(),
                    uptime: std::time::Duration::from_secs(1), // the service was just started
                    wallet_balance: 0,
                })
            });
        mock_rpc_client
            .expect_network_info()
            .times(1)
            .returning(|| {
                Ok(NetworkInfo {
                    connected_peers: Vec::new(),
                    listeners: Vec::new(),
                })
            });

        let run_options = RunNodeOptions {
            first: true,
            interval: 100,
            log_format: None,
            metrics_port: None,
            node_port: None,
            number: 1,
            rpc_socket_addr,
            rewards_address,
            evm_network: custom_evm_network(),
            version: "0.100.12".to_string(),
        };
        let node = run_node(run_options, &mock_launcher, &mock_rpc_client).await?;

        assert!(node.initial_peers_config.first);
        assert_eq!(node.version, "0.100.12");
        assert_eq!(node.service_name, "antnode-local1");
        assert_eq!(node.data_dir_path, expected_data_path);
        assert_eq!(node.log_dir_path, expected_log_path);
        assert_eq!(node.number, 1);
        assert_eq!(node.pid, Some(1000));
        assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
        assert_eq!(node.status, ServiceStatus::Running);
        assert_eq!(node.antnode_path, PathBuf::from("/usr/local/bin/antnode"));

        Ok(())
    }
}
651}