ant_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    add_services::{
14        add_node,
15        config::{AddNodeServiceOptions, PortRange},
16    },
17    config::{self, is_running_as_root},
18    helpers::{download_and_extract_release, get_bin_version},
19    print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use ant_bootstrap::InitialPeersConfig;
22use ant_evm::{EvmNetwork, RewardsAddress};
23use ant_logging::LogFormat;
24use ant_releases::{AntReleaseRepoActions, ReleaseType};
25use ant_service_management::{
26    control::{ServiceControl, ServiceController},
27    rpc::RpcClient,
28    NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
29};
30use color_eyre::{eyre::eyre, Help, Result};
31use colored::Colorize;
32use libp2p_identity::PeerId;
33use semver::Version;
34use std::{cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};
35use tracing::debug;
36
37/// Returns the added service names
38pub async fn add(
39    auto_restart: bool,
40    auto_set_nat_flags: bool,
41    count: Option<u16>,
42    data_dir_path: Option<PathBuf>,
43    enable_metrics_server: bool,
44    env_variables: Option<Vec<(String, String)>>,
45    evm_network: Option<EvmNetwork>,
46    relay: bool,
47    log_dir_path: Option<PathBuf>,
48    log_format: Option<LogFormat>,
49    max_archived_log_files: Option<usize>,
50    max_log_files: Option<usize>,
51    metrics_port: Option<PortRange>,
52    network_id: Option<u8>,
53    node_ip: Option<Ipv4Addr>,
54    node_port: Option<PortRange>,
55    mut init_peers_config: InitialPeersConfig,
56    rewards_address: RewardsAddress,
57    rpc_address: Option<Ipv4Addr>,
58    rpc_port: Option<PortRange>,
59    src_path: Option<PathBuf>,
60    no_upnp: bool,
61    url: Option<String>,
62    user: Option<String>,
63    version: Option<String>,
64    verbosity: VerbosityLevel,
65) -> Result<Vec<String>> {
66    let user_mode = !is_running_as_root();
67
68    if verbosity != VerbosityLevel::Minimal {
69        print_banner("Add Antnode Services");
70        println!("{} service(s) to be added", count.unwrap_or(1));
71    }
72
73    let service_manager = ServiceController {};
74    let service_user = if user_mode {
75        None
76    } else {
77        let service_user = user.unwrap_or_else(|| "ant".to_string());
78        service_manager.create_service_user(&service_user)?;
79        Some(service_user)
80    };
81
82    let service_data_dir_path =
83        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
84    let service_log_dir_path =
85        config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
86    let bootstrap_cache_dir = if let Some(user) = &service_user {
87        Some(config::get_bootstrap_cache_owner_path(user)?)
88    } else {
89        None
90    };
91
92    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
93    let release_repo = <dyn AntReleaseRepoActions>::default_config();
94
95    let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
96        let version = get_bin_version(&path)?;
97        (path, version)
98    } else {
99        download_and_extract_release(
100            ReleaseType::AntNode,
101            url.clone(),
102            version,
103            &*release_repo,
104            verbosity,
105            None,
106        )
107        .await?
108    };
109
110    debug!("Parsing peers from PeersArgs");
111
112    init_peers_config
113        .addrs
114        .extend(InitialPeersConfig::read_addr_from_env());
115    init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
116
117    let options = AddNodeServiceOptions {
118        auto_restart,
119        auto_set_nat_flags,
120        count,
121        delete_antnode_src: src_path.is_none(),
122        enable_metrics_server,
123        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
124        env_variables,
125        relay,
126        log_format,
127        max_archived_log_files,
128        max_log_files,
129        metrics_port,
130        network_id,
131        node_ip,
132        node_port,
133        init_peers_config,
134        rewards_address,
135        rpc_address,
136        rpc_port,
137        antnode_src_path,
138        antnode_dir_path: service_data_dir_path.clone(),
139        service_data_dir_path,
140        service_log_dir_path,
141        no_upnp,
142        user: service_user,
143        user_mode,
144        version,
145    };
146    info!("Adding node service(s)");
147    let added_services_names =
148        add_node(options, &mut node_registry, &service_manager, verbosity).await?;
149
150    node_registry.save()?;
151    debug!("Node registry saved");
152
153    Ok(added_services_names)
154}
155
156pub async fn balance(
157    peer_ids: Vec<String>,
158    service_names: Vec<String>,
159    verbosity: VerbosityLevel,
160) -> Result<()> {
161    if verbosity != VerbosityLevel::Minimal {
162        print_banner("Reward Balances");
163    }
164
165    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
166    refresh_node_registry(
167        &mut node_registry,
168        &ServiceController {},
169        verbosity != VerbosityLevel::Minimal,
170        false,
171        false,
172    )
173    .await?;
174
175    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
176    if service_indices.is_empty() {
177        info!("Service indices is empty, cannot obtain the balance");
178        // This could be the case if all services are at `Removed` status.
179        println!("No balances to display");
180        return Ok(());
181    }
182    debug!("Obtaining balances for {} services", service_indices.len());
183
184    for &index in &service_indices {
185        let node = &mut node_registry.nodes[index];
186        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
187        let service = NodeService::new(node, Box::new(rpc_client));
188        // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
189        println!("{}: {}", service.service_data.service_name, 0,);
190    }
191    Ok(())
192}
193
194pub async fn remove(
195    keep_directories: bool,
196    peer_ids: Vec<String>,
197    service_names: Vec<String>,
198    verbosity: VerbosityLevel,
199) -> Result<()> {
200    if verbosity != VerbosityLevel::Minimal {
201        print_banner("Remove Antnode Services");
202    }
203    info!("Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
204
205    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
206    refresh_node_registry(
207        &mut node_registry,
208        &ServiceController {},
209        verbosity != VerbosityLevel::Minimal,
210        false,
211        false,
212    )
213    .await?;
214
215    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
216    if service_indices.is_empty() {
217        info!("Service indices is empty, no services were eligible for removal");
218        // This could be the case if all services are at `Removed` status.
219        if verbosity != VerbosityLevel::Minimal {
220            println!("No services were eligible for removal");
221        }
222        return Ok(());
223    }
224
225    let mut failed_services = Vec::new();
226    for &index in &service_indices {
227        let node = &mut node_registry.nodes[index];
228        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
229        let service = NodeService::new(node, Box::new(rpc_client));
230        let mut service_manager =
231            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
232        match service_manager.remove(keep_directories).await {
233            Ok(()) => {
234                debug!("Removed service {}", node.service_name);
235                node_registry.save()?;
236            }
237            Err(err) => {
238                error!("Failed to remove service {}: {err}", node.service_name);
239                failed_services.push((node.service_name.clone(), err.to_string()))
240            }
241        }
242    }
243
244    summarise_any_failed_ops(failed_services, "remove", verbosity)
245}
246
247pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> {
248    if verbosity != VerbosityLevel::Minimal {
249        print_banner("Reset Antnode Services");
250    }
251    info!("Resetting all antnode services, with force={force}");
252
253    if !force {
254        println!("WARNING: all antnode services, data, and logs will be removed.");
255        println!("Do you wish to proceed? [y/n]");
256        std::io::stdout().flush()?;
257        let mut input = String::new();
258        std::io::stdin().read_line(&mut input)?;
259        if input.trim().to_lowercase() != "y" {
260            println!("Reset aborted");
261            return Ok(());
262        }
263    }
264
265    stop(None, vec![], vec![], verbosity).await?;
266    remove(false, vec![], vec![], verbosity).await?;
267
268    // Due the possibility of repeated runs of the `reset` command, we need to check for the
269    // existence of this file before attempting to delete it, since `remove_file` will return an
270    // error if the file doesn't exist. On Windows this has been observed to happen.
271    let node_registry_path = config::get_node_registry_path()?;
272    if node_registry_path.exists() {
273        info!("Removing node registry file: {node_registry_path:?}");
274        std::fs::remove_file(node_registry_path)?;
275    }
276
277    Ok(())
278}
279
280pub async fn start(
281    connection_timeout_s: u64,
282    fixed_interval: Option<u64>,
283    peer_ids: Vec<String>,
284    service_names: Vec<String>,
285    verbosity: VerbosityLevel,
286) -> Result<()> {
287    if verbosity != VerbosityLevel::Minimal {
288        print_banner("Start Antnode Services");
289    }
290    info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
291
292    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
293    refresh_node_registry(
294        &mut node_registry,
295        &ServiceController {},
296        verbosity != VerbosityLevel::Minimal,
297        false,
298        false,
299    )
300    .await?;
301
302    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
303    if service_indices.is_empty() {
304        info!("No services are eligible to be started");
305        // This could be the case if all services are at `Removed` status.
306        if verbosity != VerbosityLevel::Minimal {
307            println!("No services were eligible to be started");
308        }
309        return Ok(());
310    }
311
312    let mut failed_services = Vec::new();
313    for &index in &service_indices {
314        let node = &mut node_registry.nodes[index];
315        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
316
317        let service = NodeService::new(node, Box::new(rpc_client));
318
319        // set dynamic startup delay if fixed_interval is not set
320        let service = if fixed_interval.is_none() {
321            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
322        } else {
323            service
324        };
325
326        let mut service_manager =
327            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
328        if service_manager.service.status() != ServiceStatus::Running {
329            // It would be possible here to check if the service *is* running and then just
330            // continue without applying the delay. The reason for not doing so is because when
331            // `start` is called below, the user will get a message to say the service was already
332            // started, which I think is useful behaviour to retain.
333            if let Some(interval) = fixed_interval {
334                debug!("Sleeping for {} milliseconds", interval);
335                std::thread::sleep(std::time::Duration::from_millis(interval));
336            }
337        }
338        match service_manager.start().await {
339            Ok(start_duration) => {
340                debug!(
341                    "Started service {} in {start_duration:?}",
342                    node.service_name
343                );
344
345                node_registry.save()?;
346            }
347            Err(err) => {
348                error!("Failed to start service {}: {err}", node.service_name);
349                failed_services.push((node.service_name.clone(), err.to_string()))
350            }
351        }
352    }
353
354    summarise_any_failed_ops(failed_services, "start", verbosity)
355}
356
357pub async fn status(details: bool, fail: bool, json: bool) -> Result<()> {
358    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
359    if !node_registry.nodes.is_empty() {
360        if !json && !details {
361            print_banner("Antnode Services");
362        }
363        status_report(
364            &mut node_registry,
365            &ServiceController {},
366            details,
367            json,
368            fail,
369            false,
370        )
371        .await?;
372        node_registry.save()?;
373    }
374    Ok(())
375}
376
377pub async fn stop(
378    interval: Option<u64>,
379    peer_ids: Vec<String>,
380    service_names: Vec<String>,
381    verbosity: VerbosityLevel,
382) -> Result<()> {
383    if verbosity != VerbosityLevel::Minimal {
384        print_banner("Stop Antnode Services");
385    }
386    info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
387
388    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
389    refresh_node_registry(
390        &mut node_registry,
391        &ServiceController {},
392        verbosity != VerbosityLevel::Minimal,
393        false,
394        false,
395    )
396    .await?;
397
398    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
399    if service_indices.is_empty() {
400        info!("Service indices is empty, no services were eligible to be stopped");
401        // This could be the case if all services are at `Removed` status.
402        if verbosity != VerbosityLevel::Minimal {
403            println!("No services were eligible to be stopped");
404        }
405        return Ok(());
406    }
407
408    let mut failed_services = Vec::new();
409    for &index in &service_indices {
410        let node = &mut node_registry.nodes[index];
411        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
412        let service = NodeService::new(node, Box::new(rpc_client));
413        let mut service_manager =
414            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
415
416        if service_manager.service.status() == ServiceStatus::Running {
417            if let Some(interval) = interval {
418                debug!("Sleeping for {} milliseconds", interval);
419                std::thread::sleep(std::time::Duration::from_millis(interval));
420            }
421        }
422        match service_manager.stop().await {
423            Ok(()) => {
424                debug!("Stopped service {}", node.service_name);
425                node_registry.save()?;
426            }
427            Err(err) => {
428                error!("Failed to stop service {}: {err}", node.service_name);
429                failed_services.push((node.service_name.clone(), err.to_string()))
430            }
431        }
432    }
433
434    summarise_any_failed_ops(failed_services, "stop", verbosity)
435}
436
437pub async fn upgrade(
438    connection_timeout_s: u64,
439    do_not_start: bool,
440    custom_bin_path: Option<PathBuf>,
441    force: bool,
442    fixed_interval: Option<u64>,
443    peer_ids: Vec<String>,
444    provided_env_variables: Option<Vec<(String, String)>>,
445    service_names: Vec<String>,
446    url: Option<String>,
447    version: Option<String>,
448    verbosity: VerbosityLevel,
449) -> Result<()> {
450    // In the case of a custom binary, we want to force the use of it. Regardless of its version
451    // number, the user has probably built it for some special case. They may have not used the
452    // `--force` flag; if they didn't, we can just do that for them here.
453    let use_force = force || custom_bin_path.is_some();
454
455    if verbosity != VerbosityLevel::Minimal {
456        print_banner("Upgrade Antnode Services");
457    }
458    info!(
459        "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
460    );
461
462    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
463        custom_bin_path.clone(),
464        ReleaseType::AntNode,
465        url,
466        version,
467        verbosity,
468    )
469    .await?;
470
471    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
472    refresh_node_registry(
473        &mut node_registry,
474        &ServiceController {},
475        verbosity != VerbosityLevel::Minimal,
476        false,
477        false,
478    )
479    .await?;
480
481    if let Some(node) = node_registry.nodes.first() {
482        debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
483    } else {
484        debug!("There are no nodes currently added or active");
485    }
486
487    if !use_force {
488        let node_versions = node_registry
489            .nodes
490            .iter()
491            .map(|n| Version::parse(&n.version).map_err(|_| eyre!("Failed to parse Version")))
492            .collect::<Result<Vec<Version>>>()?;
493        let any_nodes_need_upgraded = node_versions
494            .iter()
495            .any(|current_version| current_version < &target_version);
496        if !any_nodes_need_upgraded {
497            info!("All nodes are at the latest version, no upgrade required.");
498            if verbosity != VerbosityLevel::Minimal {
499                println!("{} All nodes are at the latest version", "✓".green());
500            }
501            return Ok(());
502        }
503    }
504
505    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
506    trace!("service_indices len: {}", service_indices.len());
507    let mut upgrade_summary = Vec::new();
508
509    for &index in &service_indices {
510        let node = &mut node_registry.nodes[index];
511        let env_variables = if provided_env_variables.is_some() {
512            &provided_env_variables
513        } else {
514            &node_registry.environment_variables
515        };
516        let options = UpgradeOptions {
517            auto_restart: false,
518            env_variables: env_variables.clone(),
519            force: use_force,
520            start_service: !do_not_start,
521            target_bin_path: upgrade_bin_path.clone(),
522            target_version: target_version.clone(),
523        };
524        let service_name = node.service_name.clone();
525
526        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
527        let service = NodeService::new(node, Box::new(rpc_client));
528        // set dynamic startup delay if fixed_interval is not set
529        let service = if fixed_interval.is_none() {
530            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
531        } else {
532            service
533        };
534
535        let mut service_manager =
536            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
537
538        match service_manager.upgrade(options).await {
539            Ok(upgrade_result) => {
540                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
541                if upgrade_result != UpgradeResult::NotRequired {
542                    // It doesn't seem useful to apply the interval if there was no upgrade
543                    // required for the previous service.
544                    if let Some(interval) = fixed_interval {
545                        debug!("Sleeping for {interval} milliseconds",);
546                        std::thread::sleep(std::time::Duration::from_millis(interval));
547                    }
548                }
549                upgrade_summary.push((
550                    service_manager.service.service_data.service_name.clone(),
551                    upgrade_result,
552                ));
553                node_registry.save()?;
554            }
555            Err(err) => {
556                error!("Error upgrading service {service_name}: {err}");
557                upgrade_summary.push((
558                    node.service_name.clone(),
559                    UpgradeResult::Error(format!("Error: {err}")),
560                ));
561                node_registry.save()?;
562            }
563        }
564    }
565
566    if verbosity != VerbosityLevel::Minimal {
567        print_upgrade_summary(upgrade_summary.clone());
568    }
569
570    if upgrade_summary.iter().any(|(_, r)| {
571        matches!(r, UpgradeResult::Error(_))
572            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
573    }) {
574        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
575            "For any services that were upgraded but did not start, you can attempt to start them \
576                again using the 'start' command."));
577    }
578
579    Ok(())
580}
581
582/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
583///
584/// The arguments here are mostly mirror those used in `add`.
585pub async fn maintain_n_running_nodes(
586    auto_restart: bool,
587    auto_set_nat_flags: bool,
588    connection_timeout_s: u64,
589    max_nodes_to_run: u16,
590    data_dir_path: Option<PathBuf>,
591    enable_metrics_server: bool,
592    env_variables: Option<Vec<(String, String)>>,
593    evm_network: Option<EvmNetwork>,
594    home_network: bool,
595    log_dir_path: Option<PathBuf>,
596    log_format: Option<LogFormat>,
597    max_archived_log_files: Option<usize>,
598    max_log_files: Option<usize>,
599    metrics_port: Option<PortRange>,
600    network_id: Option<u8>,
601    node_ip: Option<Ipv4Addr>,
602    node_port: Option<PortRange>,
603    peers_args: InitialPeersConfig,
604    rewards_address: RewardsAddress,
605    rpc_address: Option<Ipv4Addr>,
606    rpc_port: Option<PortRange>,
607    src_path: Option<PathBuf>,
608    url: Option<String>,
609    no_upnp: bool,
610    user: Option<String>,
611    version: Option<String>,
612    verbosity: VerbosityLevel,
613    start_node_interval: Option<u64>,
614) -> Result<()> {
615    let node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
616    let running_nodes = node_registry
617        .nodes
618        .iter()
619        .filter(|node| node.status == ServiceStatus::Running)
620        .map(|node| node.service_name.clone())
621        .collect::<Vec<_>>();
622
623    let running_count = running_nodes.len();
624    let target_count = max_nodes_to_run as usize;
625
626    info!(
627        "Current running nodes: {}, Target: {}",
628        running_count, target_count
629    );
630
631    match running_count.cmp(&target_count) {
632        Ordering::Greater => {
633            let to_stop_count = running_count - target_count;
634            let services_to_stop = running_nodes
635                .into_iter()
636                .rev() // Stop the oldest nodes first
637                .take(to_stop_count)
638                .collect::<Vec<_>>();
639
640            info!(
641                "Stopping {} excess nodes: {:?}",
642                to_stop_count, services_to_stop
643            );
644            stop(None, vec![], services_to_stop, verbosity).await?;
645        }
646        Ordering::Less => {
647            let to_start_count = target_count - running_count;
648            let inactive_nodes = node_registry
649                .nodes
650                .iter()
651                .filter(|node| {
652                    node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added
653                })
654                .map(|node| node.service_name.clone())
655                .collect::<Vec<_>>();
656
657            info!("Inactive nodes available: {}", inactive_nodes.len());
658
659            if to_start_count <= inactive_nodes.len() {
660                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
661                info!(
662                    "Starting {} existing inactive nodes: {:?}",
663                    to_start_count, nodes_to_start
664                );
665                start(
666                    connection_timeout_s,
667                    start_node_interval,
668                    vec![],
669                    nodes_to_start,
670                    verbosity,
671                )
672                .await?;
673            } else {
674                let to_add_count = to_start_count - inactive_nodes.len();
675                info!(
676                    "Adding {} new nodes and starting all {} inactive nodes",
677                    to_add_count,
678                    inactive_nodes.len()
679                );
680
681                let ports_to_use = match node_port {
682                    Some(PortRange::Single(port)) => vec![port],
683                    Some(PortRange::Range(start, end)) => {
684                        (start..=end).take(to_add_count).collect()
685                    }
686                    None => vec![],
687                };
688
689                for (i, port) in ports_to_use.into_iter().enumerate() {
690                    let added_service = add(
691                        auto_restart,
692                        auto_set_nat_flags,
693                        Some(1),
694                        data_dir_path.clone(),
695                        enable_metrics_server,
696                        env_variables.clone(),
697                        evm_network.clone(),
698                        home_network,
699                        log_dir_path.clone(),
700                        log_format,
701                        max_archived_log_files,
702                        max_log_files,
703                        metrics_port.clone(),
704                        network_id,
705                        node_ip,
706                        Some(PortRange::Single(port)),
707                        peers_args.clone(),
708                        rewards_address,
709                        rpc_address,
710                        rpc_port.clone(),
711                        src_path.clone(),
712                        no_upnp,
713                        url.clone(),
714                        user.clone(),
715                        version.clone(),
716                        verbosity,
717                    )
718                    .await?;
719
720                    if i == 0 {
721                        start(
722                            connection_timeout_s,
723                            start_node_interval,
724                            vec![],
725                            added_service,
726                            verbosity,
727                        )
728                        .await?;
729                    }
730                }
731
732                if !inactive_nodes.is_empty() {
733                    start(
734                        connection_timeout_s,
735                        start_node_interval,
736                        vec![],
737                        inactive_nodes,
738                        verbosity,
739                    )
740                    .await?;
741                }
742            }
743        }
744        Ordering::Equal => {
745            info!(
746                "Current node count ({}) matches target ({}). No action needed.",
747                running_count, target_count
748            );
749        }
750    }
751
752    // Verify final state
753    let final_node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
754    let final_running_count = final_node_registry
755        .nodes
756        .iter()
757        .filter(|node| node.status == ServiceStatus::Running)
758        .count();
759
760    info!("Final running node count: {}", final_running_count);
761    if final_running_count != target_count {
762        warn!(
763            "Failed to reach target node count. Expected {}, but got {}",
764            target_count, final_running_count
765        );
766    }
767
768    Ok(())
769}
770
771fn get_services_for_ops(
772    node_registry: &NodeRegistry,
773    peer_ids: Vec<String>,
774    service_names: Vec<String>,
775) -> Result<Vec<usize>> {
776    let mut service_indices = Vec::new();
777
778    if service_names.is_empty() && peer_ids.is_empty() {
779        for node in node_registry.nodes.iter() {
780            if let Some(index) = node_registry.nodes.iter().position(|x| {
781                x.service_name == node.service_name && x.status != ServiceStatus::Removed
782            }) {
783                service_indices.push(index);
784            }
785        }
786    } else {
787        for name in &service_names {
788            if let Some(index) = node_registry
789                .nodes
790                .iter()
791                .position(|x| x.service_name == *name && x.status != ServiceStatus::Removed)
792            {
793                service_indices.push(index);
794            } else {
795                error!("No service named '{name}'");
796                return Err(eyre!(format!("No service named '{name}'")));
797            }
798        }
799
800        for peer_id_str in &peer_ids {
801            let peer_id = PeerId::from_str(peer_id_str)
802                .inspect_err(|err| error!("Error parsing PeerId: {err:?}"))?;
803            if let Some(index) = node_registry
804                .nodes
805                .iter()
806                .position(|x| x.peer_id == Some(peer_id) && x.status != ServiceStatus::Removed)
807            {
808                service_indices.push(index);
809            } else {
810                error!("Could not find node with peer id: '{peer_id:?}'");
811                return Err(eyre!(format!(
812                    "Could not find node with peer ID '{peer_id}'",
813                )));
814            }
815        }
816    }
817
818    Ok(service_indices)
819}
820
821fn summarise_any_failed_ops(
822    failed_services: Vec<(String, String)>,
823    verb: &str,
824    verbosity: VerbosityLevel,
825) -> Result<()> {
826    if !failed_services.is_empty() {
827        if verbosity != VerbosityLevel::Minimal {
828            println!("Failed to {verb} {} service(s):", failed_services.len());
829            for failed in failed_services.iter() {
830                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
831            }
832        }
833
834        error!("Failed to {verb} one or more services");
835        return Err(eyre!("Failed to {verb} one or more services"));
836    }
837    Ok(())
838}