ant_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    add_services::{
14        add_node,
15        config::{AddNodeServiceOptions, PortRange},
16    },
17    config::{self, is_running_as_root},
18    helpers::{download_and_extract_release, get_bin_version},
19    print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use ant_bootstrap::InitialPeersConfig;
22use ant_evm::{EvmNetwork, RewardsAddress};
23use ant_logging::LogFormat;
24use ant_releases::{AntReleaseRepoActions, ReleaseType};
25use ant_service_management::{
26    control::{ServiceControl, ServiceController},
27    rpc::RpcClient,
28    NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
29};
30use color_eyre::{eyre::eyre, Help, Result};
31use colored::Colorize;
32use libp2p_identity::PeerId;
33use semver::Version;
34use std::{cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};
35use tracing::debug;
36
37/// Returns the added service names
38pub async fn add(
39    alpha: bool,
40    auto_restart: bool,
41    auto_set_nat_flags: bool,
42    count: Option<u16>,
43    data_dir_path: Option<PathBuf>,
44    enable_metrics_server: bool,
45    env_variables: Option<Vec<(String, String)>>,
46    evm_network: Option<EvmNetwork>,
47    log_dir_path: Option<PathBuf>,
48    log_format: Option<LogFormat>,
49    max_archived_log_files: Option<usize>,
50    max_log_files: Option<usize>,
51    metrics_port: Option<PortRange>,
52    network_id: Option<u8>,
53    node_ip: Option<Ipv4Addr>,
54    node_port: Option<PortRange>,
55    mut init_peers_config: InitialPeersConfig,
56    relay: bool,
57    rewards_address: RewardsAddress,
58    rpc_address: Option<Ipv4Addr>,
59    rpc_port: Option<PortRange>,
60    src_path: Option<PathBuf>,
61    no_upnp: bool,
62    url: Option<String>,
63    user: Option<String>,
64    version: Option<String>,
65    verbosity: VerbosityLevel,
66) -> Result<Vec<String>> {
67    let user_mode = !is_running_as_root();
68
69    if verbosity != VerbosityLevel::Minimal {
70        print_banner("Add Antnode Services");
71        println!("{} service(s) to be added", count.unwrap_or(1));
72    }
73
74    let service_manager = ServiceController {};
75    let service_user = if user_mode {
76        None
77    } else {
78        let service_user = user.unwrap_or_else(|| "ant".to_string());
79        service_manager.create_service_user(&service_user)?;
80        Some(service_user)
81    };
82
83    let service_data_dir_path =
84        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
85    let service_log_dir_path =
86        config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
87    let bootstrap_cache_dir = if let Some(user) = &service_user {
88        Some(config::get_bootstrap_cache_owner_path(user)?)
89    } else {
90        None
91    };
92
93    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
94    let release_repo = <dyn AntReleaseRepoActions>::default_config();
95
96    let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
97        let version = get_bin_version(&path)?;
98        (path, version)
99    } else {
100        download_and_extract_release(
101            ReleaseType::AntNode,
102            url.clone(),
103            version,
104            &*release_repo,
105            verbosity,
106            None,
107        )
108        .await?
109    };
110
111    debug!("Parsing peers from PeersArgs");
112
113    init_peers_config
114        .addrs
115        .extend(InitialPeersConfig::read_addr_from_env());
116    init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
117
118    let options = AddNodeServiceOptions {
119        alpha,
120        auto_restart,
121        auto_set_nat_flags,
122        count,
123        delete_antnode_src: src_path.is_none(),
124        enable_metrics_server,
125        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
126        env_variables,
127        relay,
128        log_format,
129        max_archived_log_files,
130        max_log_files,
131        metrics_port,
132        network_id,
133        node_ip,
134        node_port,
135        init_peers_config,
136        rewards_address,
137        rpc_address,
138        rpc_port,
139        antnode_src_path,
140        antnode_dir_path: service_data_dir_path.clone(),
141        service_data_dir_path,
142        service_log_dir_path,
143        no_upnp,
144        user: service_user,
145        user_mode,
146        version,
147    };
148    info!("Adding node service(s)");
149    let added_services_names =
150        add_node(options, &mut node_registry, &service_manager, verbosity).await?;
151
152    node_registry.save()?;
153    debug!("Node registry saved");
154
155    Ok(added_services_names)
156}
157
158pub async fn balance(
159    peer_ids: Vec<String>,
160    service_names: Vec<String>,
161    verbosity: VerbosityLevel,
162) -> Result<()> {
163    if verbosity != VerbosityLevel::Minimal {
164        print_banner("Reward Balances");
165    }
166
167    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
168    refresh_node_registry(
169        &mut node_registry,
170        &ServiceController {},
171        verbosity != VerbosityLevel::Minimal,
172        false,
173        false,
174    )
175    .await?;
176
177    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
178    if service_indices.is_empty() {
179        info!("Service indices is empty, cannot obtain the balance");
180        // This could be the case if all services are at `Removed` status.
181        println!("No balances to display");
182        return Ok(());
183    }
184    debug!("Obtaining balances for {} services", service_indices.len());
185
186    for &index in &service_indices {
187        let node = &mut node_registry.nodes[index];
188        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
189        let service = NodeService::new(node, Box::new(rpc_client));
190        // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
191        println!("{}: {}", service.service_data.service_name, 0,);
192    }
193    Ok(())
194}
195
196pub async fn remove(
197    keep_directories: bool,
198    peer_ids: Vec<String>,
199    service_names: Vec<String>,
200    verbosity: VerbosityLevel,
201) -> Result<()> {
202    if verbosity != VerbosityLevel::Minimal {
203        print_banner("Remove Antnode Services");
204    }
205    info!("Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
206
207    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
208    refresh_node_registry(
209        &mut node_registry,
210        &ServiceController {},
211        verbosity != VerbosityLevel::Minimal,
212        false,
213        false,
214    )
215    .await?;
216
217    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
218    if service_indices.is_empty() {
219        info!("Service indices is empty, no services were eligible for removal");
220        // This could be the case if all services are at `Removed` status.
221        if verbosity != VerbosityLevel::Minimal {
222            println!("No services were eligible for removal");
223        }
224        return Ok(());
225    }
226
227    let mut failed_services = Vec::new();
228    for &index in &service_indices {
229        let node = &mut node_registry.nodes[index];
230        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
231        let service = NodeService::new(node, Box::new(rpc_client));
232        let mut service_manager =
233            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
234        match service_manager.remove(keep_directories).await {
235            Ok(()) => {
236                debug!("Removed service {}", node.service_name);
237                node_registry.save()?;
238            }
239            Err(err) => {
240                error!("Failed to remove service {}: {err}", node.service_name);
241                failed_services.push((node.service_name.clone(), err.to_string()))
242            }
243        }
244    }
245
246    summarise_any_failed_ops(failed_services, "remove", verbosity)
247}
248
249pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> {
250    if verbosity != VerbosityLevel::Minimal {
251        print_banner("Reset Antnode Services");
252    }
253    info!("Resetting all antnode services, with force={force}");
254
255    if !force {
256        println!("WARNING: all antnode services, data, and logs will be removed.");
257        println!("Do you wish to proceed? [y/n]");
258        std::io::stdout().flush()?;
259        let mut input = String::new();
260        std::io::stdin().read_line(&mut input)?;
261        if input.trim().to_lowercase() != "y" {
262            println!("Reset aborted");
263            return Ok(());
264        }
265    }
266
267    stop(None, vec![], vec![], verbosity).await?;
268    remove(false, vec![], vec![], verbosity).await?;
269
270    // Due the possibility of repeated runs of the `reset` command, we need to check for the
271    // existence of this file before attempting to delete it, since `remove_file` will return an
272    // error if the file doesn't exist. On Windows this has been observed to happen.
273    let node_registry_path = config::get_node_registry_path()?;
274    if node_registry_path.exists() {
275        info!("Removing node registry file: {node_registry_path:?}");
276        std::fs::remove_file(node_registry_path)?;
277    }
278
279    Ok(())
280}
281
282pub async fn start(
283    connection_timeout_s: u64,
284    fixed_interval: Option<u64>,
285    peer_ids: Vec<String>,
286    service_names: Vec<String>,
287    verbosity: VerbosityLevel,
288) -> Result<()> {
289    if verbosity != VerbosityLevel::Minimal {
290        print_banner("Start Antnode Services");
291    }
292    info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
293
294    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
295    refresh_node_registry(
296        &mut node_registry,
297        &ServiceController {},
298        verbosity != VerbosityLevel::Minimal,
299        false,
300        false,
301    )
302    .await?;
303
304    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
305    if service_indices.is_empty() {
306        info!("No services are eligible to be started");
307        // This could be the case if all services are at `Removed` status.
308        if verbosity != VerbosityLevel::Minimal {
309            println!("No services were eligible to be started");
310        }
311        return Ok(());
312    }
313
314    let mut failed_services = Vec::new();
315    for &index in &service_indices {
316        let node = &mut node_registry.nodes[index];
317        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
318
319        let service = NodeService::new(node, Box::new(rpc_client));
320
321        // set dynamic startup delay if fixed_interval is not set
322        let service = if fixed_interval.is_none() {
323            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
324        } else {
325            service
326        };
327
328        let mut service_manager =
329            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
330        if service_manager.service.status() != ServiceStatus::Running {
331            // It would be possible here to check if the service *is* running and then just
332            // continue without applying the delay. The reason for not doing so is because when
333            // `start` is called below, the user will get a message to say the service was already
334            // started, which I think is useful behaviour to retain.
335            if let Some(interval) = fixed_interval {
336                debug!("Sleeping for {} milliseconds", interval);
337                std::thread::sleep(std::time::Duration::from_millis(interval));
338            }
339        }
340        match service_manager.start().await {
341            Ok(start_duration) => {
342                debug!(
343                    "Started service {} in {start_duration:?}",
344                    node.service_name
345                );
346
347                node_registry.save()?;
348            }
349            Err(err) => {
350                error!("Failed to start service {}: {err}", node.service_name);
351                failed_services.push((node.service_name.clone(), err.to_string()))
352            }
353        }
354    }
355
356    summarise_any_failed_ops(failed_services, "start", verbosity)
357}
358
359pub async fn status(details: bool, fail: bool, json: bool) -> Result<()> {
360    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
361    if !node_registry.nodes.is_empty() {
362        if !json && !details {
363            print_banner("Antnode Services");
364        }
365        status_report(
366            &mut node_registry,
367            &ServiceController {},
368            details,
369            json,
370            fail,
371            false,
372        )
373        .await?;
374        node_registry.save()?;
375    }
376    Ok(())
377}
378
379pub async fn stop(
380    interval: Option<u64>,
381    peer_ids: Vec<String>,
382    service_names: Vec<String>,
383    verbosity: VerbosityLevel,
384) -> Result<()> {
385    if verbosity != VerbosityLevel::Minimal {
386        print_banner("Stop Antnode Services");
387    }
388    info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
389
390    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
391    refresh_node_registry(
392        &mut node_registry,
393        &ServiceController {},
394        verbosity != VerbosityLevel::Minimal,
395        false,
396        false,
397    )
398    .await?;
399
400    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
401    if service_indices.is_empty() {
402        info!("Service indices is empty, no services were eligible to be stopped");
403        // This could be the case if all services are at `Removed` status.
404        if verbosity != VerbosityLevel::Minimal {
405            println!("No services were eligible to be stopped");
406        }
407        return Ok(());
408    }
409
410    let mut failed_services = Vec::new();
411    for &index in &service_indices {
412        let node = &mut node_registry.nodes[index];
413        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
414        let service = NodeService::new(node, Box::new(rpc_client));
415        let mut service_manager =
416            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
417
418        if service_manager.service.status() == ServiceStatus::Running {
419            if let Some(interval) = interval {
420                debug!("Sleeping for {} milliseconds", interval);
421                std::thread::sleep(std::time::Duration::from_millis(interval));
422            }
423        }
424        match service_manager.stop().await {
425            Ok(()) => {
426                debug!("Stopped service {}", node.service_name);
427                node_registry.save()?;
428            }
429            Err(err) => {
430                error!("Failed to stop service {}: {err}", node.service_name);
431                failed_services.push((node.service_name.clone(), err.to_string()))
432            }
433        }
434    }
435
436    summarise_any_failed_ops(failed_services, "stop", verbosity)
437}
438
439pub async fn upgrade(
440    connection_timeout_s: u64,
441    do_not_start: bool,
442    custom_bin_path: Option<PathBuf>,
443    force: bool,
444    fixed_interval: Option<u64>,
445    peer_ids: Vec<String>,
446    provided_env_variables: Option<Vec<(String, String)>>,
447    service_names: Vec<String>,
448    url: Option<String>,
449    version: Option<String>,
450    verbosity: VerbosityLevel,
451) -> Result<()> {
452    // In the case of a custom binary, we want to force the use of it. Regardless of its version
453    // number, the user has probably built it for some special case. They may have not used the
454    // `--force` flag; if they didn't, we can just do that for them here.
455    let use_force = force || custom_bin_path.is_some();
456
457    if verbosity != VerbosityLevel::Minimal {
458        print_banner("Upgrade Antnode Services");
459    }
460    info!(
461        "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
462    );
463
464    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
465        custom_bin_path.clone(),
466        ReleaseType::AntNode,
467        url,
468        version,
469        verbosity,
470    )
471    .await?;
472
473    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
474    refresh_node_registry(
475        &mut node_registry,
476        &ServiceController {},
477        verbosity != VerbosityLevel::Minimal,
478        false,
479        false,
480    )
481    .await?;
482
483    if let Some(node) = node_registry.nodes.first() {
484        debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
485    } else {
486        debug!("There are no nodes currently added or active");
487    }
488
489    if !use_force {
490        let node_versions = node_registry
491            .nodes
492            .iter()
493            .map(|n| Version::parse(&n.version).map_err(|_| eyre!("Failed to parse Version")))
494            .collect::<Result<Vec<Version>>>()?;
495        let any_nodes_need_upgraded = node_versions
496            .iter()
497            .any(|current_version| current_version < &target_version);
498        if !any_nodes_need_upgraded {
499            info!("All nodes are at the latest version, no upgrade required.");
500            if verbosity != VerbosityLevel::Minimal {
501                println!("{} All nodes are at the latest version", "✓".green());
502            }
503            return Ok(());
504        }
505    }
506
507    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
508    trace!("service_indices len: {}", service_indices.len());
509    let mut upgrade_summary = Vec::new();
510
511    for &index in &service_indices {
512        let node = &mut node_registry.nodes[index];
513        let env_variables = if provided_env_variables.is_some() {
514            &provided_env_variables
515        } else {
516            &node_registry.environment_variables
517        };
518        let options = UpgradeOptions {
519            auto_restart: false,
520            env_variables: env_variables.clone(),
521            force: use_force,
522            start_service: !do_not_start,
523            target_bin_path: upgrade_bin_path.clone(),
524            target_version: target_version.clone(),
525        };
526        let service_name = node.service_name.clone();
527
528        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
529        let service = NodeService::new(node, Box::new(rpc_client));
530        // set dynamic startup delay if fixed_interval is not set
531        let service = if fixed_interval.is_none() {
532            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
533        } else {
534            service
535        };
536
537        let mut service_manager =
538            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
539
540        match service_manager.upgrade(options).await {
541            Ok(upgrade_result) => {
542                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
543                if upgrade_result != UpgradeResult::NotRequired {
544                    // It doesn't seem useful to apply the interval if there was no upgrade
545                    // required for the previous service.
546                    if let Some(interval) = fixed_interval {
547                        debug!("Sleeping for {interval} milliseconds",);
548                        std::thread::sleep(std::time::Duration::from_millis(interval));
549                    }
550                }
551                upgrade_summary.push((
552                    service_manager.service.service_data.service_name.clone(),
553                    upgrade_result,
554                ));
555                node_registry.save()?;
556            }
557            Err(err) => {
558                error!("Error upgrading service {service_name}: {err}");
559                upgrade_summary.push((
560                    node.service_name.clone(),
561                    UpgradeResult::Error(format!("Error: {err}")),
562                ));
563                node_registry.save()?;
564            }
565        }
566    }
567
568    if verbosity != VerbosityLevel::Minimal {
569        print_upgrade_summary(upgrade_summary.clone());
570    }
571
572    if upgrade_summary.iter().any(|(_, r)| {
573        matches!(r, UpgradeResult::Error(_))
574            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
575    }) {
576        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
577            "For any services that were upgraded but did not start, you can attempt to start them \
578                again using the 'start' command."));
579    }
580
581    Ok(())
582}
583
584/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
585///
586/// The arguments here are mostly mirror those used in `add`.
587pub async fn maintain_n_running_nodes(
588    alpha: bool,
589    auto_restart: bool,
590    auto_set_nat_flags: bool,
591    connection_timeout_s: u64,
592    max_nodes_to_run: u16,
593    data_dir_path: Option<PathBuf>,
594    enable_metrics_server: bool,
595    env_variables: Option<Vec<(String, String)>>,
596    evm_network: Option<EvmNetwork>,
597    log_dir_path: Option<PathBuf>,
598    log_format: Option<LogFormat>,
599    max_archived_log_files: Option<usize>,
600    max_log_files: Option<usize>,
601    metrics_port: Option<PortRange>,
602    network_id: Option<u8>,
603    node_ip: Option<Ipv4Addr>,
604    node_port: Option<PortRange>,
605    peers_args: InitialPeersConfig,
606    relay: bool,
607    rewards_address: RewardsAddress,
608    rpc_address: Option<Ipv4Addr>,
609    rpc_port: Option<PortRange>,
610    src_path: Option<PathBuf>,
611    url: Option<String>,
612    no_upnp: bool,
613    user: Option<String>,
614    version: Option<String>,
615    verbosity: VerbosityLevel,
616    start_node_interval: Option<u64>,
617) -> Result<()> {
618    let node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
619    let running_nodes = node_registry
620        .nodes
621        .iter()
622        .filter(|node| node.status == ServiceStatus::Running)
623        .map(|node| node.service_name.clone())
624        .collect::<Vec<_>>();
625
626    let running_count = running_nodes.len();
627    let target_count = max_nodes_to_run as usize;
628
629    info!(
630        "Current running nodes: {}, Target: {}",
631        running_count, target_count
632    );
633
634    match running_count.cmp(&target_count) {
635        Ordering::Greater => {
636            let to_stop_count = running_count - target_count;
637            let services_to_stop = running_nodes
638                .into_iter()
639                .rev() // Stop the oldest nodes first
640                .take(to_stop_count)
641                .collect::<Vec<_>>();
642
643            info!(
644                "Stopping {} excess nodes: {:?}",
645                to_stop_count, services_to_stop
646            );
647            stop(None, vec![], services_to_stop, verbosity).await?;
648        }
649        Ordering::Less => {
650            let to_start_count = target_count - running_count;
651            let inactive_nodes = node_registry
652                .nodes
653                .iter()
654                .filter(|node| {
655                    node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added
656                })
657                .map(|node| node.service_name.clone())
658                .collect::<Vec<_>>();
659
660            info!("Inactive nodes available: {}", inactive_nodes.len());
661
662            if to_start_count <= inactive_nodes.len() {
663                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
664                info!(
665                    "Starting {} existing inactive nodes: {:?}",
666                    to_start_count, nodes_to_start
667                );
668                start(
669                    connection_timeout_s,
670                    start_node_interval,
671                    vec![],
672                    nodes_to_start,
673                    verbosity,
674                )
675                .await?;
676            } else {
677                let to_add_count = to_start_count - inactive_nodes.len();
678                info!(
679                    "Adding {} new nodes and starting all {} inactive nodes",
680                    to_add_count,
681                    inactive_nodes.len()
682                );
683
684                let ports_to_use = match node_port {
685                    Some(PortRange::Single(port)) => vec![port],
686                    Some(PortRange::Range(start, end)) => {
687                        (start..=end).take(to_add_count).collect()
688                    }
689                    None => vec![],
690                };
691
692                for (i, port) in ports_to_use.into_iter().enumerate() {
693                    let added_service = add(
694                        alpha,
695                        auto_restart,
696                        auto_set_nat_flags,
697                        Some(1),
698                        data_dir_path.clone(),
699                        enable_metrics_server,
700                        env_variables.clone(),
701                        evm_network.clone(),
702                        log_dir_path.clone(),
703                        log_format,
704                        max_archived_log_files,
705                        max_log_files,
706                        metrics_port.clone(),
707                        network_id,
708                        node_ip,
709                        Some(PortRange::Single(port)),
710                        peers_args.clone(),
711                        relay,
712                        rewards_address,
713                        rpc_address,
714                        rpc_port.clone(),
715                        src_path.clone(),
716                        no_upnp,
717                        url.clone(),
718                        user.clone(),
719                        version.clone(),
720                        verbosity,
721                    )
722                    .await?;
723
724                    if i == 0 {
725                        start(
726                            connection_timeout_s,
727                            start_node_interval,
728                            vec![],
729                            added_service,
730                            verbosity,
731                        )
732                        .await?;
733                    }
734                }
735
736                if !inactive_nodes.is_empty() {
737                    start(
738                        connection_timeout_s,
739                        start_node_interval,
740                        vec![],
741                        inactive_nodes,
742                        verbosity,
743                    )
744                    .await?;
745                }
746            }
747        }
748        Ordering::Equal => {
749            info!(
750                "Current node count ({}) matches target ({}). No action needed.",
751                running_count, target_count
752            );
753        }
754    }
755
756    // Verify final state
757    let final_node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
758    let final_running_count = final_node_registry
759        .nodes
760        .iter()
761        .filter(|node| node.status == ServiceStatus::Running)
762        .count();
763
764    info!("Final running node count: {}", final_running_count);
765    if final_running_count != target_count {
766        warn!(
767            "Failed to reach target node count. Expected {}, but got {}",
768            target_count, final_running_count
769        );
770    }
771
772    Ok(())
773}
774
775fn get_services_for_ops(
776    node_registry: &NodeRegistry,
777    peer_ids: Vec<String>,
778    service_names: Vec<String>,
779) -> Result<Vec<usize>> {
780    let mut service_indices = Vec::new();
781
782    if service_names.is_empty() && peer_ids.is_empty() {
783        for node in node_registry.nodes.iter() {
784            if let Some(index) = node_registry.nodes.iter().position(|x| {
785                x.service_name == node.service_name && x.status != ServiceStatus::Removed
786            }) {
787                service_indices.push(index);
788            }
789        }
790    } else {
791        for name in &service_names {
792            if let Some(index) = node_registry
793                .nodes
794                .iter()
795                .position(|x| x.service_name == *name && x.status != ServiceStatus::Removed)
796            {
797                service_indices.push(index);
798            } else {
799                error!("No service named '{name}'");
800                return Err(eyre!(format!("No service named '{name}'")));
801            }
802        }
803
804        for peer_id_str in &peer_ids {
805            let peer_id = PeerId::from_str(peer_id_str)
806                .inspect_err(|err| error!("Error parsing PeerId: {err:?}"))?;
807            if let Some(index) = node_registry
808                .nodes
809                .iter()
810                .position(|x| x.peer_id == Some(peer_id) && x.status != ServiceStatus::Removed)
811            {
812                service_indices.push(index);
813            } else {
814                error!("Could not find node with peer id: '{peer_id:?}'");
815                return Err(eyre!(format!(
816                    "Could not find node with peer ID '{peer_id}'",
817                )));
818            }
819        }
820    }
821
822    Ok(service_indices)
823}
824
825fn summarise_any_failed_ops(
826    failed_services: Vec<(String, String)>,
827    verb: &str,
828    verbosity: VerbosityLevel,
829) -> Result<()> {
830    if !failed_services.is_empty() {
831        if verbosity != VerbosityLevel::Minimal {
832            println!("Failed to {verb} {} service(s):", failed_services.len());
833            for failed in failed_services.iter() {
834                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
835            }
836        }
837
838        error!("Failed to {verb} one or more services");
839        return Err(eyre!("Failed to {verb} one or more services"));
840    }
841    Ok(())
842}