ant_node_manager/
local.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::add_services::config::PortRange;
10use crate::helpers::{
11    check_port_availability, get_bin_version, get_start_port_if_applicable, increment_port_option,
12};
13
14use ant_bootstrap::InitialPeersConfig;
15use ant_evm::{EvmNetwork, RewardsAddress};
16use ant_logging::LogFormat;
17use ant_service_management::{
18    control::ServiceControl,
19    rpc::{RpcActions, RpcClient},
20    NodeRegistry, NodeServiceData, ServiceStatus,
21};
22use color_eyre::eyre::OptionExt;
23use color_eyre::{eyre::eyre, Result};
24use colored::Colorize;
25use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
26#[cfg(test)]
27use mockall::automock;
28use std::{
29    net::{IpAddr, Ipv4Addr, SocketAddr},
30    path::PathBuf,
31    process::{Command, Stdio},
32    str::FromStr,
33};
34use sysinfo::{Pid, System};
35
/// Abstraction over starting `antnode` processes.
///
/// Production code uses [`LocalSafeLauncher`]; under `cfg(test)` a `MockLauncher` is generated
/// by `mockall` so node launching can be exercised without spawning real processes.
#[cfg_attr(test, automock)]
pub trait Launcher {
    /// Return the path of the `antnode` binary this launcher starts.
    fn get_antnode_path(&self) -> PathBuf;
    /// Start one node process configured by the given arguments.
    ///
    /// `first` is set for the initial node of a brand-new network (see `run_network`). `None`
    /// for an optional argument means the corresponding setting is not passed to the node.
    ///
    /// Returns an error if the process could not be started.
    // Each argument maps to a distinct node setting, so the long parameter list is deliberate.
    #[allow(clippy::too_many_arguments)]
    fn launch_node(
        &self,
        first: bool,
        log_format: Option<LogFormat>,
        metrics_port: Option<u16>,
        node_port: Option<u16>,
        rpc_socket_addr: SocketAddr,
        rewards_address: RewardsAddress,
        evm_network: Option<EvmNetwork>,
    ) -> Result<()>;
    /// Pause after a launch so the node has time to come up.
    ///
    /// `LocalSafeLauncher` interprets `delay` as milliseconds; other implementations may differ.
    fn wait(&self, delay: u64);
}
52
/// [`Launcher`] implementation that spawns real `antnode` processes from a binary on disk.
#[derive(Default)]
pub struct LocalSafeLauncher {
    /// Path to the `antnode` executable spawned for each node.
    pub antnode_bin_path: PathBuf,
}
57
58impl Launcher for LocalSafeLauncher {
59    fn get_antnode_path(&self) -> PathBuf {
60        self.antnode_bin_path.clone()
61    }
62
63    fn launch_node(
64        &self,
65        first: bool,
66        log_format: Option<LogFormat>,
67        metrics_port: Option<u16>,
68        node_port: Option<u16>,
69        rpc_socket_addr: SocketAddr,
70        rewards_address: RewardsAddress,
71        evm_network: Option<EvmNetwork>,
72    ) -> Result<()> {
73        let mut args = Vec::new();
74
75        if first {
76            args.push("--first".to_string())
77        }
78
79        if let Some(log_format) = log_format {
80            args.push("--log-format".to_string());
81            args.push(log_format.as_str().to_string());
82        }
83
84        if let Some(metrics_port) = metrics_port {
85            args.push("--metrics-server-port".to_string());
86            args.push(metrics_port.to_string());
87        }
88
89        if let Some(node_port) = node_port {
90            args.push("--port".to_string());
91            args.push(node_port.to_string());
92        }
93
94        args.push("--local".to_string());
95        args.push("--rpc".to_string());
96        args.push(rpc_socket_addr.to_string());
97
98        args.push("--rewards-address".to_string());
99        args.push(rewards_address.to_string());
100
101        if let Some(network) = evm_network {
102            args.push(format!("evm-{}", network.identifier()));
103
104            if let EvmNetwork::Custom(custom) = network {
105                args.push("--rpc-url".to_string());
106                args.push(custom.rpc_url_http.to_string());
107                args.push("--payment-token-address".to_string());
108                args.push(custom.payment_token_address.to_string());
109                args.push("--data-payments-address".to_string());
110                args.push(custom.data_payments_address.to_string());
111            }
112        }
113
114        Command::new(self.antnode_bin_path.clone())
115            .args(args)
116            .stdout(Stdio::inherit())
117            .stderr(Stdio::inherit())
118            .spawn()
119            .inspect_err(|err| error!("Error while spawning node process: {err:?}"))?;
120
121        Ok(())
122    }
123
124    /// Provide a delay for the service to start or stop.
125    ///
126    /// This is wrapped mainly just for unit testing.
127    fn wait(&self, delay: u64) {
128        std::thread::sleep(std::time::Duration::from_millis(delay));
129    }
130}
131
132pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Result<()> {
133    let mut system = System::new_all();
134    system.refresh_all();
135
136    // It's possible that the faucet was not spun up because the network failed the validation
137    // process. If it wasn't running, we obviously don't need to do anything.
138    if let Some(faucet) = &node_registry.faucet {
139        // If we're here, the faucet was spun up. However, it's possible for the process to have
140        // died since then. In that case, we don't need to do anything.
141        // I think the use of `unwrap` is justified here, because for a local network, if the
142        // faucet is not `None`, the pid also must have a value.
143        if let Some(process) = system.process(Pid::from(faucet.pid.unwrap() as usize)) {
144            process.kill();
145            debug!("Faucet has been killed");
146            println!("{} Killed faucet", "✓".green());
147        }
148    }
149
150    let faucet_data_path = dirs_next::data_dir()
151        .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
152        .join("autonomi")
153        .join("test_faucet");
154    if faucet_data_path.is_dir() {
155        std::fs::remove_dir_all(faucet_data_path)?;
156        debug!("Removed faucet data directory");
157    }
158    let genesis_data_path = dirs_next::data_dir()
159        .ok_or_else(|| eyre!("Could not obtain user's data directory"))?
160        .join("autonomi")
161        .join("test_genesis");
162    if genesis_data_path.is_dir() {
163        debug!("Removed genesis data directory");
164        std::fs::remove_dir_all(genesis_data_path)?;
165    }
166
167    for node in node_registry.nodes.iter() {
168        println!("{}:", node.service_name);
169        // If the PID is not set it means the `status` command ran and determined the node was
170        // already dead anyway, so we don't need to do anything.
171        if let Some(pid) = node.pid {
172            // It could be possible that None would be returned here, if the process had already
173            // died, but the `status` command had not ran. In that case, we don't need to do
174            // anything anyway.
175            if let Some(process) = system.process(Pid::from(pid as usize)) {
176                process.kill();
177                debug!("Killed node: {} ({})", node.service_name, pid);
178                println!("  {} Killed process", "✓".green());
179            }
180        }
181
182        if !keep_directories {
183            // At this point we don't allow path overrides, so deleting the data directory will clear
184            // the log directory also.
185            if let Err(e) = std::fs::remove_dir_all(&node.data_dir_path) {
186                error!("Failed to remove node data directory: {:?}", e);
187                println!(
188                    "  {} Failed to remove {}: {e}",
189                    "✗".red(),
190                    node.data_dir_path.to_string_lossy()
191                );
192            } else {
193                debug!("Removed node data directory: {:?}", node.data_dir_path);
194                println!(
195                    "  {} Removed {}",
196                    "✓".green(),
197                    node.data_dir_path.to_string_lossy()
198                );
199            }
200        }
201    }
202
203    Ok(())
204}
205
/// Settings for [`run_network`].
pub struct LocalNetworkOptions {
    /// Path to the `antnode` binary launched for every node.
    pub antnode_bin_path: PathBuf,
    /// When no `metrics_port` range is given, request a free metrics port for each node.
    pub enable_metrics_server: bool,
    /// Join an existing network instead of starting a new one with a genesis node.
    pub join: bool,
    /// Delay passed to [`Launcher::wait`] after each launch (milliseconds for
    /// [`LocalSafeLauncher`]).
    pub interval: u64,
    /// Optional range of metrics ports for the launched nodes; validated against `node_count` and
    /// checked for availability before anything is launched.
    pub metrics_port: Option<PortRange>,
    /// Optional range of node (`--port`) ports; validated and checked like `metrics_port`.
    pub node_port: Option<PortRange>,
    /// Number of nodes to launch. When not joining, this count includes the genesis node, which
    /// is always started.
    pub node_count: u16,
    /// Peers that belong to the network being joined. If `None` while joining, the listen
    /// addresses of a node already in the registry are used. These are only used for network
    /// validation; they are not passed to the launched nodes. Ignored when not joining.
    pub peers: Option<Vec<Multiaddr>>,
    /// Optional range of RPC ports; when `None`, a free port is requested for each node.
    pub rpc_port: Option<PortRange>,
    /// Skip the post-launch wait and peer validation.
    pub skip_validation: bool,
    /// Log format forwarded to every node via `--log-format`.
    pub log_format: Option<LogFormat>,
    /// Rewards address forwarded to every node via `--rewards-address`.
    pub rewards_address: RewardsAddress,
    /// EVM network forwarded to every node; `None` passes no network argument.
    pub evm_network: Option<EvmNetwork>,
}
221
222pub async fn run_network(
223    options: LocalNetworkOptions,
224    node_registry: &mut NodeRegistry,
225    service_control: &dyn ServiceControl,
226) -> Result<()> {
227    info!("Running local network");
228
229    // Check port availability when joining a local network.
230    if let Some(port_range) = &options.node_port {
231        port_range.validate(options.node_count)?;
232        check_port_availability(port_range, &node_registry.nodes)?;
233    }
234
235    if let Some(port_range) = &options.metrics_port {
236        port_range.validate(options.node_count)?;
237        check_port_availability(port_range, &node_registry.nodes)?;
238    }
239
240    if let Some(port_range) = &options.rpc_port {
241        port_range.validate(options.node_count)?;
242        check_port_availability(port_range, &node_registry.nodes)?;
243    }
244
245    let launcher = LocalSafeLauncher {
246        antnode_bin_path: options.antnode_bin_path.to_path_buf(),
247    };
248
249    let mut node_port = get_start_port_if_applicable(options.node_port);
250    let mut metrics_port = get_start_port_if_applicable(options.metrics_port);
251    let mut rpc_port = get_start_port_if_applicable(options.rpc_port);
252
253    // Start the bootstrap node if it doesnt exist.
254    let (bootstrap_peers, start) = if options.join {
255        if let Some(peers) = options.peers {
256            (peers, 1)
257        } else {
258            let peer = node_registry
259                .nodes
260                .iter()
261                .find_map(|n| n.listen_addr.clone())
262                .ok_or_eyre("Unable to obtain a peer to connect to")?;
263            (peer, 1)
264        }
265    } else {
266        let rpc_free_port = if let Some(port) = rpc_port {
267            port
268        } else {
269            service_control.get_available_port()?
270        };
271        let metrics_free_port = if let Some(port) = metrics_port {
272            Some(port)
273        } else if options.enable_metrics_server {
274            Some(service_control.get_available_port()?)
275        } else {
276            None
277        };
278        let rpc_socket_addr =
279            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
280        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
281
282        let number = (node_registry.nodes.len() as u16) + 1;
283        let node = run_node(
284            RunNodeOptions {
285                first: true,
286                metrics_port: metrics_free_port,
287                node_port,
288                interval: options.interval,
289                log_format: options.log_format,
290                number,
291                rpc_socket_addr,
292                rewards_address: options.rewards_address,
293                evm_network: options.evm_network.clone(),
294                version: get_bin_version(&launcher.get_antnode_path())?,
295            },
296            &launcher,
297            &rpc_client,
298        )
299        .await?;
300        node_registry.nodes.push(node.clone());
301        let bootstrap_peers = node
302            .listen_addr
303            .ok_or_eyre("The listen address was not set")?;
304        node_port = increment_port_option(node_port);
305        metrics_port = increment_port_option(metrics_port);
306        rpc_port = increment_port_option(rpc_port);
307        (bootstrap_peers, 2)
308    };
309    node_registry.save()?;
310
311    for _ in start..=options.node_count {
312        let rpc_free_port = if let Some(port) = rpc_port {
313            port
314        } else {
315            service_control.get_available_port()?
316        };
317        let metrics_free_port = if let Some(port) = metrics_port {
318            Some(port)
319        } else if options.enable_metrics_server {
320            Some(service_control.get_available_port()?)
321        } else {
322            None
323        };
324        let rpc_socket_addr =
325            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
326        let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
327
328        let number = (node_registry.nodes.len() as u16) + 1;
329        let node = run_node(
330            RunNodeOptions {
331                first: false,
332                metrics_port: metrics_free_port,
333                node_port,
334                interval: options.interval,
335                log_format: options.log_format,
336                number,
337                rpc_socket_addr,
338                rewards_address: options.rewards_address,
339                evm_network: options.evm_network.clone(),
340                version: get_bin_version(&launcher.get_antnode_path())?,
341            },
342            &launcher,
343            &rpc_client,
344        )
345        .await?;
346        node_registry.nodes.push(node);
347
348        // We save the node registry for each launch because it's possible any node can fail to
349        // launch, or maybe the validation will fail. In the error case, we will want to use the
350        // `kill` command for the nodes that we did spin up. The `kill` command works on the basis
351        // of what's in the node registry.
352        node_registry.save()?;
353
354        node_port = increment_port_option(node_port);
355        metrics_port = increment_port_option(metrics_port);
356        rpc_port = increment_port_option(rpc_port);
357    }
358
359    if !options.skip_validation {
360        debug!("Waiting for 10 seconds before validating the network...");
361        println!("Waiting for 10 seconds before validating the network...");
362        std::thread::sleep(std::time::Duration::from_secs(10));
363        validate_network(node_registry, bootstrap_peers.clone()).await?;
364    }
365
366    Ok(())
367}
368
/// Settings for launching a single node with [`run_node`].
pub struct RunNodeOptions {
    /// Pass `--first` to the node; `run_network` sets this for the genesis node of a new network.
    pub first: bool,
    /// Delay handed to [`Launcher::wait`] after launching, before the node is queried over RPC.
    pub interval: u64,
    /// Log format forwarded to the node and recorded in its registry entry.
    pub log_format: Option<LogFormat>,
    /// Metrics server port; `None` omits `--metrics-server-port`.
    pub metrics_port: Option<u16>,
    /// Node port; `None` omits `--port`.
    pub node_port: Option<u16>,
    /// Sequence number of the node, used to build the service name `antnode-local<number>`.
    /// `run_network` uses the registry length plus one.
    pub number: u16,
    /// Address passed to the node via `--rpc`, and recorded in the registry entry.
    pub rpc_socket_addr: SocketAddr,
    /// Rewards address forwarded to the node and recorded in the registry entry.
    pub rewards_address: RewardsAddress,
    /// EVM network forwarded to the node. When `None`, no network argument is passed and
    /// `EvmNetwork::ArbitrumOne` is recorded in the registry entry.
    pub evm_network: Option<EvmNetwork>,
    /// Binary version recorded in the resulting registry entry.
    pub version: String,
}
381
382pub async fn run_node(
383    run_options: RunNodeOptions,
384    launcher: &dyn Launcher,
385    rpc_client: &dyn RpcActions,
386) -> Result<NodeServiceData> {
387    info!("Launching node {}...", run_options.number);
388    println!("Launching node {}...", run_options.number);
389    launcher.launch_node(
390        run_options.first,
391        run_options.log_format,
392        run_options.metrics_port,
393        run_options.node_port,
394        run_options.rpc_socket_addr,
395        run_options.rewards_address,
396        run_options.evm_network.clone(),
397    )?;
398    launcher.wait(run_options.interval);
399
400    let node_info = rpc_client.node_info().await?;
401    let peer_id = node_info.peer_id;
402    let network_info = rpc_client.network_info().await?;
403    let connected_peers = Some(network_info.connected_peers);
404    let listen_addrs = network_info
405        .listeners
406        .into_iter()
407        .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
408        .collect();
409
410    Ok(NodeServiceData {
411        antnode_path: launcher.get_antnode_path(),
412        auto_restart: false,
413        connected_peers,
414        data_dir_path: node_info.data_path,
415        evm_network: run_options.evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
416        home_network: false,
417        listen_addr: Some(listen_addrs),
418        log_dir_path: node_info.log_path,
419        log_format: run_options.log_format,
420        max_archived_log_files: None,
421        max_log_files: None,
422        metrics_port: run_options.metrics_port,
423        network_id: None,
424        node_ip: None,
425        node_port: run_options.node_port,
426        number: run_options.number,
427        peer_id: Some(peer_id),
428        peers_args: InitialPeersConfig {
429            first: run_options.first,
430            addrs: vec![],
431            network_contacts_url: vec![],
432            local: true,
433            disable_mainnet_contacts: true,
434            ignore_cache: true,
435            bootstrap_cache_dir: None,
436        },
437        pid: Some(node_info.pid),
438        rewards_address: run_options.rewards_address,
439        reward_balance: None,
440        rpc_socket_addr: run_options.rpc_socket_addr,
441        status: ServiceStatus::Running,
442        service_name: format!("antnode-local{}", run_options.number),
443        upnp: false,
444        user: None,
445        user_mode: false,
446        version: run_options.version.to_string(),
447    })
448}
449
450//
451// Private Helpers
452//
453
454async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr>) -> Result<()> {
455    let mut all_peers = node_registry
456        .nodes
457        .iter()
458        .map(|n| n.peer_id.ok_or_eyre("The PeerId was not set"))
459        .collect::<Result<Vec<PeerId>>>()?;
460    // The additional peers are peers being managed outwith the node manager. This only applies
461    // when we've joined a network not being managed by the node manager. Otherwise, this list will
462    // be empty.
463    let additional_peers = peers
464        .into_iter()
465        .filter_map(|addr| {
466            addr.to_string()
467                .rsplit('/')
468                .next()
469                .and_then(|id_str| PeerId::from_str(id_str).ok())
470        })
471        .collect::<Vec<PeerId>>();
472    all_peers.extend(additional_peers);
473
474    for node in node_registry.nodes.iter() {
475        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
476        let net_info = rpc_client.network_info().await?;
477        let peers = net_info.connected_peers;
478        let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
479        debug!("Node {peer_id} has {} peers", peers.len());
480        println!("Node {peer_id} has {} peers", peers.len());
481
482        // Look for peers that are not supposed to be present in the network. This can happen if
483        // the node has connected to peers on other networks.
484        let invalid_peers: Vec<PeerId> = peers
485            .iter()
486            .filter(|peer| !all_peers.contains(peer))
487            .cloned()
488            .collect();
489        if !invalid_peers.is_empty() {
490            for invalid_peer in invalid_peers.iter() {
491                println!("Invalid peer found: {}", invalid_peer);
492            }
493            error!("Network validation failed: {invalid_peers:?}");
494            return Err(eyre!("Network validation failed",));
495        }
496    }
497    Ok(())
498}
499
#[cfg(test)]
mod tests {
    use super::*;
    use ant_evm::utils::dummy_address;
    use ant_service_management::{
        error::Result as RpcResult,
        rpc::{NetworkInfo, NodeInfo, RecordAddress, RpcActions},
    };
    use async_trait::async_trait;
    use libp2p_identity::PeerId;
    use mockall::mock;
    use mockall::predicate::*;
    use std::str::FromStr;

    // Mock of the `RpcActions` trait, so `run_node` can be exercised without a running node.
    // The method signatures below should mirror the trait exactly.
    mock! {
        pub RpcClient {}
        #[async_trait]
        impl RpcActions for RpcClient {
            async fn node_info(&self) -> RpcResult<NodeInfo>;
            async fn network_info(&self) -> RpcResult<NetworkInfo>;
            async fn record_addresses(&self) -> RpcResult<Vec<RecordAddress>>;
            async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
            async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
            async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
            async fn is_node_connected_to_network(&self, timeout: std::time::Duration) -> RpcResult<()>;
            async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
        }
    }

    // Launching the genesis node should:
    // - call the launcher exactly once with `first = true` and no optional settings;
    // - wait for the configured interval;
    // - query the node once each for node info and network info;
    // - build the registry entry from those responses and the supplied options.
    #[tokio::test]
    async fn run_node_should_launch_the_genesis_node() -> Result<()> {
        let mut mock_launcher = MockLauncher::new();
        let mut mock_rpc_client = MockRpcClient::new();
        let rewards_address = dummy_address();

        let peer_id = PeerId::from_str("12D3KooWS2tpXGGTmg2AHFiDh57yPQnat49YHnyqoggzXZWpqkCR")?;
        let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 13000);
        // Arguments in `launch_node` order: first, log_format, metrics_port, node_port,
        // rpc_socket_addr, rewards_address, evm_network.
        mock_launcher
            .expect_launch_node()
            .with(
                eq(true),
                eq(None),
                eq(None),
                eq(None),
                eq(rpc_socket_addr),
                eq(rewards_address),
                eq(None),
            )
            .times(1)
            .returning(|_, _, _, _, _, _, _| Ok(()));
        // Matches the `interval: 100` passed in `RunNodeOptions` below.
        mock_launcher
            .expect_wait()
            .with(eq(100))
            .times(1)
            .returning(|_| ());
        mock_launcher
            .expect_get_antnode_path()
            .times(1)
            .returning(|| PathBuf::from("/usr/local/bin/antnode"));

        mock_rpc_client
            .expect_node_info()
            .times(1)
            .returning(move || {
                Ok(NodeInfo {
                    pid: 1000,
                    peer_id,
                    data_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}")),
                    log_path: PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs")),
                    version: "0.100.12".to_string(),
                    uptime: std::time::Duration::from_secs(1), // the service was just started
                    wallet_balance: 0,
                })
            });
        mock_rpc_client
            .expect_network_info()
            .times(1)
            .returning(move || {
                Ok(NetworkInfo {
                    connected_peers: Vec::new(),
                    listeners: Vec::new(),
                })
            });

        let node = run_node(
            RunNodeOptions {
                first: true,
                interval: 100,
                log_format: None,
                metrics_port: None,
                node_port: None,
                number: 1,
                rpc_socket_addr,
                rewards_address,
                evm_network: None,
                version: "0.100.12".to_string(),
            },
            &mock_launcher,
            &mock_rpc_client,
        )
        .await?;

        assert!(node.peers_args.first);
        assert_eq!(node.version, "0.100.12");
        assert_eq!(node.service_name, "antnode-local1");
        assert_eq!(
            node.data_dir_path,
            PathBuf::from(format!("~/.local/share/autonomi/{peer_id}"))
        );
        assert_eq!(
            node.log_dir_path,
            PathBuf::from(format!("~/.local/share/autonomi/{peer_id}/logs"))
        );
        assert_eq!(node.number, 1);
        assert_eq!(node.pid, Some(1000));
        assert_eq!(node.rpc_socket_addr, rpc_socket_addr);
        assert_eq!(node.status, ServiceStatus::Running);
        assert_eq!(node.antnode_path, PathBuf::from("/usr/local/bin/antnode"));

        Ok(())
    }
}