Skip to main content

ant_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    ServiceManager, VerbosityLevel,
14    add_services::{
15        add_node,
16        config::{AddNodeServiceOptions, PortRange},
17    },
18    config::{self, is_running_as_root},
19    helpers::{download_and_extract_release, get_bin_version},
20    print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::{Bootstrap, InitialPeersConfig};
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27    NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28    UpgradeOptions, UpgradeResult,
29    control::{ServiceControl, ServiceController},
30    rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use service_manager::RestartPolicy;
37use std::{
38    cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
39};
40use tokio::sync::RwLock;
41use tracing::debug;
42
43/// Returns the added service names
44pub async fn add(
45    alpha: bool,
46    auto_restart: bool,
47    auto_set_nat_flags: bool,
48    count: Option<u16>,
49    data_dir_path: Option<PathBuf>,
50    enable_metrics_server: bool,
51    env_variables: Option<Vec<(String, String)>>,
52    evm_network: Option<EvmNetwork>,
53    log_dir_path: Option<PathBuf>,
54    log_format: Option<LogFormat>,
55    max_archived_log_files: Option<usize>,
56    max_log_files: Option<usize>,
57    metrics_port: Option<PortRange>,
58    network_id: Option<u8>,
59    node_ip: Option<Ipv4Addr>,
60    node_port: Option<PortRange>,
61    node_registry: NodeRegistryManager,
62    mut init_peers_config: InitialPeersConfig,
63    relay: bool,
64    restart_policy: RestartPolicy,
65    rewards_address: RewardsAddress,
66    rpc_address: Option<Ipv4Addr>,
67    rpc_port: Option<PortRange>,
68    src_path: Option<PathBuf>,
69    no_upnp: bool,
70    url: Option<String>,
71    user: Option<String>,
72    version: Option<String>,
73    verbosity: VerbosityLevel,
74    write_older_cache_files: bool,
75) -> Result<Vec<String>> {
76    let user_mode = !is_running_as_root();
77
78    if verbosity != VerbosityLevel::Minimal {
79        print_banner("Add Antnode Services");
80        println!("{} service(s) to be added", count.unwrap_or(1));
81    }
82
83    let service_manager = ServiceController {};
84    let service_user = if user_mode {
85        None
86    } else {
87        let service_user = user.unwrap_or_else(|| "ant".to_string());
88        service_manager.create_service_user(&service_user)?;
89        Some(service_user)
90    };
91
92    let service_data_dir_path =
93        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
94    let service_log_dir_path =
95        config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
96    let bootstrap_cache_dir = if let Some(user) = &service_user {
97        Some(config::get_bootstrap_cache_owner_path(user)?)
98    } else {
99        None
100    };
101
102    let release_repo = <dyn AntReleaseRepoActions>::default_config();
103
104    let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
105        let version = get_bin_version(&path)?;
106        (path, version)
107    } else {
108        download_and_extract_release(
109            ReleaseType::AntNode,
110            url.clone(),
111            version,
112            &*release_repo,
113            verbosity,
114            None,
115        )
116        .await?
117    };
118
119    debug!("Parsing peers from PeersArgs");
120
121    init_peers_config.addrs.extend(Bootstrap::fetch_from_env());
122    init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
123
124    let options = AddNodeServiceOptions {
125        alpha,
126        antnode_dir_path: service_data_dir_path.clone(),
127        antnode_src_path,
128        auto_restart,
129        auto_set_nat_flags,
130        count,
131        delete_antnode_src: src_path.is_none(),
132        enable_metrics_server,
133        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
134        env_variables,
135        init_peers_config,
136        log_format,
137        max_archived_log_files,
138        max_log_files,
139        metrics_port,
140        network_id,
141        no_upnp,
142        node_ip,
143        node_port,
144        relay,
145        restart_policy,
146        rewards_address,
147        rpc_address,
148        rpc_port,
149        service_data_dir_path,
150        service_log_dir_path,
151        user: service_user,
152        user_mode,
153        version,
154        write_older_cache_files,
155    };
156    info!("Adding node service(s)");
157    let added_services_names =
158        add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
159
160    node_registry.save().await?;
161    debug!("Node registry saved");
162
163    Ok(added_services_names)
164}
165
166pub async fn balance(
167    peer_ids: Vec<String>,
168    node_registry: NodeRegistryManager,
169    service_names: Vec<String>,
170    verbosity: VerbosityLevel,
171) -> Result<()> {
172    if verbosity != VerbosityLevel::Minimal {
173        print_banner("Reward Balances");
174    }
175
176    refresh_node_registry(
177        node_registry.clone(),
178        &ServiceController {},
179        verbosity != VerbosityLevel::Minimal,
180        false,
181        verbosity,
182    )
183    .await?;
184
185    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
186    if services_for_ops.is_empty() {
187        info!("Services for ops is empty, cannot obtain the balance");
188        // This could be the case if all services are at `Removed` status.
189        println!("No balances to display");
190        return Ok(());
191    }
192    debug!("Obtaining balances for {} services", services_for_ops.len());
193
194    for node in services_for_ops {
195        let node = node.read().await;
196        // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
197        println!("{}: {}", node.service_name, 0,);
198    }
199    Ok(())
200}
201
202pub async fn remove(
203    keep_directories: bool,
204    peer_ids: Vec<String>,
205    node_registry: NodeRegistryManager,
206    service_names: Vec<String>,
207    verbosity: VerbosityLevel,
208) -> Result<()> {
209    if verbosity != VerbosityLevel::Minimal {
210        print_banner("Remove Antnode Services");
211    }
212    info!(
213        "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
214    );
215
216    refresh_node_registry(
217        node_registry.clone(),
218        &ServiceController {},
219        verbosity != VerbosityLevel::Minimal,
220        false,
221        verbosity,
222    )
223    .await?;
224
225    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
226    if services_for_ops.is_empty() {
227        info!("Services for ops is empty, no services were eligible for removal");
228        // This could be the case if all services are at `Removed` status.
229        if verbosity != VerbosityLevel::Minimal {
230            println!("No services were eligible for removal");
231        }
232        return Ok(());
233    }
234
235    let mut failed_services = Vec::new();
236    for node in &services_for_ops {
237        let service_name = node.read().await.service_name.clone();
238        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
239        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
240        let mut service_manager =
241            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
242        match service_manager.remove(keep_directories).await {
243            Ok(()) => {
244                debug!("Removed service {service_name}");
245                node_registry.save().await?;
246            }
247            Err(err) => {
248                error!("Failed to remove service {service_name}: {err}");
249                failed_services.push((service_name.clone(), err.to_string()))
250            }
251        }
252    }
253
254    summarise_any_failed_ops(failed_services, "remove", verbosity)
255}
256
257pub async fn reset(
258    force: bool,
259    node_registry: NodeRegistryManager,
260    verbosity: VerbosityLevel,
261) -> Result<()> {
262    if verbosity != VerbosityLevel::Minimal {
263        print_banner("Reset Antnode Services");
264    }
265    info!("Resetting all antnode services, with force={force}");
266
267    if !force {
268        println!("WARNING: all antnode services, data, and logs will be removed.");
269        println!("Do you wish to proceed? [y/n]");
270        std::io::stdout().flush()?;
271        let mut input = String::new();
272        std::io::stdin().read_line(&mut input)?;
273        if input.trim().to_lowercase() != "y" {
274            println!("Reset aborted");
275            return Ok(());
276        }
277    }
278
279    stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
280    remove(false, vec![], node_registry.clone(), vec![], verbosity).await?;
281
282    // Due the possibility of repeated runs of the `reset` command, we need to check for the
283    // existence of this file before attempting to delete it, since `remove_file` will return an
284    // error if the file doesn't exist. On Windows this has been observed to happen.
285    let node_registry_path = config::get_node_registry_path()?;
286    if node_registry_path.exists() {
287        info!("Removing node registry file: {node_registry_path:?}");
288        std::fs::remove_file(node_registry_path)?;
289    }
290    info!("Resetting NodeRegistryManager in memory");
291    node_registry.reset().await;
292
293    Ok(())
294}
295
296pub async fn start(
297    connection_timeout_s: u64,
298    fixed_interval: Option<u64>,
299    node_registry: NodeRegistryManager,
300    peer_ids: Vec<String>,
301    service_names: Vec<String>,
302    verbosity: VerbosityLevel,
303) -> Result<()> {
304    if verbosity != VerbosityLevel::Minimal {
305        print_banner("Start Antnode Services");
306    }
307    info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
308
309    refresh_node_registry(
310        node_registry.clone(),
311        &ServiceController {},
312        verbosity != VerbosityLevel::Minimal,
313        false,
314        verbosity,
315    )
316    .await?;
317
318    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
319    if services_for_ops.is_empty() {
320        info!("No services are eligible to be started");
321        // This could be the case if all services are at `Removed` status.
322        if verbosity != VerbosityLevel::Minimal {
323            println!("No services were eligible to be started");
324        }
325        return Ok(());
326    }
327
328    let mut failed_services = Vec::new();
329    for node in &services_for_ops {
330        let service_name = node.read().await.service_name.clone();
331
332        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
333        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
334
335        // set dynamic startup delay if fixed_interval is not set
336        let service = if fixed_interval.is_none() {
337            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
338        } else {
339            service
340        };
341
342        let mut service_manager =
343            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
344        if service_manager.service.status().await != ServiceStatus::Running {
345            // It would be possible here to check if the service *is* running and then just
346            // continue without applying the delay. The reason for not doing so is because when
347            // `start` is called below, the user will get a message to say the service was already
348            // started, which I think is useful behaviour to retain.
349            if let Some(interval) = fixed_interval {
350                debug!("Sleeping for {} milliseconds", interval);
351                std::thread::sleep(std::time::Duration::from_millis(interval));
352            }
353        }
354        match service_manager.start().await {
355            Ok(start_duration) => {
356                debug!("Started service {service_name} in {start_duration:?}",);
357
358                node_registry.save().await?;
359            }
360            Err(err) => {
361                error!("Failed to start service {service_name}: {err}");
362                failed_services.push((service_name.clone(), err.to_string()))
363            }
364        }
365    }
366
367    summarise_any_failed_ops(failed_services, "start", verbosity)
368}
369
370pub async fn status(
371    details: bool,
372    fail: bool,
373    json: bool,
374    node_registry: NodeRegistryManager,
375) -> Result<()> {
376    if !node_registry.nodes.read().await.is_empty() {
377        if !json && !details {
378            print_banner("Antnode Services");
379        }
380        status_report(
381            &node_registry,
382            &ServiceController {},
383            details,
384            json,
385            fail,
386            false,
387        )
388        .await?;
389        node_registry.save().await?;
390    }
391    Ok(())
392}
393
394pub async fn stop(
395    interval: Option<u64>,
396    node_registry: NodeRegistryManager,
397    peer_ids: Vec<String>,
398    service_names: Vec<String>,
399    verbosity: VerbosityLevel,
400) -> Result<()> {
401    if verbosity != VerbosityLevel::Minimal {
402        print_banner("Stop Antnode Services");
403    }
404    info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
405
406    refresh_node_registry(
407        node_registry.clone(),
408        &ServiceController {},
409        verbosity != VerbosityLevel::Minimal,
410        false,
411        verbosity,
412    )
413    .await?;
414
415    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
416    if services_for_ops.is_empty() {
417        info!("No services are eligible to be stopped");
418        // This could be the case if all services are at `Removed` status.
419        if verbosity != VerbosityLevel::Minimal {
420            println!("No services were eligible to be stopped");
421        }
422        return Ok(());
423    }
424
425    let mut failed_services = Vec::new();
426    for node in services_for_ops.iter() {
427        let service_name = node.read().await.service_name.clone();
428        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
429        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
430        let mut service_manager =
431            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
432
433        if service_manager.service.status().await == ServiceStatus::Running
434            && let Some(interval) = interval
435        {
436            debug!("Sleeping for {} milliseconds", interval);
437            std::thread::sleep(std::time::Duration::from_millis(interval));
438        }
439        match service_manager.stop().await {
440            Ok(()) => {
441                debug!("Stopped service {service_name}");
442                node_registry.save().await?;
443            }
444            Err(err) => {
445                error!("Failed to stop service {service_name}: {err}");
446                failed_services.push((service_name.clone(), err.to_string()))
447            }
448        }
449    }
450
451    summarise_any_failed_ops(failed_services, "stop", verbosity)
452}
453
454pub async fn upgrade(
455    connection_timeout_s: u64,
456    do_not_start: bool,
457    custom_bin_path: Option<PathBuf>,
458    force: bool,
459    fixed_interval: Option<u64>,
460    node_registry: NodeRegistryManager,
461    peer_ids: Vec<String>,
462    provided_env_variables: Option<Vec<(String, String)>>,
463    service_names: Vec<String>,
464    url: Option<String>,
465    version: Option<String>,
466    verbosity: VerbosityLevel,
467) -> Result<()> {
468    // In the case of a custom binary, we want to force the use of it. Regardless of its version
469    // number, the user has probably built it for some special case. They may have not used the
470    // `--force` flag; if they didn't, we can just do that for them here.
471    let use_force = force || custom_bin_path.is_some();
472
473    if verbosity != VerbosityLevel::Minimal {
474        print_banner("Upgrade Antnode Services");
475    }
476    info!(
477        "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
478    );
479
480    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
481        custom_bin_path.clone(),
482        ReleaseType::AntNode,
483        url,
484        version,
485        verbosity,
486    )
487    .await?;
488
489    refresh_node_registry(
490        node_registry.clone(),
491        &ServiceController {},
492        verbosity != VerbosityLevel::Minimal,
493        false,
494        verbosity,
495    )
496    .await?;
497
498    if let Some(node) = node_registry.nodes.read().await.first() {
499        let node = node.read().await;
500        debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
501    } else {
502        debug!("There are no nodes currently added or active");
503    }
504
505    if !use_force {
506        let mut node_versions = Vec::new();
507
508        for node in node_registry.nodes.read().await.iter() {
509            let node = node.read().await;
510            let version = Version::parse(&node.version)
511                .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
512            node_versions.push(version);
513        }
514
515        let any_nodes_need_upgraded = node_versions
516            .iter()
517            .any(|current_version| current_version < &target_version);
518        if !any_nodes_need_upgraded {
519            info!("All nodes are at the latest version, no upgrade required.");
520            if verbosity != VerbosityLevel::Minimal {
521                println!("{} All nodes are at the latest version", "✓".green());
522            }
523            return Ok(());
524        }
525    }
526
527    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
528    trace!("services_for_ops len: {}", services_for_ops.len());
529    let mut upgrade_summary = Vec::new();
530
531    for node in &services_for_ops {
532        let env_variables = if provided_env_variables.is_some() {
533            provided_env_variables.clone()
534        } else {
535            node_registry.environment_variables.read().await.clone()
536        };
537        let options = UpgradeOptions {
538            auto_restart: false,
539            env_variables: env_variables.clone(),
540            force: use_force,
541            start_service: !do_not_start,
542            target_bin_path: upgrade_bin_path.clone(),
543            target_version: target_version.clone(),
544        };
545        let service_name = node.read().await.service_name.clone();
546
547        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
548        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
549        // set dynamic startup delay if fixed_interval is not set
550        let service = if fixed_interval.is_none() {
551            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
552        } else {
553            service
554        };
555
556        let mut service_manager =
557            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
558
559        match service_manager.upgrade(options).await {
560            Ok(upgrade_result) => {
561                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
562                if upgrade_result != UpgradeResult::NotRequired {
563                    // It doesn't seem useful to apply the interval if there was no upgrade
564                    // required for the previous service.
565                    if let Some(interval) = fixed_interval {
566                        debug!("Sleeping for {interval} milliseconds",);
567                        std::thread::sleep(std::time::Duration::from_millis(interval));
568                    }
569                }
570                upgrade_summary.push((service_name.clone(), upgrade_result));
571                node_registry.save().await?;
572            }
573            Err(err) => {
574                error!("Error upgrading service {service_name}: {err}");
575                upgrade_summary.push((
576                    service_name.clone(),
577                    UpgradeResult::Error(format!("Error: {err}")),
578                ));
579                node_registry.save().await?;
580            }
581        }
582    }
583
584    if verbosity != VerbosityLevel::Minimal {
585        print_upgrade_summary(upgrade_summary.clone());
586    }
587
588    if upgrade_summary.iter().any(|(_, r)| {
589        matches!(r, UpgradeResult::Error(_))
590            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
591    }) {
592        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
593            "For any services that were upgraded but did not start, you can attempt to start them \
594                again using the 'start' command."));
595    }
596
597    Ok(())
598}
599
600/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
601///
602/// The arguments here are mostly mirror those used in `add`.
603pub async fn maintain_n_running_nodes(
604    alpha: bool,
605    auto_restart: bool,
606    auto_set_nat_flags: bool,
607    connection_timeout_s: u64,
608    max_nodes_to_run: u16,
609    data_dir_path: Option<PathBuf>,
610    enable_metrics_server: bool,
611    env_variables: Option<Vec<(String, String)>>,
612    evm_network: Option<EvmNetwork>,
613    log_dir_path: Option<PathBuf>,
614    log_format: Option<LogFormat>,
615    max_archived_log_files: Option<usize>,
616    max_log_files: Option<usize>,
617    metrics_port: Option<PortRange>,
618    network_id: Option<u8>,
619    node_ip: Option<Ipv4Addr>,
620    node_port: Option<PortRange>,
621    node_registry: NodeRegistryManager,
622    peers_args: InitialPeersConfig,
623    relay: bool,
624    rewards_address: RewardsAddress,
625    restart_policy: RestartPolicy,
626    rpc_address: Option<Ipv4Addr>,
627    rpc_port: Option<PortRange>,
628    src_path: Option<PathBuf>,
629    url: Option<String>,
630    no_upnp: bool,
631    user: Option<String>,
632    version: Option<String>,
633    verbosity: VerbosityLevel,
634    start_node_interval: Option<u64>,
635    write_older_cache_files: bool,
636) -> Result<()> {
637    let mut running_nodes = Vec::new();
638
639    for node in node_registry.nodes.read().await.iter() {
640        let node = node.read().await;
641        if node.status == ServiceStatus::Running {
642            running_nodes.push(node.service_name.clone());
643        }
644    }
645
646    let running_count = running_nodes.len();
647    let target_count = max_nodes_to_run as usize;
648
649    info!(
650        "Current running nodes: {}, Target: {}",
651        running_count, target_count
652    );
653
654    match running_count.cmp(&target_count) {
655        Ordering::Greater => {
656            let to_stop_count = running_count - target_count;
657            let services_to_stop = running_nodes
658                .into_iter()
659                .rev() // Stop the oldest nodes first
660                .take(to_stop_count)
661                .collect::<Vec<_>>();
662
663            info!(
664                "Stopping {} excess nodes: {:?}",
665                to_stop_count, services_to_stop
666            );
667            stop(
668                None,
669                node_registry.clone(),
670                vec![],
671                services_to_stop,
672                verbosity,
673            )
674            .await?;
675        }
676        Ordering::Less => {
677            let to_start_count = target_count - running_count;
678            let mut inactive_nodes = Vec::new();
679            for node in node_registry.nodes.read().await.iter() {
680                let node = node.read().await;
681                if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
682                    inactive_nodes.push(node.service_name.clone());
683                }
684            }
685
686            info!("Inactive nodes available: {}", inactive_nodes.len());
687
688            if to_start_count <= inactive_nodes.len() {
689                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
690                info!(
691                    "Starting {} existing inactive nodes: {:?}",
692                    to_start_count, nodes_to_start
693                );
694                start(
695                    connection_timeout_s,
696                    start_node_interval,
697                    node_registry.clone(),
698                    vec![],
699                    nodes_to_start,
700                    verbosity,
701                )
702                .await?;
703            } else {
704                let to_add_count = to_start_count - inactive_nodes.len();
705                info!(
706                    "Adding {} new nodes and starting all {} inactive nodes",
707                    to_add_count,
708                    inactive_nodes.len()
709                );
710
711                let ports_to_use = match node_port {
712                    Some(PortRange::Single(port)) => vec![port],
713                    Some(PortRange::Range(start, end)) => {
714                        (start..=end).take(to_add_count).collect()
715                    }
716                    None => vec![],
717                };
718
719                info!("Ports to use for new nodes: {:?}", ports_to_use);
720
721                for (i, port) in ports_to_use.into_iter().enumerate() {
722                    let added_service = add(
723                        alpha,
724                        auto_restart,
725                        auto_set_nat_flags,
726                        Some(1),
727                        data_dir_path.clone(),
728                        enable_metrics_server,
729                        env_variables.clone(),
730                        evm_network.clone(),
731                        log_dir_path.clone(),
732                        log_format,
733                        max_archived_log_files,
734                        max_log_files,
735                        metrics_port.clone(),
736                        network_id,
737                        node_ip,
738                        Some(PortRange::Single(port)),
739                        node_registry.clone(),
740                        peers_args.clone(),
741                        relay,
742                        restart_policy,
743                        rewards_address,
744                        rpc_address,
745                        rpc_port.clone(),
746                        src_path.clone(),
747                        no_upnp,
748                        url.clone(),
749                        user.clone(),
750                        version.clone(),
751                        verbosity,
752                        write_older_cache_files,
753                    )
754                    .await?;
755
756                    if i == 0 {
757                        start(
758                            connection_timeout_s,
759                            start_node_interval,
760                            node_registry.clone(),
761                            vec![],
762                            added_service,
763                            verbosity,
764                        )
765                        .await?;
766                    }
767                }
768
769                if !inactive_nodes.is_empty() {
770                    start(
771                        connection_timeout_s,
772                        start_node_interval,
773                        node_registry.clone(),
774                        vec![],
775                        inactive_nodes,
776                        verbosity,
777                    )
778                    .await?;
779                }
780            }
781        }
782        Ordering::Equal => {
783            info!(
784                "Current node count ({}) matches target ({}). No action needed.",
785                running_count, target_count
786            );
787        }
788    }
789
790    // Verify final state
791    let mut final_running_count = 0;
792    for node in node_registry.nodes.read().await.iter() {
793        let node_read = node.read().await;
794        if node_read.status == ServiceStatus::Running {
795            final_running_count += 1;
796        }
797    }
798
799    if final_running_count != target_count {
800        warn!(
801            "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
802        );
803    }
804
805    Ok(())
806}
807
808async fn get_services_for_ops(
809    node_registry: &NodeRegistryManager,
810    peer_ids: Vec<String>,
811    service_names: Vec<String>,
812) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
813    let mut services = Vec::new();
814
815    if service_names.is_empty() && peer_ids.is_empty() {
816        for node in node_registry.nodes.read().await.iter() {
817            if node.read().await.status != ServiceStatus::Removed {
818                services.push(Arc::clone(node));
819            }
820        }
821    } else {
822        for name in &service_names {
823            let mut found_service_with_name = false;
824            for node in node_registry.nodes.read().await.iter() {
825                let node_read = node.read().await;
826                if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
827                    {
828                        services.push(Arc::clone(node));
829                        found_service_with_name = true;
830                        break;
831                    }
832                }
833            }
834
835            if !found_service_with_name {
836                error!("No service named '{name}'");
837                return Err(eyre!(format!("No service named '{name}'")));
838            }
839        }
840
841        for peer_id_str in &peer_ids {
842            let mut found_service_with_peer_id = false;
843            let given_peer_id = PeerId::from_str(peer_id_str)
844                .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
845            for node in node_registry.nodes.read().await.iter() {
846                let node_read = node.read().await;
847                if let Some(peer_id) = node_read.peer_id
848                    && peer_id == given_peer_id
849                    && node_read.status != ServiceStatus::Removed
850                {
851                    services.push(Arc::clone(node));
852                    found_service_with_peer_id = true;
853                    break;
854                }
855            }
856            if !found_service_with_peer_id {
857                error!("Could not find node with peer id: '{given_peer_id:?}'");
858                return Err(eyre!(format!(
859                    "Could not find node with peer ID '{given_peer_id}'",
860                )));
861            }
862        }
863    }
864
865    Ok(services)
866}
867
868fn summarise_any_failed_ops(
869    failed_services: Vec<(String, String)>,
870    verb: &str,
871    verbosity: VerbosityLevel,
872) -> Result<()> {
873    if !failed_services.is_empty() {
874        if verbosity != VerbosityLevel::Minimal {
875            println!("Failed to {verb} {} service(s):", failed_services.len());
876            for failed in failed_services.iter() {
877                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
878            }
879        }
880
881        error!("Failed to {verb} one or more services");
882        return Err(eyre!("Failed to {verb} one or more services"));
883    }
884    Ok(())
885}