ant_node_manager/
local.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::add_services::config::PortRange;
10use crate::helpers::{
11    check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::node::NODE_SERVICE_DATA_SCHEMA_LATEST;
18use ant_service_management::{
19    control::ServiceControl,
20    rpc::{RpcActions, RpcClient},
21    NodeRegistry, NodeServiceData, ServiceStatus,
22};
23use color_eyre::eyre::OptionExt;
24use color_eyre::{eyre::eyre, Result};
25use colored::Colorize;
26use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
27#[cfg(test)]
28use mockall::automock;
29use std::{
30    net::{IpAddr, Ipv4Addr, SocketAddr},
31    path::PathBuf,
32    process::{Command, Stdio},
33    str::FromStr,
34};
35use sysinfo::{Pid, System};
36
37#[cfg_attr(test, automock)]
38pub trait Launcher {
39    fn get_antnode_path(&self) -> PathBuf;
40    #[allow(clippy::too_many_arguments)]
41    fn launch_node(
42        &self,
43        first: bool,
44        log_format: Option<LogFormat>,
45        metrics_port: Option<u16>,
46        node_port: Option<u16>,
47        rpc_socket_addr: SocketAddr,
48        rewards_address: RewardsAddress,
49        evm_network: Option<EvmNetwork>,
50    ) -> Result<()>;
51    fn wait(&self, delay: u64);
52}
53
/// Launcher that starts nodes by spawning the `antnode` binary as a local process.
///
/// The default value holds an empty binary path.
#[derive(Clone, Debug, Default)]
pub struct LocalSafeLauncher {
    /// Path of the `antnode` binary that is executed for every launched node.
    pub antnode_bin_path: PathBuf,
}
58
59impl Launcher for LocalSafeLauncher {
60    fn get_antnode_path(&self) -> PathBuf {
61        self.antnode_bin_path.clone()
62    }
63
64    fn launch_node(
65        &self,
66        first: bool,
67        log_format: Option<LogFormat>,
68        metrics_port: Option<u16>,
69        node_port: Option<u16>,
70        rpc_socket_addr: SocketAddr,
71        rewards_address: RewardsAddress,
72        evm_network: Option<EvmNetwork>,
73    ) -> Result<()> {
74        let mut args = Vec::new();
75
76        if first {
77            args.push("--first".to_string())
78        }
79
80        if let Some(log_format) = log_format {
81            args.push("--log-format".to_string());
82            args.push(log_format.as_str().to_string());
83        }
84
85        if let Some(metrics_port) = metrics_port {
86            args.push("--metrics-server-port".to_string());
87            args.push(metrics_port.to_string());
88        }
89
90        if let Some(node_port) = node_port {
91            args.push("--port".to_string());
92            args.push(node_port.to_string());
93        }
94
95        args.push("--local".to_string());
96        args.push("--rpc".to_string());
97        args.push(rpc_socket_addr.to_string());
98
99        args.push("--rewards-address".to_string());
100        args.push(rewards_address.to_string());
101
102        if let Some(network) = evm_network {
103            args.push(format!("evm-{}", network.identifier()));
104
105            if let EvmNetwork::Custom(custom) = network {
106                args.push("--rpc-url".to_string());
107                args.push(custom.rpc_url_http.to_string());
108                args.push("--payment-token-address".to_string());
109                args.push(custom.payment_token_address.to_string());
110                args.push("--data-payments-address".to_string());
111                args.push(custom.data_payments_address.to_string());
112            }
113        }
114
115        Command::new(self.antnode_bin_path.clone())
116            .args(args)
117            .stdout(Stdio::inherit())
118            .stderr(Stdio::inherit())
119            .spawn()
120            .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
121
122        Ok(())
123    }
124
125    /// Provide a delay for the service to start or stop.
126    ///
127    /// This is wrapped mainly just for unit testing.
128    fn wait(&self, delay: u64) {
129        std::thread::sleep(std::time::Duration::from_millis(delay));
130    }
131}
132
133pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Result<()> {
134    let mut system = System::new_all();
135    system.refresh_all();
136
137    // It's possible that the faucet was not spun up because the network failed the validation
138    // process. If it wasn't running, we obviously don't need to do anything.
139    if let Some(faucet) = &node_registry.faucet {
140        // If we're here, the faucet was spun up. However, it's possible for the process to have
141        // died since then. In that case, we don't need to do anything.
142        // I think the use of `unwrap` is justified here, because for a local network, if the
143        // faucet is not `None`, the pid also must have a value.
144        if let Some(process) = system.process(Pid::from(faucet.pid.unwrap() as usize)) {
145            process.kill();
146            debug!("Faucet has been killed");
147            println!("{} Killed faucet", "✓".green());
148        }
149    }
150
151    let faucet_data_path = dirs_next::data_dir()
152        .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
153        .join("autonomi")
154        .join("test_faucet");
155    if faucet_data_path.is_dir() {
156        std::fs::remove_dir_all(faucet_data_path)?;
157        debug!("Removed faucet data directory");
158    }
159    let genesis_data_path = dirs_next::data_dir()
160        .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
161        .join("autonomi")
162        .join("test_genesis");
163    if genesis_data_path.is_dir() {
164        debug!("Removed genesis data directory");
165        std::fs::remove_dir_all(genesis_data_path)?;
166    }
167
168    for node in node_registry.nodes.iter() {
169        println!("{}:", node.service_name);
170        // If the PID is not set it means the `status` command ran and determined the node was
171        // already dead anyway, so we don't need to do anything.
172        if let Some(pid) = node.pid {
173            // It could be possible that None would be returned here, if the process had already
174            // died, but the `status` command had not ran. In that case, we don't need to do
175            // anything anyway.
176            if let Some(process) = system.process(Pid::from(pid as usize)) {
177                process.kill();
178                debug!("Killed node: {} ({})", node.service_name, pid);
179                println!("  {} Killed process", "✓".green());
180            }
181        }
182
183        if !keep_directories {
184            // At this point we don't allow path overrides, so deleting the data directory will clear
185            // the log directory also.
186            if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
187                error!("Failed to remove node data directory: {:?}", e);
188                println!(
189                    "  {} Failed to remove {}: {e}",
190                    "✗".red(),
191                    node.data_dir_path.to_string_lossy()
192                );
193            } else {
194                debug!("Removed node data directory: {:?}", node.data_dir_path);
195                println!(
196                    "  {} Removed {}",
197                    "✓".green(),
198                    node.data_dir_path.to_string_lossy()
199                );
200            }
201        }
202    }
203
204    Ok(())
205}
206
207pub struct LocalNetworkOptions {
208    pub antnode_bin_path: PathBuf,
209    pub enable_metrics_server: bool,
210    pub join: bool,
211    pub interval: u64,
212    pub metrics_port: Option<PortRange>,
213    pub node_port: Option<PortRange>,
214    pub node_count: u16,
215    pub peers: Option<Vec<Multiaddr>>,
216    pub rpc_port: Option<PortRange>,
217    pub skip_validation: bool,
218    pub log_format: Option<LogFormat>,
219    pub rewards_address: RewardsAddress,
220    pub evm_network: Option<EvmNetwork>,
221}
222
223pub async fn run_network(
224    options: LocalNetworkOptions,
225    node_registry: &mut NodeRegistry,
226    service_control: &dyn ServiceControl,
227) -> Result<()> {
228    info!("Running local network");
229
230    // Check port availability when joining a local network.
231    if let Some(port_range) = &options.node_port {
232        port_range.validate(options.node_count)?;
233        check_port_availability(port_range, &node_registry.nodes)?;
234    }
235
236    if let Some(port_range) = &options.metrics_port {
237        port_range.validate(options.node_count)?;
238        check_port_availability(port_range, &node_registry.nodes)?;
239    }
240
241    if let Some(port_range) = &options.rpc_port {
242        port_range.validate(options.node_count)?;
243        check_port_availability(port_range, &node_registry.nodes)?;
244    }
245
246    let launcher = LocalSafeLauncher {
247        antnode_bin_path: options.antnode_bin_path.to_path_buf(),
248    };
249
250    let mut node_port = get_start_port_if_applicable(options.node_port);
251    let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
252    let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
253
254    // Start the bootstrap node if it doesnt exist.
255    let (bootstrap_peers, start) = if options.join {
256        if let Some(peers) = options.peers {
257            (peers, 1)
258        } else {
259            let peer = node_registry
260                .nodes
261                .iter()
262                .find_map(|n| n.listen_addr.clone())
263                .ok_or_eyre("Unable to obtain a peer to connect to")?;
264            (peer, 1)
265        }
266    } else {
267        let rpc_free_port = if let Some(port) = rpc_port {
268            port
269        } else {
270            service_control.get_available_port()?
271        };
272        let metrics_free_port = if let Some(port) = metrics_port {
273            Some(port)
274        } else if options.enable_metrics_server {
275            Some(service_control.get_available_port()?)
276        } else {
277            None
278        };
279        let rpc_socket_addr =
280            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
281        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
282
283        let number = (node_registry.nodes.len() as u16) + 1;
284        let node = run_node(
285            RunNodeOptions {
286                first: true,
287                metrics_port: metrics_free_port,
288                node_port,
289                interval: options.interval,
290                log_format: options.log_format,
291                number,
292                rpc_socket_addr,
293                rewards_address: options.rewards_address,
294                evm_network: options.evm_network.clone(),
295                version: get_bin_version(&launcher.get_antnode_path())?,
296            },
297            &launcher,
298            &rpc_client,
299        )
300        .await?;
301        node_registry.nodes.push(node.clone());
302        let bootstrap_peers = node
303            .listen_addr
304            .ok_or_eyre("The listen address was not set")?;
305        node_port = increment_port_option(node_port);
306        metrics_port = increment_port_option(metrics_port);
307        rpc_port = increment_port_option(rpc_port);
308        (bootstrap_peers, 2)
309    };
310    node_registry.save()?;
311
312    for _ in start..=options.node_count {
313        let rpc_free_port = if let Some(port) = rpc_port {
314            port
315        } else {
316            service_control.get_available_port()?
317        };
318        let metrics_free_port = if let Some(port) = metrics_port {
319            Some(port)
320        } else if options.enable_metrics_server {
321            Some(service_control.get_available_port()?)
322        } else {
323            None
324        };
325        let rpc_socket_addr =
326            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
327        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
328
329        let number = (node_registry.nodes.len() as u16) + 1;
330        let node = run_node(
331            RunNodeOptions {
332                first: false,
333                metrics_port: metrics_free_port,
334                node_port,
335                interval: options.interval,
336                log_format: options.log_format,
337                number,
338                rpc_socket_addr,
339                rewards_address: options.rewards_address,
340                evm_network: options.evm_network.clone(),
341                version: get_bin_version(&launcher.get_antnode_path())?,
342            },
343            &launcher,
344            &rpc_client,
345        )
346        .await?;
347        node_registry.nodes.push(node);
348
349        // We save the node registry for each launch because it's possible any node can fail to
350        // launch, or maybe the validation will fail. In the error case, we will want to use the
351        // `kill` command for the nodes that we did spin up. The `kill` command works on the basis
352        // of what's in the node registry.
353        node_registry.save()?;
354
355        node_port = increment_port_option(node_port);
356        metrics_port = increment_port_option(metrics_port);
357        rpc_port = increment_port_option(rpc_port);
358    }
359
360    if !options.skip_validation {
361        debug!("Waiting for 10 seconds before validating the network...");
362        println!("Waiting for 10 seconds before validating the network...");
363        std::thread::sleep(std::time::Duration::from_secs(10));
364        validate_network(node_registry, bootstrap_peers.clone()).await?;
365    }
366
367    Ok(())
368}
369
370pub struct RunNodeOptions {
371    pub first: bool,
372    pub interval: u64,
373    pub log_format: Option<LogFormat>,
374    pub metrics_port: Option<u16>,
375    pub node_port: Option<u16>,
376    pub number: u16,
377    pub rpc_socket_addr: SocketAddr,
378    pub rewards_address: RewardsAddress,
379    pub evm_network: Option<EvmNetwork>,
380    pub version: String,
381}
382
383pub async fn run_node(
384    run_options: RunNodeOptions,
385    launcher: &dyn Launcher,
386    rpc_client: &dyn RpcActions,
387) -> Result<NodeServiceData> {
388    info!("Launching node {}...", run_options.number);
389    println!("Launching node {}...", run_options.number);
390    launcher.launch_node(
391        run_options.first,
392        run_options.log_format,
393        run_options.metrics_port,
394        run_options.node_port,
395        run_options.rpc_socket_addr,
396        run_options.rewards_address,
397        run_options.evm_network.clone(),
398    )?;
399    launcher.wait(run_options.interval);
400
401    let node_info = rpc_client.node_info().await?;
402    let peer_id = node_info.peer_id;
403    let network_info = rpc_client.network_info().await?;
404    let connected_peers = Some(network_info.connected_peers);
405    let listen_addrs = network_info
406        .listeners
407        .into_iter()
408        .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
409        .collect();
410
411    Ok(NodeServiceData {
412        alpha: false,
413        antnode_path: launcher.get_antnode_path(),
414        auto_restart: false,
415        connected_peers,
416        data_dir_path: node_info.data_path,
417        evm_network: run_options.evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
418        relay: false,
419        initial_peers_config: InitialPeersConfig {
420            first: run_options.first,
421            addrs: vec![],
422            network_contacts_url: vec![],
423            local: true,
424            ignore_cache: true,
425            bootstrap_cache_dir: None,
426        },
427        listen_addr: Some(listen_addrs),
428        log_dir_path: node_info.log_path,
429        log_format: run_options.log_format,
430        max_archived_log_files: None,
431        max_log_files: None,
432        metrics_port: run_options.metrics_port,
433        network_id: None,
434        node_ip: None,
435        node_port: run_options.node_port,
436        number: run_options.number,
437        peer_id: Some(peer_id),
438        pid: Some(node_info.pid),
439        rewards_address: run_options.rewards_address,
440        reward_balance: None,
441        rpc_socket_addr: run_options.rpc_socket_addr,
442        schema_version: NODE_SERVICE_DATA_SCHEMA_LATEST,
443        status: ServiceStatus::Running,
444        service_name: format!("antnode-local{}", run_options.number),
445        no_upnp: false,
446        user: None,
447        user_mode: false,
448        version: run_options.version.to_string(),
449    })
450}
451
452//
453// Private Helpers
454//
455
456async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr>) -> Result<()> {
457    let mut all_peers = node_registry
458        .nodes
459        .iter()
460        .map(|n| n.peer_id.ok_or_eyre("The PeerId was not set"))
461        .collect::<Result<Vec<PeerId>>>()?;
462    // The additional peers are peers being managed outwith the node manager. This only applies
463    // when we've joined a network not being managed by the node manager. Otherwise, this list will
464    // be empty.
465    let additional_peers = peers
466        .into_iter()
467        .filter_map(|addr| {
468            addr.to_string()
469                .rsplit('/')
470                .next()
471                .and_then(|id_str| PeerId::from_str(id_str).ok())
472        })
473        .collect::<Vec<PeerId>>();
474    all_peers.extend(additional_peers);
475
476    for node in node_registry.nodes.iter() {
477        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
478        let net_info = rpc_client.network_info().await?;
479        let peers = net_info.connected_peers;
480        let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
481        debug!("Node {peer_id} has {} peers", peers.len());
482        println!("Node {peer_id} has {} peers", peers.len());
483
484        // Look for peers that are not supposed to be present in the network. This can happen if
485        // the node has connected to peers on other networks.
486        let invalid_peers: Vec<PeerId> = peers
487            .iter()
488            .filter(|peer| !all_peers.contains(peer))
489            .cloned()
490            .collect();
491        if !invalid_peers.is_empty() {
492            for invalid_peer in invalid_peers.iter() {
493                println!("Invalid peer found: {}", invalid_peer);
494            }
495            error!("Network validation failed: {invalid_peers:?}");
496            return Err(eyre!("Network validation failed",));
497        }
498    }
499    Ok(())
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use ant_evm::utils::dummy_address;
    use ant_service_management::{
        error::Result as RpcResult,
        rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
    };
    use async_trait::async_trait;
    use libp2p_identity::PeerId;
    use mockall::mock;
    use mockall::predicate::*;
    use std::str::FromStr;

    // Mock of the RPC client so `run_node` can be tested without a running node.
    mock! {
        pub RpcClient {}
        #[async_trait]
        impl RpcActions for RpcClient {
            async fn node_info(&self) -> RpcResult<NodeInfo>;
            async fn network_info(&self) -> RpcResult<NetworkInfo>;
            async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
            async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
            async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
            async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
            async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
            async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
        }
    }

    /// `run_node` launches the node once, waits once, and fills the registry entry from the
    /// launcher and the RPC answers.
    #[tokio::test]
    async fn run_node_should_launch_the_genesis_node() -> Result<()> {
        const ANTNODE_PATH: &str = "/usr/local/bin/antnode";
        const NODE_VERSION: &str = "0.100.12";
        const LAUNCH_INTERVAL: u64 = 100;
        const NODE_PID: u32 = 1000;

        let rewards_address = dummy_address();
        let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
        let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);

        // Launcher expectations: one launch as the first node, one wait, one path lookup.
        let mut launcher = MockLauncher::new();
        launcher
            .expect_launch_node()
            .with(
                eq(true),
                eq(None),
                eq(None),
                eq(None),
                eq(rpc_socket_addr),
                eq(rewards_address),
                eq(None),
            )
            .times(1)
            .returning(|_, _, _, _, _, _, _| Ok(()));
        launcher
            .expect_wait()
            .with(eq(LAUNCH_INTERVAL))
            .times(1)
            .returning(|_| ());
        launcher
            .expect_get_antnode_path()
            .times(1)
            .returning(|| PathBuf::from(ANTNODE_PATH));

        // RPC expectations: each query is answered exactly once.
        let mut rpc_client = MockRpcClient::new();
        rpc_client.expect_node_info().times(1).returning(move || {
            Ok(NodeInfo {
                pid: NODE_PID,
                peer_id,
                data_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}")),
                log_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs")),
                version: NODE_VERSION.to_string(),
                uptime: std::time::Duration::from_secs(1), // the service was just started
                wallet_balance: 0,
            })
        });
        rpc_client
            .expect_network_info()
            .times(1)
            .returning(move || {
                Ok(NetworkInfo {
                    connected_peers: Vec::new(),
                    listeners: Vec::new(),
                })
            });

        let options = RunNodeOptions {
            first: true,
            interval: LAUNCH_INTERVAL,
            log_format: None,
            metrics_port: None,
            node_port: None,
            number: 1,
            rpc_socket_addr,
            rewards_address,
            evm_network: None,
            version: NODE_VERSION.to_string(),
        };
        let node = run_node(options, &launcher, &rpc_client).await?;

        let expected_data_dir = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"));
        let expected_log_dir = PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"));

        assert!(node.initial_peers_config.first);
        assert_eq!(node.version, NODE_VERSION);
        assert_eq!(node.service_name, "antnode-local1");
        assert_eq!(node.data_dir_path, expected_data_dir);
        assert_eq!(node.log_dir_path, expected_log_dir);
        assert_eq!(node.number, 1);
        assert_eq!(node.pid, Some(NODE_PID));
        assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
        assert_eq!(node.status, ServiceStatus::Running);
        assert_eq!(node.antnode_path, PathBuf::from(ANTNODE_PATH));

        Ok(())
    }
}
623}