ant_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    add_services::{
14        add_node,
15        config::{AddNodeServiceOptions, PortRange},
16    },
17    config::{self, is_running_as_root},
18    helpers::{download_and_extract_release, get_bin_version},
19    print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use ant_bootstrap::InitialPeersConfig;
22use ant_evm::{EvmNetwork, RewardsAddress};
23use ant_logging::LogFormat;
24use ant_releases::{AntReleaseRepoActions, ReleaseType};
25use ant_service_management::{
26    control::{ServiceControl, ServiceController},
27    rpc::RpcClient,
28    NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
29    UpgradeOptions, UpgradeResult,
30};
31use color_eyre::{eyre::eyre, Help, Result};
32use colored::Colorize;
33use libp2p_identity::PeerId;
34use semver::Version;
35use std::{
36    cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
37};
38use tokio::sync::RwLock;
39use tracing::debug;
40
41/// Returns the added service names
42pub async fn add(
43    alpha: bool,
44    auto_restart: bool,
45    auto_set_nat_flags: bool,
46    count: Option<u16>,
47    data_dir_path: Option<PathBuf>,
48    enable_metrics_server: bool,
49    env_variables: Option<Vec<(String, String)>>,
50    evm_network: Option<EvmNetwork>,
51    log_dir_path: Option<PathBuf>,
52    log_format: Option<LogFormat>,
53    max_archived_log_files: Option<usize>,
54    max_log_files: Option<usize>,
55    metrics_port: Option<PortRange>,
56    network_id: Option<u8>,
57    node_ip: Option<Ipv4Addr>,
58    node_port: Option<PortRange>,
59    node_registry: NodeRegistryManager,
60    mut init_peers_config: InitialPeersConfig,
61    relay: bool,
62    rewards_address: RewardsAddress,
63    rpc_address: Option<Ipv4Addr>,
64    rpc_port: Option<PortRange>,
65    src_path: Option<PathBuf>,
66    no_upnp: bool,
67    url: Option<String>,
68    user: Option<String>,
69    version: Option<String>,
70    verbosity: VerbosityLevel,
71    write_older_cache_files: bool,
72) -> Result<Vec<String>> {
73    let user_mode = !is_running_as_root();
74
75    if verbosity != VerbosityLevel::Minimal {
76        print_banner("Add Antnode Services");
77        println!("{} service(s) to be added", count.unwrap_or(1));
78    }
79
80    let service_manager = ServiceController {};
81    let service_user = if user_mode {
82        None
83    } else {
84        let service_user = user.unwrap_or_else(|| "ant".to_string());
85        service_manager.create_service_user(&service_user)?;
86        Some(service_user)
87    };
88
89    let service_data_dir_path =
90        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
91    let service_log_dir_path =
92        config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
93    let bootstrap_cache_dir = if let Some(user) = &service_user {
94        Some(config::get_bootstrap_cache_owner_path(user)?)
95    } else {
96        None
97    };
98
99    let release_repo = <dyn AntReleaseRepoActions>::default_config();
100
101    let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
102        let version = get_bin_version(&path)?;
103        (path, version)
104    } else {
105        download_and_extract_release(
106            ReleaseType::AntNode,
107            url.clone(),
108            version,
109            &*release_repo,
110            verbosity,
111            None,
112        )
113        .await?
114    };
115
116    debug!("Parsing peers from PeersArgs");
117
118    init_peers_config
119        .addrs
120        .extend(InitialPeersConfig::read_bootstrap_addr_from_env());
121    init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
122
123    let options = AddNodeServiceOptions {
124        alpha,
125        auto_restart,
126        auto_set_nat_flags,
127        count,
128        delete_antnode_src: src_path.is_none(),
129        enable_metrics_server,
130        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
131        env_variables,
132        relay,
133        log_format,
134        max_archived_log_files,
135        max_log_files,
136        metrics_port,
137        network_id,
138        node_ip,
139        node_port,
140        init_peers_config,
141        rewards_address,
142        rpc_address,
143        rpc_port,
144        antnode_src_path,
145        antnode_dir_path: service_data_dir_path.clone(),
146        service_data_dir_path,
147        service_log_dir_path,
148        no_upnp,
149        user: service_user,
150        user_mode,
151        version,
152        write_older_cache_files,
153    };
154    info!("Adding node service(s)");
155    let added_services_names =
156        add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
157
158    node_registry.save().await?;
159    debug!("Node registry saved");
160
161    Ok(added_services_names)
162}
163
164pub async fn balance(
165    peer_ids: Vec<String>,
166    node_registry: NodeRegistryManager,
167    service_names: Vec<String>,
168    verbosity: VerbosityLevel,
169) -> Result<()> {
170    if verbosity != VerbosityLevel::Minimal {
171        print_banner("Reward Balances");
172    }
173
174    refresh_node_registry(
175        node_registry.clone(),
176        &ServiceController {},
177        verbosity != VerbosityLevel::Minimal,
178        false,
179        verbosity,
180    )
181    .await?;
182
183    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
184    if services_for_ops.is_empty() {
185        info!("Services for ops is empty, cannot obtain the balance");
186        // This could be the case if all services are at `Removed` status.
187        println!("No balances to display");
188        return Ok(());
189    }
190    debug!("Obtaining balances for {} services", services_for_ops.len());
191
192    for node in services_for_ops {
193        let node = node.read().await;
194        // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
195        println!("{}: {}", node.service_name, 0,);
196    }
197    Ok(())
198}
199
200pub async fn remove(
201    keep_directories: bool,
202    peer_ids: Vec<String>,
203    node_registry: NodeRegistryManager,
204    service_names: Vec<String>,
205    verbosity: VerbosityLevel,
206) -> Result<()> {
207    if verbosity != VerbosityLevel::Minimal {
208        print_banner("Remove Antnode Services");
209    }
210    info!("Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
211
212    refresh_node_registry(
213        node_registry.clone(),
214        &ServiceController {},
215        verbosity != VerbosityLevel::Minimal,
216        false,
217        verbosity,
218    )
219    .await?;
220
221    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
222    if services_for_ops.is_empty() {
223        info!("Services for ops is empty, no services were eligible for removal");
224        // This could be the case if all services are at `Removed` status.
225        if verbosity != VerbosityLevel::Minimal {
226            println!("No services were eligible for removal");
227        }
228        return Ok(());
229    }
230
231    let mut failed_services = Vec::new();
232    for node in &services_for_ops {
233        let service_name = node.read().await.service_name.clone();
234        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
235        let service = NodeService::new(node.clone(), Box::new(rpc_client));
236        let mut service_manager =
237            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
238        match service_manager.remove(keep_directories).await {
239            Ok(()) => {
240                debug!("Removed service {service_name}");
241                node_registry.save().await?;
242            }
243            Err(err) => {
244                error!("Failed to remove service {service_name}: {err}");
245                failed_services.push((service_name.clone(), err.to_string()))
246            }
247        }
248    }
249
250    summarise_any_failed_ops(failed_services, "remove", verbosity)
251}
252
253pub async fn reset(
254    force: bool,
255    node_registry: NodeRegistryManager,
256    verbosity: VerbosityLevel,
257) -> Result<()> {
258    if verbosity != VerbosityLevel::Minimal {
259        print_banner("Reset Antnode Services");
260    }
261    info!("Resetting all antnode services, with force={force}");
262
263    if !force {
264        println!("WARNING: all antnode services, data, and logs will be removed.");
265        println!("Do you wish to proceed? [y/n]");
266        std::io::stdout().flush()?;
267        let mut input = String::new();
268        std::io::stdin().read_line(&mut input)?;
269        if input.trim().to_lowercase() != "y" {
270            println!("Reset aborted");
271            return Ok(());
272        }
273    }
274
275    stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
276    remove(false, vec![], node_registry, vec![], verbosity).await?;
277
278    // Due the possibility of repeated runs of the `reset` command, we need to check for the
279    // existence of this file before attempting to delete it, since `remove_file` will return an
280    // error if the file doesn't exist. On Windows this has been observed to happen.
281    let node_registry_path = config::get_node_registry_path()?;
282    if node_registry_path.exists() {
283        info!("Removing node registry file: {node_registry_path:?}");
284        std::fs::remove_file(node_registry_path)?;
285    }
286
287    Ok(())
288}
289
290pub async fn start(
291    connection_timeout_s: u64,
292    fixed_interval: Option<u64>,
293    node_registry: NodeRegistryManager,
294    peer_ids: Vec<String>,
295    service_names: Vec<String>,
296    verbosity: VerbosityLevel,
297) -> Result<()> {
298    if verbosity != VerbosityLevel::Minimal {
299        print_banner("Start Antnode Services");
300    }
301    info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
302
303    refresh_node_registry(
304        node_registry.clone(),
305        &ServiceController {},
306        verbosity != VerbosityLevel::Minimal,
307        false,
308        verbosity,
309    )
310    .await?;
311
312    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
313    if services_for_ops.is_empty() {
314        info!("No services are eligible to be started");
315        // This could be the case if all services are at `Removed` status.
316        if verbosity != VerbosityLevel::Minimal {
317            println!("No services were eligible to be started");
318        }
319        return Ok(());
320    }
321
322    let mut failed_services = Vec::new();
323    for node in &services_for_ops {
324        let service_name = node.read().await.service_name.clone();
325
326        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
327        let service = NodeService::new(node.clone(), Box::new(rpc_client));
328
329        // set dynamic startup delay if fixed_interval is not set
330        let service = if fixed_interval.is_none() {
331            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
332        } else {
333            service
334        };
335
336        let mut service_manager =
337            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
338        if service_manager.service.status().await != ServiceStatus::Running {
339            // It would be possible here to check if the service *is* running and then just
340            // continue without applying the delay. The reason for not doing so is because when
341            // `start` is called below, the user will get a message to say the service was already
342            // started, which I think is useful behaviour to retain.
343            if let Some(interval) = fixed_interval {
344                debug!("Sleeping for {} milliseconds", interval);
345                std::thread::sleep(std::time::Duration::from_millis(interval));
346            }
347        }
348        match service_manager.start().await {
349            Ok(start_duration) => {
350                debug!("Started service {service_name} in {start_duration:?}",);
351
352                node_registry.save().await?;
353            }
354            Err(err) => {
355                error!("Failed to start service {service_name}: {err}");
356                failed_services.push((service_name.clone(), err.to_string()))
357            }
358        }
359    }
360
361    summarise_any_failed_ops(failed_services, "start", verbosity)
362}
363
364pub async fn status(
365    details: bool,
366    fail: bool,
367    json: bool,
368    node_registry: NodeRegistryManager,
369) -> Result<()> {
370    if !node_registry.nodes.read().await.is_empty() {
371        if !json && !details {
372            print_banner("Antnode Services");
373        }
374        status_report(
375            &node_registry,
376            &ServiceController {},
377            details,
378            json,
379            fail,
380            false,
381        )
382        .await?;
383        node_registry.save().await?;
384    }
385    Ok(())
386}
387
388pub async fn stop(
389    interval: Option<u64>,
390    node_registry: NodeRegistryManager,
391    peer_ids: Vec<String>,
392    service_names: Vec<String>,
393    verbosity: VerbosityLevel,
394) -> Result<()> {
395    if verbosity != VerbosityLevel::Minimal {
396        print_banner("Stop Antnode Services");
397    }
398    info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
399
400    refresh_node_registry(
401        node_registry.clone(),
402        &ServiceController {},
403        verbosity != VerbosityLevel::Minimal,
404        false,
405        verbosity,
406    )
407    .await?;
408
409    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
410    if services_for_ops.is_empty() {
411        info!("No services are eligible to be stopped");
412        // This could be the case if all services are at `Removed` status.
413        if verbosity != VerbosityLevel::Minimal {
414            println!("No services were eligible to be stopped");
415        }
416        return Ok(());
417    }
418
419    let mut failed_services = Vec::new();
420    for node in services_for_ops.iter() {
421        let service_name = node.read().await.service_name.clone();
422        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
423        let service = NodeService::new(node.clone(), Box::new(rpc_client));
424        let mut service_manager =
425            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
426
427        if service_manager.service.status().await == ServiceStatus::Running {
428            if let Some(interval) = interval {
429                debug!("Sleeping for {} milliseconds", interval);
430                std::thread::sleep(std::time::Duration::from_millis(interval));
431            }
432        }
433        match service_manager.stop().await {
434            Ok(()) => {
435                debug!("Stopped service {service_name}");
436                node_registry.save().await?;
437            }
438            Err(err) => {
439                error!("Failed to stop service {service_name}: {err}");
440                failed_services.push((service_name.clone(), err.to_string()))
441            }
442        }
443    }
444
445    summarise_any_failed_ops(failed_services, "stop", verbosity)
446}
447
448pub async fn upgrade(
449    connection_timeout_s: u64,
450    do_not_start: bool,
451    custom_bin_path: Option<PathBuf>,
452    force: bool,
453    fixed_interval: Option<u64>,
454    node_registry: NodeRegistryManager,
455    peer_ids: Vec<String>,
456    provided_env_variables: Option<Vec<(String, String)>>,
457    service_names: Vec<String>,
458    url: Option<String>,
459    version: Option<String>,
460    verbosity: VerbosityLevel,
461) -> Result<()> {
462    // In the case of a custom binary, we want to force the use of it. Regardless of its version
463    // number, the user has probably built it for some special case. They may have not used the
464    // `--force` flag; if they didn't, we can just do that for them here.
465    let use_force = force || custom_bin_path.is_some();
466
467    if verbosity != VerbosityLevel::Minimal {
468        print_banner("Upgrade Antnode Services");
469    }
470    info!(
471        "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
472    );
473
474    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
475        custom_bin_path.clone(),
476        ReleaseType::AntNode,
477        url,
478        version,
479        verbosity,
480    )
481    .await?;
482
483    refresh_node_registry(
484        node_registry.clone(),
485        &ServiceController {},
486        verbosity != VerbosityLevel::Minimal,
487        false,
488        verbosity,
489    )
490    .await?;
491
492    if let Some(node) = node_registry.nodes.read().await.first() {
493        let node = node.read().await;
494        debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
495    } else {
496        debug!("There are no nodes currently added or active");
497    }
498
499    if !use_force {
500        let mut node_versions = Vec::new();
501
502        for node in node_registry.nodes.read().await.iter() {
503            let node = node.read().await;
504            let version = Version::parse(&node.version)
505                .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
506            node_versions.push(version);
507        }
508
509        let any_nodes_need_upgraded = node_versions
510            .iter()
511            .any(|current_version| current_version < &target_version);
512        if !any_nodes_need_upgraded {
513            info!("All nodes are at the latest version, no upgrade required.");
514            if verbosity != VerbosityLevel::Minimal {
515                println!("{} All nodes are at the latest version", "✓".green());
516            }
517            return Ok(());
518        }
519    }
520
521    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
522    trace!("services_for_ops len: {}", services_for_ops.len());
523    let mut upgrade_summary = Vec::new();
524
525    for node in &services_for_ops {
526        let env_variables = if provided_env_variables.is_some() {
527            provided_env_variables.clone()
528        } else {
529            node_registry.environment_variables.read().await.clone()
530        };
531        let options = UpgradeOptions {
532            auto_restart: false,
533            env_variables: env_variables.clone(),
534            force: use_force,
535            start_service: !do_not_start,
536            target_bin_path: upgrade_bin_path.clone(),
537            target_version: target_version.clone(),
538        };
539        let service_name = node.read().await.service_name.clone();
540
541        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
542        let service = NodeService::new(node.clone(), Box::new(rpc_client));
543        // set dynamic startup delay if fixed_interval is not set
544        let service = if fixed_interval.is_none() {
545            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
546        } else {
547            service
548        };
549
550        let mut service_manager =
551            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
552
553        match service_manager.upgrade(options).await {
554            Ok(upgrade_result) => {
555                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
556                if upgrade_result != UpgradeResult::NotRequired {
557                    // It doesn't seem useful to apply the interval if there was no upgrade
558                    // required for the previous service.
559                    if let Some(interval) = fixed_interval {
560                        debug!("Sleeping for {interval} milliseconds",);
561                        std::thread::sleep(std::time::Duration::from_millis(interval));
562                    }
563                }
564                upgrade_summary.push((service_name.clone(), upgrade_result));
565                node_registry.save().await?;
566            }
567            Err(err) => {
568                error!("Error upgrading service {service_name}: {err}");
569                upgrade_summary.push((
570                    service_name.clone(),
571                    UpgradeResult::Error(format!("Error: {err}")),
572                ));
573                node_registry.save().await?;
574            }
575        }
576    }
577
578    if verbosity != VerbosityLevel::Minimal {
579        print_upgrade_summary(upgrade_summary.clone());
580    }
581
582    if upgrade_summary.iter().any(|(_, r)| {
583        matches!(r, UpgradeResult::Error(_))
584            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
585    }) {
586        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
587            "For any services that were upgraded but did not start, you can attempt to start them \
588                again using the 'start' command."));
589    }
590
591    Ok(())
592}
593
/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
///
/// The number of services whose registry status is `Running` is compared with
/// `max_nodes_to_run`:
///
/// * more running than the target: the surplus services are stopped;
/// * fewer running than the target: services with `Stopped` or `Added` status are started
///   first, and new services are added (via [`add`]) only when there are not enough of those;
/// * equal: nothing is done.
///
/// Afterwards the `Running` services are counted again and a warning is logged if the count
/// still differs from the target. That mismatch is not returned as an error.
///
/// Most of the arguments mirror those used in [`add`] and are passed on to it.
/// `connection_timeout_s` and `start_node_interval` are passed on to [`start`].
///
/// # Errors
///
/// Returns the first error produced by [`stop`], [`start`] or [`add`].
pub async fn maintain_n_running_nodes(
    alpha: bool,
    auto_restart: bool,
    auto_set_nat_flags: bool,
    connection_timeout_s: u64,
    max_nodes_to_run: u16,
    data_dir_path: Option<PathBuf>,
    enable_metrics_server: bool,
    env_variables: Option<Vec<(String, String)>>,
    evm_network: Option<EvmNetwork>,
    log_dir_path: Option<PathBuf>,
    log_format: Option<LogFormat>,
    max_archived_log_files: Option<usize>,
    max_log_files: Option<usize>,
    metrics_port: Option<PortRange>,
    network_id: Option<u8>,
    node_ip: Option<Ipv4Addr>,
    node_port: Option<PortRange>,
    node_registry: NodeRegistryManager,
    peers_args: InitialPeersConfig,
    relay: bool,
    rewards_address: RewardsAddress,
    rpc_address: Option<Ipv4Addr>,
    rpc_port: Option<PortRange>,
    src_path: Option<PathBuf>,
    url: Option<String>,
    no_upnp: bool,
    user: Option<String>,
    version: Option<String>,
    verbosity: VerbosityLevel,
    start_node_interval: Option<u64>,
    write_older_cache_files: bool,
) -> Result<()> {
    // Names of the services currently recorded as running, in registry iteration order.
    let mut running_nodes = Vec::new();

    for node in node_registry.nodes.read().await.iter() {
        let node = node.read().await;
        if node.status == ServiceStatus::Running {
            running_nodes.push(node.service_name.clone());
        }
    }

    let running_count = running_nodes.len();
    let target_count = max_nodes_to_run as usize;

    info!(
        "Current running nodes: {}, Target: {}",
        running_count, target_count
    );

    match running_count.cmp(&target_count) {
        Ordering::Greater => {
            let to_stop_count = running_count - target_count;
            let services_to_stop = running_nodes
                .into_iter()
                // Intended to stop the oldest nodes first.
                // NOTE(review): `.rev()` selects from the END of the registry iteration order;
                // whether those are the oldest nodes depends on how the registry orders its
                // entries — confirm.
                .rev()
                .take(to_stop_count)
                .collect::<Vec<_>>();

            info!(
                "Stopping {} excess nodes: {:?}",
                to_stop_count, services_to_stop
            );
            stop(
                None,
                node_registry.clone(),
                vec![],
                services_to_stop,
                verbosity,
            )
            .await?;
        }
        Ordering::Less => {
            let to_start_count = target_count - running_count;
            // Existing services that can be started without adding anything new.
            let mut inactive_nodes = Vec::new();
            for node in node_registry.nodes.read().await.iter() {
                let node = node.read().await;
                if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
                    inactive_nodes.push(node.service_name.clone());
                }
            }

            info!("Inactive nodes available: {}", inactive_nodes.len());

            if to_start_count <= inactive_nodes.len() {
                // Enough inactive services exist: start just as many as are needed.
                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
                info!(
                    "Starting {} existing inactive nodes: {:?}",
                    to_start_count, nodes_to_start
                );
                start(
                    connection_timeout_s,
                    start_node_interval,
                    node_registry.clone(),
                    vec![],
                    nodes_to_start,
                    verbosity,
                )
                .await?;
            } else {
                let to_add_count = to_start_count - inactive_nodes.len();
                info!(
                    "Adding {} new nodes and starting all {} inactive nodes",
                    to_add_count,
                    inactive_nodes.len()
                );

                // One new service is added per port in this list.
                // NOTE(review): with a single port exactly one service is added regardless of
                // `to_add_count`, and with no port the list is empty so NO service is added —
                // confirm both are intended.
                let ports_to_use = match node_port {
                    Some(PortRange::Single(port)) => vec![port],
                    Some(PortRange::Range(start, end)) => {
                        (start..=end).take(to_add_count).collect()
                    }
                    None => vec![],
                };

                for (i, port) in ports_to_use.into_iter().enumerate() {
                    // Each call adds exactly one service (`Some(1)`) bound to this port.
                    let added_service = add(
                        alpha,
                        auto_restart,
                        auto_set_nat_flags,
                        Some(1),
                        data_dir_path.clone(),
                        enable_metrics_server,
                        env_variables.clone(),
                        evm_network.clone(),
                        log_dir_path.clone(),
                        log_format,
                        max_archived_log_files,
                        max_log_files,
                        metrics_port.clone(),
                        network_id,
                        node_ip,
                        Some(PortRange::Single(port)),
                        node_registry.clone(),
                        peers_args.clone(),
                        relay,
                        rewards_address,
                        rpc_address,
                        rpc_port.clone(),
                        src_path.clone(),
                        no_upnp,
                        url.clone(),
                        user.clone(),
                        version.clone(),
                        verbosity,
                        write_older_cache_files,
                    )
                    .await?;

                    // NOTE(review): only the FIRST newly added service is started here; services
                    // added on later iterations are left unstarted by this function — confirm
                    // this is intended.
                    if i == 0 {
                        start(
                            connection_timeout_s,
                            start_node_interval,
                            node_registry.clone(),
                            vec![],
                            added_service,
                            verbosity,
                        )
                        .await?;
                    }
                }

                // All previously inactive services are started as well.
                if !inactive_nodes.is_empty() {
                    start(
                        connection_timeout_s,
                        start_node_interval,
                        node_registry.clone(),
                        vec![],
                        inactive_nodes,
                        verbosity,
                    )
                    .await?;
                }
            }
        }
        Ordering::Equal => {
            info!(
                "Current node count ({}) matches target ({}). No action needed.",
                running_count, target_count
            );
        }
    }

    // Verify final state
    let mut final_running_count = 0;
    for node in node_registry.nodes.read().await.iter() {
        let node_read = node.read().await;
        if node_read.status == ServiceStatus::Running {
            final_running_count += 1;
        }
    }

    // A mismatch is only logged; the function still returns `Ok`.
    if final_running_count != target_count {
        warn!("Failed to reach target node count. Expected {target_count}, but got {final_running_count}");
    }

    Ok(())
}
795
796async fn get_services_for_ops(
797    node_registry: &NodeRegistryManager,
798    peer_ids: Vec<String>,
799    service_names: Vec<String>,
800) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
801    let mut services = Vec::new();
802
803    if service_names.is_empty() && peer_ids.is_empty() {
804        for node in node_registry.nodes.read().await.iter() {
805            if node.read().await.status != ServiceStatus::Removed {
806                services.push(node.clone());
807            }
808        }
809    } else {
810        for name in &service_names {
811            let mut found_service_with_name = false;
812            for node in node_registry.nodes.read().await.iter() {
813                let node_read = node.read().await;
814                if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
815                    {
816                        services.push(node.clone());
817                        found_service_with_name = true;
818                        break;
819                    }
820                }
821            }
822
823            if !found_service_with_name {
824                error!("No service named '{name}'");
825                return Err(eyre!(format!("No service named '{name}'")));
826            }
827        }
828
829        for peer_id_str in &peer_ids {
830            let mut found_service_with_peer_id = false;
831            let given_peer_id = PeerId::from_str(peer_id_str)
832                .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
833            for node in node_registry.nodes.read().await.iter() {
834                let node_read = node.read().await;
835                if let Some(peer_id) = node_read.peer_id {
836                    if peer_id == given_peer_id && node_read.status != ServiceStatus::Removed {
837                        services.push(node.clone());
838                        found_service_with_peer_id = true;
839                        break;
840                    }
841                }
842            }
843            if !found_service_with_peer_id {
844                error!("Could not find node with peer id: '{given_peer_id:?}'");
845                return Err(eyre!(format!(
846                    "Could not find node with peer ID '{given_peer_id}'",
847                )));
848            }
849        }
850    }
851
852    Ok(services)
853}
854
855fn summarise_any_failed_ops(
856    failed_services: Vec<(String, String)>,
857    verb: &str,
858    verbosity: VerbosityLevel,
859) -> Result<()> {
860    if !failed_services.is_empty() {
861        if verbosity != VerbosityLevel::Minimal {
862            println!("Failed to {verb} {} service(s):", failed_services.len());
863            for failed in failed_services.iter() {
864                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
865            }
866        }
867
868        error!("Failed to {verb} one or more services");
869        return Err(eyre!("Failed to {verb} one or more services"));
870    }
871    Ok(())
872}