ant_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    ServiceManager, VerbosityLevel,
14    add_services::{
15        add_node,
16        config::{AddNodeServiceOptions, PortRange},
17    },
18    config::{self, is_running_as_root},
19    helpers::{download_and_extract_release, get_bin_version},
20    print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::{Bootstrap, InitialPeersConfig};
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27    NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28    UpgradeOptions, UpgradeResult,
29    control::{ServiceControl, ServiceController},
30    rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use std::{
37    cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
38};
39use tokio::sync::RwLock;
40use tracing::debug;
41
42/// Returns the added service names
43pub async fn add(
44    alpha: bool,
45    auto_restart: bool,
46    auto_set_nat_flags: bool,
47    count: Option<u16>,
48    data_dir_path: Option<PathBuf>,
49    enable_metrics_server: bool,
50    env_variables: Option<Vec<(String, String)>>,
51    evm_network: Option<EvmNetwork>,
52    log_dir_path: Option<PathBuf>,
53    log_format: Option<LogFormat>,
54    max_archived_log_files: Option<usize>,
55    max_log_files: Option<usize>,
56    metrics_port: Option<PortRange>,
57    network_id: Option<u8>,
58    node_ip: Option<Ipv4Addr>,
59    node_port: Option<PortRange>,
60    node_registry: NodeRegistryManager,
61    mut init_peers_config: InitialPeersConfig,
62    relay: bool,
63    rewards_address: RewardsAddress,
64    rpc_address: Option<Ipv4Addr>,
65    rpc_port: Option<PortRange>,
66    src_path: Option<PathBuf>,
67    no_upnp: bool,
68    url: Option<String>,
69    user: Option<String>,
70    version: Option<String>,
71    verbosity: VerbosityLevel,
72    write_older_cache_files: bool,
73) -> Result<Vec<String>> {
74    let user_mode = !is_running_as_root();
75
76    if verbosity != VerbosityLevel::Minimal {
77        print_banner("Add Antnode Services");
78        println!("{} service(s) to be added", count.unwrap_or(1));
79    }
80
81    let service_manager = ServiceController {};
82    let service_user = if user_mode {
83        None
84    } else {
85        let service_user = user.unwrap_or_else(|| "ant".to_string());
86        service_manager.create_service_user(&service_user)?;
87        Some(service_user)
88    };
89
90    let service_data_dir_path =
91        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
92    let service_log_dir_path =
93        config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
94    let bootstrap_cache_dir = if let Some(user) = &service_user {
95        Some(config::get_bootstrap_cache_owner_path(user)?)
96    } else {
97        None
98    };
99
100    let release_repo = <dyn AntReleaseRepoActions>::default_config();
101
102    let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
103        let version = get_bin_version(&path)?;
104        (path, version)
105    } else {
106        download_and_extract_release(
107            ReleaseType::AntNode,
108            url.clone(),
109            version,
110            &*release_repo,
111            verbosity,
112            None,
113        )
114        .await?
115    };
116
117    debug!("Parsing peers from PeersArgs");
118
119    init_peers_config.addrs.extend(Bootstrap::fetch_from_env());
120    init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
121
122    let options = AddNodeServiceOptions {
123        alpha,
124        auto_restart,
125        auto_set_nat_flags,
126        count,
127        delete_antnode_src: src_path.is_none(),
128        enable_metrics_server,
129        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
130        env_variables,
131        relay,
132        log_format,
133        max_archived_log_files,
134        max_log_files,
135        metrics_port,
136        network_id,
137        node_ip,
138        node_port,
139        init_peers_config,
140        rewards_address,
141        rpc_address,
142        rpc_port,
143        antnode_src_path,
144        antnode_dir_path: service_data_dir_path.clone(),
145        service_data_dir_path,
146        service_log_dir_path,
147        no_upnp,
148        user: service_user,
149        user_mode,
150        version,
151        write_older_cache_files,
152    };
153    info!("Adding node service(s)");
154    let added_services_names =
155        add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
156
157    node_registry.save().await?;
158    debug!("Node registry saved");
159
160    Ok(added_services_names)
161}
162
163pub async fn balance(
164    peer_ids: Vec<String>,
165    node_registry: NodeRegistryManager,
166    service_names: Vec<String>,
167    verbosity: VerbosityLevel,
168) -> Result<()> {
169    if verbosity != VerbosityLevel::Minimal {
170        print_banner("Reward Balances");
171    }
172
173    refresh_node_registry(
174        node_registry.clone(),
175        &ServiceController {},
176        verbosity != VerbosityLevel::Minimal,
177        false,
178        verbosity,
179    )
180    .await?;
181
182    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
183    if services_for_ops.is_empty() {
184        info!("Services for ops is empty, cannot obtain the balance");
185        // This could be the case if all services are at `Removed` status.
186        println!("No balances to display");
187        return Ok(());
188    }
189    debug!("Obtaining balances for {} services", services_for_ops.len());
190
191    for node in services_for_ops {
192        let node = node.read().await;
193        // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
194        println!("{}: {}", node.service_name, 0,);
195    }
196    Ok(())
197}
198
199pub async fn remove(
200    keep_directories: bool,
201    peer_ids: Vec<String>,
202    node_registry: NodeRegistryManager,
203    service_names: Vec<String>,
204    verbosity: VerbosityLevel,
205) -> Result<()> {
206    if verbosity != VerbosityLevel::Minimal {
207        print_banner("Remove Antnode Services");
208    }
209    info!(
210        "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
211    );
212
213    refresh_node_registry(
214        node_registry.clone(),
215        &ServiceController {},
216        verbosity != VerbosityLevel::Minimal,
217        false,
218        verbosity,
219    )
220    .await?;
221
222    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
223    if services_for_ops.is_empty() {
224        info!("Services for ops is empty, no services were eligible for removal");
225        // This could be the case if all services are at `Removed` status.
226        if verbosity != VerbosityLevel::Minimal {
227            println!("No services were eligible for removal");
228        }
229        return Ok(());
230    }
231
232    let mut failed_services = Vec::new();
233    for node in &services_for_ops {
234        let service_name = node.read().await.service_name.clone();
235        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
236        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
237        let mut service_manager =
238            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
239        match service_manager.remove(keep_directories).await {
240            Ok(()) => {
241                debug!("Removed service {service_name}");
242                node_registry.save().await?;
243            }
244            Err(err) => {
245                error!("Failed to remove service {service_name}: {err}");
246                failed_services.push((service_name.clone(), err.to_string()))
247            }
248        }
249    }
250
251    summarise_any_failed_ops(failed_services, "remove", verbosity)
252}
253
254pub async fn reset(
255    force: bool,
256    node_registry: NodeRegistryManager,
257    verbosity: VerbosityLevel,
258) -> Result<()> {
259    if verbosity != VerbosityLevel::Minimal {
260        print_banner("Reset Antnode Services");
261    }
262    info!("Resetting all antnode services, with force={force}");
263
264    if !force {
265        println!("WARNING: all antnode services, data, and logs will be removed.");
266        println!("Do you wish to proceed? [y/n]");
267        std::io::stdout().flush()?;
268        let mut input = String::new();
269        std::io::stdin().read_line(&mut input)?;
270        if input.trim().to_lowercase() != "y" {
271            println!("Reset aborted");
272            return Ok(());
273        }
274    }
275
276    stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
277    remove(false, vec![], node_registry, vec![], verbosity).await?;
278
279    // Due the possibility of repeated runs of the `reset` command, we need to check for the
280    // existence of this file before attempting to delete it, since `remove_file` will return an
281    // error if the file doesn't exist. On Windows this has been observed to happen.
282    let node_registry_path = config::get_node_registry_path()?;
283    if node_registry_path.exists() {
284        info!("Removing node registry file: {node_registry_path:?}");
285        std::fs::remove_file(node_registry_path)?;
286    }
287
288    Ok(())
289}
290
291pub async fn start(
292    connection_timeout_s: u64,
293    fixed_interval: Option<u64>,
294    node_registry: NodeRegistryManager,
295    peer_ids: Vec<String>,
296    service_names: Vec<String>,
297    verbosity: VerbosityLevel,
298) -> Result<()> {
299    if verbosity != VerbosityLevel::Minimal {
300        print_banner("Start Antnode Services");
301    }
302    info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
303
304    refresh_node_registry(
305        node_registry.clone(),
306        &ServiceController {},
307        verbosity != VerbosityLevel::Minimal,
308        false,
309        verbosity,
310    )
311    .await?;
312
313    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
314    if services_for_ops.is_empty() {
315        info!("No services are eligible to be started");
316        // This could be the case if all services are at `Removed` status.
317        if verbosity != VerbosityLevel::Minimal {
318            println!("No services were eligible to be started");
319        }
320        return Ok(());
321    }
322
323    let mut failed_services = Vec::new();
324    for node in &services_for_ops {
325        let service_name = node.read().await.service_name.clone();
326
327        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
328        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
329
330        // set dynamic startup delay if fixed_interval is not set
331        let service = if fixed_interval.is_none() {
332            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
333        } else {
334            service
335        };
336
337        let mut service_manager =
338            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
339        if service_manager.service.status().await != ServiceStatus::Running {
340            // It would be possible here to check if the service *is* running and then just
341            // continue without applying the delay. The reason for not doing so is because when
342            // `start` is called below, the user will get a message to say the service was already
343            // started, which I think is useful behaviour to retain.
344            if let Some(interval) = fixed_interval {
345                debug!("Sleeping for {} milliseconds", interval);
346                std::thread::sleep(std::time::Duration::from_millis(interval));
347            }
348        }
349        match service_manager.start().await {
350            Ok(start_duration) => {
351                debug!("Started service {service_name} in {start_duration:?}",);
352
353                node_registry.save().await?;
354            }
355            Err(err) => {
356                error!("Failed to start service {service_name}: {err}");
357                failed_services.push((service_name.clone(), err.to_string()))
358            }
359        }
360    }
361
362    summarise_any_failed_ops(failed_services, "start", verbosity)
363}
364
365pub async fn status(
366    details: bool,
367    fail: bool,
368    json: bool,
369    node_registry: NodeRegistryManager,
370) -> Result<()> {
371    if !node_registry.nodes.read().await.is_empty() {
372        if !json && !details {
373            print_banner("Antnode Services");
374        }
375        status_report(
376            &node_registry,
377            &ServiceController {},
378            details,
379            json,
380            fail,
381            false,
382        )
383        .await?;
384        node_registry.save().await?;
385    }
386    Ok(())
387}
388
389pub async fn stop(
390    interval: Option<u64>,
391    node_registry: NodeRegistryManager,
392    peer_ids: Vec<String>,
393    service_names: Vec<String>,
394    verbosity: VerbosityLevel,
395) -> Result<()> {
396    if verbosity != VerbosityLevel::Minimal {
397        print_banner("Stop Antnode Services");
398    }
399    info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
400
401    refresh_node_registry(
402        node_registry.clone(),
403        &ServiceController {},
404        verbosity != VerbosityLevel::Minimal,
405        false,
406        verbosity,
407    )
408    .await?;
409
410    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
411    if services_for_ops.is_empty() {
412        info!("No services are eligible to be stopped");
413        // This could be the case if all services are at `Removed` status.
414        if verbosity != VerbosityLevel::Minimal {
415            println!("No services were eligible to be stopped");
416        }
417        return Ok(());
418    }
419
420    let mut failed_services = Vec::new();
421    for node in services_for_ops.iter() {
422        let service_name = node.read().await.service_name.clone();
423        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
424        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
425        let mut service_manager =
426            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
427
428        if service_manager.service.status().await == ServiceStatus::Running
429            && let Some(interval) = interval
430        {
431            debug!("Sleeping for {} milliseconds", interval);
432            std::thread::sleep(std::time::Duration::from_millis(interval));
433        }
434        match service_manager.stop().await {
435            Ok(()) => {
436                debug!("Stopped service {service_name}");
437                node_registry.save().await?;
438            }
439            Err(err) => {
440                error!("Failed to stop service {service_name}: {err}");
441                failed_services.push((service_name.clone(), err.to_string()))
442            }
443        }
444    }
445
446    summarise_any_failed_ops(failed_services, "stop", verbosity)
447}
448
449pub async fn upgrade(
450    connection_timeout_s: u64,
451    do_not_start: bool,
452    custom_bin_path: Option<PathBuf>,
453    force: bool,
454    fixed_interval: Option<u64>,
455    node_registry: NodeRegistryManager,
456    peer_ids: Vec<String>,
457    provided_env_variables: Option<Vec<(String, String)>>,
458    service_names: Vec<String>,
459    url: Option<String>,
460    version: Option<String>,
461    verbosity: VerbosityLevel,
462) -> Result<()> {
463    // In the case of a custom binary, we want to force the use of it. Regardless of its version
464    // number, the user has probably built it for some special case. They may have not used the
465    // `--force` flag; if they didn't, we can just do that for them here.
466    let use_force = force || custom_bin_path.is_some();
467
468    if verbosity != VerbosityLevel::Minimal {
469        print_banner("Upgrade Antnode Services");
470    }
471    info!(
472        "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
473    );
474
475    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
476        custom_bin_path.clone(),
477        ReleaseType::AntNode,
478        url,
479        version,
480        verbosity,
481    )
482    .await?;
483
484    refresh_node_registry(
485        node_registry.clone(),
486        &ServiceController {},
487        verbosity != VerbosityLevel::Minimal,
488        false,
489        verbosity,
490    )
491    .await?;
492
493    if let Some(node) = node_registry.nodes.read().await.first() {
494        let node = node.read().await;
495        debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
496    } else {
497        debug!("There are no nodes currently added or active");
498    }
499
500    if !use_force {
501        let mut node_versions = Vec::new();
502
503        for node in node_registry.nodes.read().await.iter() {
504            let node = node.read().await;
505            let version = Version::parse(&node.version)
506                .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
507            node_versions.push(version);
508        }
509
510        let any_nodes_need_upgraded = node_versions
511            .iter()
512            .any(|current_version| current_version < &target_version);
513        if !any_nodes_need_upgraded {
514            info!("All nodes are at the latest version, no upgrade required.");
515            if verbosity != VerbosityLevel::Minimal {
516                println!("{} All nodes are at the latest version", "✓".green());
517            }
518            return Ok(());
519        }
520    }
521
522    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
523    trace!("services_for_ops len: {}", services_for_ops.len());
524    let mut upgrade_summary = Vec::new();
525
526    for node in &services_for_ops {
527        let env_variables = if provided_env_variables.is_some() {
528            provided_env_variables.clone()
529        } else {
530            node_registry.environment_variables.read().await.clone()
531        };
532        let options = UpgradeOptions {
533            auto_restart: false,
534            env_variables: env_variables.clone(),
535            force: use_force,
536            start_service: !do_not_start,
537            target_bin_path: upgrade_bin_path.clone(),
538            target_version: target_version.clone(),
539        };
540        let service_name = node.read().await.service_name.clone();
541
542        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
543        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
544        // set dynamic startup delay if fixed_interval is not set
545        let service = if fixed_interval.is_none() {
546            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
547        } else {
548            service
549        };
550
551        let mut service_manager =
552            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
553
554        match service_manager.upgrade(options).await {
555            Ok(upgrade_result) => {
556                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
557                if upgrade_result != UpgradeResult::NotRequired {
558                    // It doesn't seem useful to apply the interval if there was no upgrade
559                    // required for the previous service.
560                    if let Some(interval) = fixed_interval {
561                        debug!("Sleeping for {interval} milliseconds",);
562                        std::thread::sleep(std::time::Duration::from_millis(interval));
563                    }
564                }
565                upgrade_summary.push((service_name.clone(), upgrade_result));
566                node_registry.save().await?;
567            }
568            Err(err) => {
569                error!("Error upgrading service {service_name}: {err}");
570                upgrade_summary.push((
571                    service_name.clone(),
572                    UpgradeResult::Error(format!("Error: {err}")),
573                ));
574                node_registry.save().await?;
575            }
576        }
577    }
578
579    if verbosity != VerbosityLevel::Minimal {
580        print_upgrade_summary(upgrade_summary.clone());
581    }
582
583    if upgrade_summary.iter().any(|(_, r)| {
584        matches!(r, UpgradeResult::Error(_))
585            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
586    }) {
587        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
588            "For any services that were upgraded but did not start, you can attempt to start them \
589                again using the 'start' command."));
590    }
591
592    Ok(())
593}
594
595/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
596///
597/// The arguments here are mostly mirror those used in `add`.
598pub async fn maintain_n_running_nodes(
599    alpha: bool,
600    auto_restart: bool,
601    auto_set_nat_flags: bool,
602    connection_timeout_s: u64,
603    max_nodes_to_run: u16,
604    data_dir_path: Option<PathBuf>,
605    enable_metrics_server: bool,
606    env_variables: Option<Vec<(String, String)>>,
607    evm_network: Option<EvmNetwork>,
608    log_dir_path: Option<PathBuf>,
609    log_format: Option<LogFormat>,
610    max_archived_log_files: Option<usize>,
611    max_log_files: Option<usize>,
612    metrics_port: Option<PortRange>,
613    network_id: Option<u8>,
614    node_ip: Option<Ipv4Addr>,
615    node_port: Option<PortRange>,
616    node_registry: NodeRegistryManager,
617    peers_args: InitialPeersConfig,
618    relay: bool,
619    rewards_address: RewardsAddress,
620    rpc_address: Option<Ipv4Addr>,
621    rpc_port: Option<PortRange>,
622    src_path: Option<PathBuf>,
623    url: Option<String>,
624    no_upnp: bool,
625    user: Option<String>,
626    version: Option<String>,
627    verbosity: VerbosityLevel,
628    start_node_interval: Option<u64>,
629    write_older_cache_files: bool,
630) -> Result<()> {
631    let mut running_nodes = Vec::new();
632
633    for node in node_registry.nodes.read().await.iter() {
634        let node = node.read().await;
635        if node.status == ServiceStatus::Running {
636            running_nodes.push(node.service_name.clone());
637        }
638    }
639
640    let running_count = running_nodes.len();
641    let target_count = max_nodes_to_run as usize;
642
643    info!(
644        "Current running nodes: {}, Target: {}",
645        running_count, target_count
646    );
647
648    match running_count.cmp(&target_count) {
649        Ordering::Greater => {
650            let to_stop_count = running_count - target_count;
651            let services_to_stop = running_nodes
652                .into_iter()
653                .rev() // Stop the oldest nodes first
654                .take(to_stop_count)
655                .collect::<Vec<_>>();
656
657            info!(
658                "Stopping {} excess nodes: {:?}",
659                to_stop_count, services_to_stop
660            );
661            stop(
662                None,
663                node_registry.clone(),
664                vec![],
665                services_to_stop,
666                verbosity,
667            )
668            .await?;
669        }
670        Ordering::Less => {
671            let to_start_count = target_count - running_count;
672            let mut inactive_nodes = Vec::new();
673            for node in node_registry.nodes.read().await.iter() {
674                let node = node.read().await;
675                if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
676                    inactive_nodes.push(node.service_name.clone());
677                }
678            }
679
680            info!("Inactive nodes available: {}", inactive_nodes.len());
681
682            if to_start_count <= inactive_nodes.len() {
683                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
684                info!(
685                    "Starting {} existing inactive nodes: {:?}",
686                    to_start_count, nodes_to_start
687                );
688                start(
689                    connection_timeout_s,
690                    start_node_interval,
691                    node_registry.clone(),
692                    vec![],
693                    nodes_to_start,
694                    verbosity,
695                )
696                .await?;
697            } else {
698                let to_add_count = to_start_count - inactive_nodes.len();
699                info!(
700                    "Adding {} new nodes and starting all {} inactive nodes",
701                    to_add_count,
702                    inactive_nodes.len()
703                );
704
705                let ports_to_use = match node_port {
706                    Some(PortRange::Single(port)) => vec![port],
707                    Some(PortRange::Range(start, end)) => {
708                        (start..=end).take(to_add_count).collect()
709                    }
710                    None => vec![],
711                };
712
713                for (i, port) in ports_to_use.into_iter().enumerate() {
714                    let added_service = add(
715                        alpha,
716                        auto_restart,
717                        auto_set_nat_flags,
718                        Some(1),
719                        data_dir_path.clone(),
720                        enable_metrics_server,
721                        env_variables.clone(),
722                        evm_network.clone(),
723                        log_dir_path.clone(),
724                        log_format,
725                        max_archived_log_files,
726                        max_log_files,
727                        metrics_port.clone(),
728                        network_id,
729                        node_ip,
730                        Some(PortRange::Single(port)),
731                        node_registry.clone(),
732                        peers_args.clone(),
733                        relay,
734                        rewards_address,
735                        rpc_address,
736                        rpc_port.clone(),
737                        src_path.clone(),
738                        no_upnp,
739                        url.clone(),
740                        user.clone(),
741                        version.clone(),
742                        verbosity,
743                        write_older_cache_files,
744                    )
745                    .await?;
746
747                    if i == 0 {
748                        start(
749                            connection_timeout_s,
750                            start_node_interval,
751                            node_registry.clone(),
752                            vec![],
753                            added_service,
754                            verbosity,
755                        )
756                        .await?;
757                    }
758                }
759
760                if !inactive_nodes.is_empty() {
761                    start(
762                        connection_timeout_s,
763                        start_node_interval,
764                        node_registry.clone(),
765                        vec![],
766                        inactive_nodes,
767                        verbosity,
768                    )
769                    .await?;
770                }
771            }
772        }
773        Ordering::Equal => {
774            info!(
775                "Current node count ({}) matches target ({}). No action needed.",
776                running_count, target_count
777            );
778        }
779    }
780
781    // Verify final state
782    let mut final_running_count = 0;
783    for node in node_registry.nodes.read().await.iter() {
784        let node_read = node.read().await;
785        if node_read.status == ServiceStatus::Running {
786            final_running_count += 1;
787        }
788    }
789
790    if final_running_count != target_count {
791        warn!(
792            "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
793        );
794    }
795
796    Ok(())
797}
798
799async fn get_services_for_ops(
800    node_registry: &NodeRegistryManager,
801    peer_ids: Vec<String>,
802    service_names: Vec<String>,
803) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
804    let mut services = Vec::new();
805
806    if service_names.is_empty() && peer_ids.is_empty() {
807        for node in node_registry.nodes.read().await.iter() {
808            if node.read().await.status != ServiceStatus::Removed {
809                services.push(Arc::clone(node));
810            }
811        }
812    } else {
813        for name in &service_names {
814            let mut found_service_with_name = false;
815            for node in node_registry.nodes.read().await.iter() {
816                let node_read = node.read().await;
817                if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
818                    {
819                        services.push(Arc::clone(node));
820                        found_service_with_name = true;
821                        break;
822                    }
823                }
824            }
825
826            if !found_service_with_name {
827                error!("No service named '{name}'");
828                return Err(eyre!(format!("No service named '{name}'")));
829            }
830        }
831
832        for peer_id_str in &peer_ids {
833            let mut found_service_with_peer_id = false;
834            let given_peer_id = PeerId::from_str(peer_id_str)
835                .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
836            for node in node_registry.nodes.read().await.iter() {
837                let node_read = node.read().await;
838                if let Some(peer_id) = node_read.peer_id
839                    && peer_id == given_peer_id
840                    && node_read.status != ServiceStatus::Removed
841                {
842                    services.push(Arc::clone(node));
843                    found_service_with_peer_id = true;
844                    break;
845                }
846            }
847            if !found_service_with_peer_id {
848                error!("Could not find node with peer id: '{given_peer_id:?}'");
849                return Err(eyre!(format!(
850                    "Could not find node with peer ID '{given_peer_id}'",
851                )));
852            }
853        }
854    }
855
856    Ok(services)
857}
858
859fn summarise_any_failed_ops(
860    failed_services: Vec<(String, String)>,
861    verb: &str,
862    verbosity: VerbosityLevel,
863) -> Result<()> {
864    if !failed_services.is_empty() {
865        if verbosity != VerbosityLevel::Minimal {
866            println!("Failed to {verb} {} service(s):", failed_services.len());
867            for failed in failed_services.iter() {
868                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
869            }
870        }
871
872        error!("Failed to {verb} one or more services");
873        return Err(eyre!("Failed to {verb} one or more services"));
874    }
875    Ok(())
876}