ant_node_manager/
local.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::add_services::config::PortRange;
10use crate::helpers::{
11    check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::NodeRegistryManager;
18use ant_service_management::node::NODE_SERVICE_DATA_SCHEMA_LATEST;
19use ant_service_management::{
20    NodeServiceData, ServiceStatus,
21    control::ServiceControl,
22    rpc::{RpcActions, RpcClient},
23};
24use color_eyre::eyre::OptionExt;
25use color_eyre::{Result, eyre::eyre};
26use colored::Colorize;
27use libp2p::{Multiaddr, PeerId, multiaddr::Protocol};
28#[cfg(test)]
29use mockall::automock;
30use std::{
31    net::{IpAddr, Ipv4Addr, SocketAddr},
32    path::PathBuf,
33    process::{Command, Stdio},
34    str::FromStr,
35};
36use sysinfo::{Pid, System};
37
/// Abstraction over starting local `antnode` processes and pacing between launches.
///
/// `LocalSafeLauncher` is the real implementation; in tests `mockall::automock` generates
/// `MockLauncher` so that [`run_node`] can be exercised without spawning processes.
#[cfg_attr(test, automock)]
pub trait Launcher {
    /// Path of the `antnode` binary this launcher starts.
    fn get_antnode_path(&self) -> PathBuf;
    /// Start a single node process.
    ///
    /// `first` marks the first (genesis) node of a new local network. The optional ports are
    /// only forwarded when set; `rpc_socket_addr` is where the node's RPC server should listen.
    #[allow(clippy::too_many_arguments)]
    fn launch_node(
        &self,
        first: bool,
        log_format: Option<LogFormat>,
        metrics_port: Option<u16>,
        node_port: Option<u16>,
        rpc_socket_addr: SocketAddr,
        rewards_address: RewardsAddress,
        evm_network: EvmNetwork,
    ) -> Result<()>;
    /// Pause for `delay` milliseconds (e.g. to let a freshly launched node start up).
    fn wait(&self, delay: u64);
}
54
/// [`Launcher`] that spawns real `antnode` processes from a binary on disk.
///
/// `Default` yields an empty `antnode_bin_path`, so set it before launching any nodes.
#[derive(Clone, Debug, Default)]
pub struct LocalSafeLauncher {
    /// Path of the `antnode` binary to execute for every launched node.
    pub antnode_bin_path: PathBuf,
}
59
60impl Launcher for LocalSafeLauncher {
61    fn get_antnode_path(&self) -> PathBuf {
62        self.antnode_bin_path.clone()
63    }
64
65    fn launch_node(
66        &self,
67        first: bool,
68        log_format: Option<LogFormat>,
69        metrics_port: Option<u16>,
70        node_port: Option<u16>,
71        rpc_socket_addr: SocketAddr,
72        rewards_address: RewardsAddress,
73        evm_network: EvmNetwork,
74    ) -> Result<()> {
75        let mut args = Vec::new();
76
77        if first {
78            args.push("--first".to_string())
79        }
80
81        if let Some(log_format) = log_format {
82            args.push("--log-format".to_string());
83            args.push(log_format.as_str().to_string());
84        }
85
86        if let Some(metrics_port) = metrics_port {
87            args.push("--metrics-server-port".to_string());
88            args.push(metrics_port.to_string());
89        }
90
91        if let Some(node_port) = node_port {
92            args.push("--port".to_string());
93            args.push(node_port.to_string());
94        }
95
96        args.push("--local".to_string());
97        args.push("--rpc".to_string());
98        args.push(rpc_socket_addr.to_string());
99
100        args.push("--rewards-address".to_string());
101        args.push(rewards_address.to_string());
102
103        args.push(format!("evm-{}", evm_network.identifier()));
104
105        if let EvmNetwork::Custom(custom) = evm_network {
106            args.push("--rpc-url".to_string());
107            args.push(custom.rpc_url_http.to_string());
108            args.push("--payment-token-address".to_string());
109            args.push(custom.payment_token_address.to_string());
110            args.push("--data-payments-address".to_string());
111            args.push(custom.data_payments_address.to_string());
112
113            if let Some(merkle_addr) = custom.merkle_payments_address {
114                args.push("--merkle-payments-address".to_string());
115                args.push(merkle_addr.to_string());
116            }
117        }
118
119        Command::new(self.antnode_bin_path.clone())
120            .args(args)
121            .stdout(Stdio::inherit())
122            .stderr(Stdio::inherit())
123            .spawn()
124            .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
125
126        Ok(())
127    }
128
129    /// Provide a delay for the service to start or stop.
130    ///
131    /// This is wrapped mainly just for unit testing.
132    fn wait(&self, delay: u64) {
133        std::thread::sleep(std::time::Duration::from_millis(delay));
134    }
135}
136
137/// Kill any running EVM testnet processes
138fn kill_evm_testnet_processes(system: &mut System) {
139    // Look for anvil processes (which are used by evm-testnet)
140    for (pid, process) in system.processes() {
141        let process_name = process.name().to_lowercase();
142        if process_name.contains("anvil") || process_name.contains("evm-testnet") {
143            debug!("Killing EVM testnet process: {} ({})", process_name, pid);
144            process.kill();
145            println!("  {} Killed EVM testnet process ({})", "✓".green(), pid);
146        }
147    }
148}
149
150pub async fn kill_network(
151    node_registry: NodeRegistryManager,
152    keep_directories: bool,
153) -> Result<()> {
154    let mut system = System::new_all();
155    system.refresh_all();
156
157    let genesis_data_path = dirs_next::data_dir()
158        .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
159        .join("autonomi")
160        .join("test_genesis");
161    if genesis_data_path.is_dir() {
162        debug!("Removed genesis data directory");
163        std::fs::remove_dir_all(genesis_data_path)?;
164    }
165
166    kill_evm_testnet_processes(&mut system);
167
168    for node in node_registry.nodes.read().await.iter() {
169        let node = node.read().await;
170        println!("{}:", node.service_name);
171        // If the PID is not set it means the `status` command ran and determined the node was
172        // already dead anyway, so we don't need to do anything.
173        if let Some(pid) = node.pid {
174            // It could be possible that None would be returned here, if the process had already
175            // died, but the `status` command had not ran. In that case, we don't need to do
176            // anything anyway.
177            if let Some(process) = system.process(Pid::from(pid as usize)) {
178                process.kill();
179                debug!("Killed node: {} ({})", node.service_name, pid);
180                println!("  {} Killed process", "✓".green());
181            }
182        }
183
184        if !keep_directories {
185            // At this point we don't allow path overrides, so deleting the data directory will clear
186            // the log directory also.
187            if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
188                error!("Failed to remove node data directory: {:?}", e);
189                println!(
190                    "  {} Failed to remove {}: {e}",
191                    "✗".red(),
192                    node.data_dir_path.to_string_lossy()
193                );
194            } else {
195                debug!("Removed node data directory: {:?}", node.data_dir_path);
196                println!(
197                    "  {} Removed {}",
198                    "✓".green(),
199                    node.data_dir_path.to_string_lossy()
200                );
201            }
202        }
203    }
204
205    Ok(())
206}
207
/// Settings for [`run_network`].
pub struct LocalNetworkOptions {
    /// Path of the `antnode` binary used for every launched node.
    pub antnode_bin_path: PathBuf,
    /// When no `metrics_port` range is given, allocate a free metrics port for each node if
    /// this is set; otherwise nodes get no metrics port.
    pub enable_metrics_server: bool,
    /// Add nodes to an existing local network instead of starting a new one with a genesis
    /// node.
    pub join: bool,
    /// Milliseconds to wait after launching each node before querying it over RPC.
    pub interval: u64,
    /// Optional range of metrics ports; consecutive nodes take consecutive ports.
    pub metrics_port: Option<PortRange>,
    /// Optional range of node ports; consecutive nodes take consecutive ports.
    pub node_port: Option<PortRange>,
    /// Total number of nodes to launch (including the genesis node when not joining).
    pub node_count: u16,
    /// When joining, the existing network's peers. If `None`, the listen addresses of nodes
    /// already in the registry are used. In this module they only feed network validation.
    pub peers: Option<Vec<Multiaddr>>,
    /// Optional range of RPC ports; when absent a free port is allocated per node.
    pub rpc_port: Option<PortRange>,
    /// Skip the post-launch network validation step.
    pub skip_validation: bool,
    /// Log format forwarded to each node.
    pub log_format: Option<LogFormat>,
    /// Rewards address forwarded to each node.
    pub rewards_address: RewardsAddress,
    /// EVM network each node is configured to use.
    pub evm_network: EvmNetwork,
}
223
224pub async fn run_network(
225    options: LocalNetworkOptions,
226    node_registry: NodeRegistryManager,
227    service_control: &dyn ServiceControl,
228) -> Result<()> {
229    info!("Running local network");
230
231    // Check port availability when joining a local network.
232    if let Some(port_range) = &options.node_port {
233        port_range.validate(options.node_count)?;
234        check_port_availability(port_range, &node_registry.nodes).await?;
235    }
236
237    if let Some(port_range) = &options.metrics_port {
238        port_range.validate(options.node_count)?;
239        check_port_availability(port_range, &node_registry.nodes).await?;
240    }
241
242    if let Some(port_range) = &options.rpc_port {
243        port_range.validate(options.node_count)?;
244        check_port_availability(port_range, &node_registry.nodes).await?;
245    }
246
247    let launcher = LocalSafeLauncher {
248        antnode_bin_path: options.antnode_bin_path.to_path_buf(),
249    };
250
251    let mut node_port = get_start_port_if_applicable(options.node_port);
252    let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
253    let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
254
255    // Start the bootstrap node if it doesnt exist.
256    let (bootstrap_peers, start) = if options.join {
257        if let Some(peers) = options.peers {
258            (peers, 1)
259        } else {
260            let mut peers = Vec::new();
261            for node in node_registry.nodes.read().await.iter() {
262                let node = node.read().await;
263                if let Some(listen_addr) = &node.listen_addr {
264                    peers.extend(listen_addr.clone());
265                }
266            }
267            (peers, 1)
268        }
269    } else {
270        let rpc_free_port = if let Some(port) = rpc_port {
271            port
272        } else {
273            service_control.get_available_port()?
274        };
275        let metrics_free_port = if let Some(port) = metrics_port {
276            Some(port)
277        } else if options.enable_metrics_server {
278            Some(service_control.get_available_port()?)
279        } else {
280            None
281        };
282        let rpc_socket_addr =
283            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
284        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
285
286        let number = (node_registry.nodes.read().await.len() as u16) + 1;
287        let node = run_node(
288            RunNodeOptions {
289                first: true,
290                metrics_port: metrics_free_port,
291                node_port,
292                interval: options.interval,
293                log_format: options.log_format,
294                number,
295                rpc_socket_addr,
296                rewards_address: options.rewards_address,
297                evm_network: options.evm_network.clone(),
298                version: get_bin_version(&launcher.get_antnode_path())?,
299            },
300            &launcher,
301            &rpc_client,
302        )
303        .await?;
304        node_registry.push_node(node.clone()).await;
305        let bootstrap_peers = node
306            .listen_addr
307            .ok_or_eyre("The listen address was not set")?;
308        node_port = increment_port_option(node_port);
309        metrics_port = increment_port_option(metrics_port);
310        rpc_port = increment_port_option(rpc_port);
311        (bootstrap_peers, 2)
312    };
313    node_registry.save().await?;
314
315    for _ in start..=options.node_count {
316        let rpc_free_port = if let Some(port) = rpc_port {
317            port
318        } else {
319            service_control.get_available_port()?
320        };
321        let metrics_free_port = if let Some(port) = metrics_port {
322            Some(port)
323        } else if options.enable_metrics_server {
324            Some(service_control.get_available_port()?)
325        } else {
326            None
327        };
328        let rpc_socket_addr =
329            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
330        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
331
332        let number = (node_registry.nodes.read().await.len() as u16) + 1;
333        let node = run_node(
334            RunNodeOptions {
335                first: false,
336                metrics_port: metrics_free_port,
337                node_port,
338                interval: options.interval,
339                log_format: options.log_format,
340                number,
341                rpc_socket_addr,
342                rewards_address: options.rewards_address,
343                evm_network: options.evm_network.clone(),
344                version: get_bin_version(&launcher.get_antnode_path())?,
345            },
346            &launcher,
347            &rpc_client,
348        )
349        .await?;
350        node_registry.push_node(node).await;
351
352        // We save the node registry for each launch because it's possible any node can fail to
353        // launch, or maybe the validation will fail. In the error case, we will want to use the
354        // `kill` command for the nodes that we did spin up. The `kill` command works on the basis
355        // of what's in the node registry.
356        node_registry.save().await?;
357
358        node_port = increment_port_option(node_port);
359        metrics_port = increment_port_option(metrics_port);
360        rpc_port = increment_port_option(rpc_port);
361    }
362
363    if !options.skip_validation {
364        debug!("Waiting for 10 seconds before validating the network...");
365        println!("Waiting for 10 seconds before validating the network...");
366        std::thread::sleep(std::time::Duration::from_secs(10));
367        validate_network(node_registry, bootstrap_peers.clone()).await?;
368    }
369
370    Ok(())
371}
372
/// Settings for launching a single node with [`run_node`].
pub struct RunNodeOptions {
    /// Launch as the first (genesis) node of a new network; also recorded in the registry
    /// entry's initial peers config.
    pub first: bool,
    /// Milliseconds to wait after launching before querying the node over RPC.
    pub interval: u64,
    /// Log format forwarded to the node.
    pub log_format: Option<LogFormat>,
    /// Metrics server port forwarded to the node, if any.
    pub metrics_port: Option<u16>,
    /// Node port forwarded to the node, if any.
    pub node_port: Option<u16>,
    /// Sequence number of the node; used in its service name (`antnode-local<number>`).
    /// Node 2 gets an extra wait before launch.
    pub number: u16,
    /// Address the node's RPC server listens on and that is queried after launch.
    pub rpc_socket_addr: SocketAddr,
    /// Rewards address forwarded to the node.
    pub rewards_address: RewardsAddress,
    /// EVM network the node is configured to use.
    pub evm_network: EvmNetwork,
    /// Version string recorded in the node's registry entry.
    pub version: String,
}
385
386pub async fn run_node(
387    run_options: RunNodeOptions,
388    launcher: &dyn Launcher,
389    rpc_client: &dyn RpcActions,
390) -> Result<NodeServiceData> {
391    // For local network, after the first node got launched,
392    // it takes little bit time to setup the contact_service.
393    // Hence need an extra wait before launching the second node.
394    if run_options.number == 2 {
395        // in milliseconds
396        launcher.wait(5000);
397    }
398
399    info!("Launching node {}...", run_options.number);
400    println!("Launching node {}...", run_options.number);
401    launcher.launch_node(
402        run_options.first,
403        run_options.log_format,
404        run_options.metrics_port,
405        run_options.node_port,
406        run_options.rpc_socket_addr,
407        run_options.rewards_address,
408        run_options.evm_network.clone(),
409    )?;
410    launcher.wait(run_options.interval);
411
412    let node_info = rpc_client.node_info().await?;
413    let peer_id = node_info.peer_id;
414    let network_info = rpc_client.network_info().await?;
415    let connected_peers = Some(network_info.connected_peers);
416    let listen_addrs = network_info
417        .listeners
418        .into_iter()
419        .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
420        .collect();
421
422    Ok(NodeServiceData {
423        alpha: false,
424        antnode_path: launcher.get_antnode_path(),
425        auto_restart: false,
426        connected_peers,
427        data_dir_path: node_info.data_path,
428        evm_network: run_options.evm_network,
429        relay: false,
430        initial_peers_config: InitialPeersConfig {
431            first: run_options.first,
432            addrs: vec![],
433            network_contacts_url: vec![],
434            local: true,
435            ignore_cache: true,
436            bootstrap_cache_dir: None,
437        },
438        listen_addr: Some(listen_addrs),
439        log_dir_path: node_info.log_path,
440        log_format: run_options.log_format,
441        max_archived_log_files: None,
442        max_log_files: None,
443        metrics_port: run_options.metrics_port,
444        network_id: None,
445        node_ip: None,
446        node_port: run_options.node_port,
447        number: run_options.number,
448        peer_id: Some(peer_id),
449        pid: Some(node_info.pid),
450        rewards_address: run_options.rewards_address,
451        reward_balance: None,
452        rpc_socket_addr: run_options.rpc_socket_addr,
453        schema_version: NODE_SERVICE_DATA_SCHEMA_LATEST,
454        status: ServiceStatus::Running,
455        service_name: format!("antnode-local{}", run_options.number),
456        no_upnp: false,
457        user: None,
458        user_mode: false,
459        version: run_options.version.to_string(),
460        write_older_cache_files: false,
461    })
462}
463
464//
465// Private Helpers
466//
467
468async fn validate_network(node_registry: NodeRegistryManager, peers: Vec<Multiaddr>) -> Result<()> {
469    let mut all_peers = Vec::new();
470    for node in node_registry.nodes.read().await.iter() {
471        let node = node.read().await;
472        if let Some(peer_id) = &node.peer_id {
473            all_peers.push(*peer_id);
474        } else {
475            return Err(eyre!(
476                "The PeerId was not set for node: {}",
477                node.service_name
478            ));
479        }
480    }
481
482    // The additional peers are peers being managed outwith the node manager. This only applies
483    // when we've joined a network not being managed by the node manager. Otherwise, this list will
484    // be empty.
485    let additional_peers = peers
486        .into_iter()
487        .filter_map(|addr| {
488            addr.to_string()
489                .rsplit('/')
490                .next()
491                .and_then(|id_str| PeerId::from_str(id_str).ok())
492        })
493        .collect::<Vec<PeerId>>();
494    all_peers.extend(additional_peers);
495
496    for node in node_registry.nodes.read().await.iter() {
497        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
498        let net_info = rpc_client.network_info().await?;
499        let peers = net_info.connected_peers;
500        let peer_id = node
501            .read()
502            .await
503            .peer_id
504            .ok_or_eyre("The PeerId was not set")?;
505        debug!("Node {peer_id} has {} peers", peers.len());
506        println!("Node {peer_id} has {} peers", peers.len());
507
508        // Look for peers that are not supposed to be present in the network. This can happen if
509        // the node has connected to peers on other networks.
510        let invalid_peers: Vec<PeerId> = peers
511            .iter()
512            .filter(|peer| !all_peers.contains(peer))
513            .cloned()
514            .collect();
515        if !invalid_peers.is_empty() {
516            for invalid_peer in invalid_peers.iter() {
517                println!("Invalid peer found: {invalid_peer}");
518            }
519            error!("Network validation failed: {invalid_peers:?}");
520            return Err(eyre!("Network validation failed",));
521        }
522    }
523    Ok(())
524}
525
#[cfg(test)]
mod tests {
    use super::*;
    use ant_evm::utils::dummy_address;
    use ant_service_management::{
        error::Result as RpcResult,
        rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
    };
    use async_trait::async_trait;
    use evmlib::CustomNetwork;
    use libp2p_identity::PeerId;
    use mockall::mock;
    use mockall::predicate::*;
    use std::str::FromStr;

    // Mock of the node RPC interface so `run_node` can be driven without a live node.
    // `RpcActions` comes from another crate, so it is mocked with `mock!` (which requires
    // restating every trait method) rather than `automock`.
    mock! {
        pub RpcClient {}
        #[async_trait]
        impl RpcActions for RpcClient {
            async fn node_info(&self) -> RpcResult<NodeInfo>;
            async fn network_info(&self) -> RpcResult<NetworkInfo>;
            async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
            async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
            async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
            async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
            async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
            async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
        }
    }

    /// Launching node number 1 as the genesis node should:
    /// - call `launch_node` exactly once with `first = true`, no log format and no optional
    ///   ports;
    /// - call `wait` only once, with the configured interval (the extra pre-launch wait
    ///   applies only to node 2);
    /// - build the registry entry from the mocked RPC responses.
    #[tokio::test]
    async fn run_node_should_launch_the_genesis_node() -> Result<()> {
        let mut mock_launcher = MockLauncher::new();
        let mut mock_rpc_client = MockRpcClient::new();
        let rewards_address = dummy_address();

        let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
        let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);
        mock_launcher
            .expect_launch_node()
            .with(
                eq(true),
                eq(None),
                eq(None),
                eq(None),
                eq(rpc_socket_addr),
                eq(rewards_address),
                eq(EvmNetwork::Custom(CustomNetwork::new(
                    "http://localhost:61611",
                    "0x5FbDB2315678afecb367f032d93F642f64180aa3",
                    "0x8464135c8F25Da09e49BC8782676a84730C318bC",
                    Some("0x663F3ad617193148711d28f5334eE4Ed07016602"),
                ))),
            )
            .times(1)
            .returning(|_, _, _, _, _, _, _| Ok(()));
        // Only the post-launch interval wait is expected; `times(1)` would fail on an extra wait.
        mock_launcher
            .expect_wait()
            .with(eq(100))
            .times(1)
            .returning(|_| ());
        mock_launcher
            .expect_get_antnode_path()
            .times(1)
            .returning(|| PathBuf::from("/usr/local/bin/antnode"));

        mock_rpc_client
            .expect_node_info()
            .times(1)
            .returning(move || {
                Ok(NodeInfo {
                    pid: 1000,
                    peer_id,
                    data_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}")),
                    log_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs")),
                    version: "0.100.12".to_string(),
                    uptime: std::time::Duration::from_secs(1), // the service was just started
                    wallet_balance: 0,
                })
            });
        mock_rpc_client
            .expect_network_info()
            .times(1)
            .returning(move || {
                Ok(NetworkInfo {
                    connected_peers: Vec::new(),
                    listeners: Vec::new(),
                })
            });

        let node = run_node(
            RunNodeOptions {
                first: true,
                interval: 100,
                log_format: None,
                metrics_port: None,
                node_port: None,
                number: 1,
                rpc_socket_addr,
                rewards_address,
                evm_network: EvmNetwork::Custom(CustomNetwork::new(
                    "http://localhost:61611",
                    "0x5FbDB2315678afecb367f032d93F642f64180aa3",
                    "0x8464135c8F25Da09e49BC8782676a84730C318bC",
                    Some("0x663F3ad617193148711d28f5334eE4Ed07016602"),
                )),
                version: "0.100.12".to_string(),
            },
            &mock_launcher,
            &mock_rpc_client,
        )
        .await?;

        // The registry entry should reflect the options passed in and the mocked RPC data.
        assert!(node.initial_peers_config.first);
        assert_eq!(node.version, "0.100.12");
        assert_eq!(node.service_name, "antnode-local1");
        assert_eq!(
            node.data_dir_path,
            PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"))
        );
        assert_eq!(
            node.log_dir_path,
            PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"))
        );
        assert_eq!(node.number, 1);
        assert_eq!(node.pid, Some(1000));
        assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
        assert_eq!(node.status, ServiceStatus::Running);
        assert_eq!(node.antnode_path, PathBuf::from("/usr/local/bin/antnode"));

        Ok(())
    }
}