ant_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    ServiceManager, VerbosityLevel,
14    add_services::{
15        add_node,
16        config::{AddNodeServiceOptions, PortRange},
17    },
18    config::{self, is_running_as_root},
19    helpers::{download_and_extract_release, get_bin_version},
20    print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::InitialPeersConfig;
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27    NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28    UpgradeOptions, UpgradeResult,
29    control::{ServiceControl, ServiceController},
30    rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use std::{
37    cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
38};
39use tokio::sync::RwLock;
40use tracing::debug;
41
42/// Returns the added service names
43pub async fn add(
44    alpha: bool,
45    auto_restart: bool,
46    auto_set_nat_flags: bool,
47    count: Option<u16>,
48    data_dir_path: Option<PathBuf>,
49    enable_metrics_server: bool,
50    env_variables: Option<Vec<(String, String)>>,
51    evm_network: Option<EvmNetwork>,
52    log_dir_path: Option<PathBuf>,
53    log_format: Option<LogFormat>,
54    max_archived_log_files: Option<usize>,
55    max_log_files: Option<usize>,
56    metrics_port: Option<PortRange>,
57    network_id: Option<u8>,
58    node_ip: Option<Ipv4Addr>,
59    node_port: Option<PortRange>,
60    node_registry: NodeRegistryManager,
61    mut init_peers_config: InitialPeersConfig,
62    relay: bool,
63    rewards_address: RewardsAddress,
64    rpc_address: Option<Ipv4Addr>,
65    rpc_port: Option<PortRange>,
66    src_path: Option<PathBuf>,
67    no_upnp: bool,
68    url: Option<String>,
69    user: Option<String>,
70    version: Option<String>,
71    verbosity: VerbosityLevel,
72    write_older_cache_files: bool,
73) -> Result<Vec<String>> {
74    let user_mode = !is_running_as_root();
75
76    if verbosity != VerbosityLevel::Minimal {
77        print_banner("Add Antnode Services");
78        println!("{} service(s) to be added", count.unwrap_or(1));
79    }
80
81    let service_manager = ServiceController {};
82    let service_user = if user_mode {
83        None
84    } else {
85        let service_user = user.unwrap_or_else(|| "ant".to_string());
86        service_manager.create_service_user(&service_user)?;
87        Some(service_user)
88    };
89
90    let service_data_dir_path =
91        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
92    let service_log_dir_path =
93        config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
94    let bootstrap_cache_dir = if let Some(user) = &service_user {
95        Some(config::get_bootstrap_cache_owner_path(user)?)
96    } else {
97        None
98    };
99
100    let release_repo = <dyn AntReleaseRepoActions>::default_config();
101
102    let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
103        let version = get_bin_version(&path)?;
104        (path, version)
105    } else {
106        download_and_extract_release(
107            ReleaseType::AntNode,
108            url.clone(),
109            version,
110            &*release_repo,
111            verbosity,
112            None,
113        )
114        .await?
115    };
116
117    debug!("Parsing peers from PeersArgs");
118
119    init_peers_config
120        .addrs
121        .extend(InitialPeersConfig::read_bootstrap_addr_from_env());
122    init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
123
124    let options = AddNodeServiceOptions {
125        alpha,
126        auto_restart,
127        auto_set_nat_flags,
128        count,
129        delete_antnode_src: src_path.is_none(),
130        enable_metrics_server,
131        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
132        env_variables,
133        relay,
134        log_format,
135        max_archived_log_files,
136        max_log_files,
137        metrics_port,
138        network_id,
139        node_ip,
140        node_port,
141        init_peers_config,
142        rewards_address,
143        rpc_address,
144        rpc_port,
145        antnode_src_path,
146        antnode_dir_path: service_data_dir_path.clone(),
147        service_data_dir_path,
148        service_log_dir_path,
149        no_upnp,
150        user: service_user,
151        user_mode,
152        version,
153        write_older_cache_files,
154    };
155    info!("Adding node service(s)");
156    let added_services_names =
157        add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
158
159    node_registry.save().await?;
160    debug!("Node registry saved");
161
162    Ok(added_services_names)
163}
164
165pub async fn balance(
166    peer_ids: Vec<String>,
167    node_registry: NodeRegistryManager,
168    service_names: Vec<String>,
169    verbosity: VerbosityLevel,
170) -> Result<()> {
171    if verbosity != VerbosityLevel::Minimal {
172        print_banner("Reward Balances");
173    }
174
175    refresh_node_registry(
176        node_registry.clone(),
177        &ServiceController {},
178        verbosity != VerbosityLevel::Minimal,
179        false,
180        verbosity,
181    )
182    .await?;
183
184    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
185    if services_for_ops.is_empty() {
186        info!("Services for ops is empty, cannot obtain the balance");
187        // This could be the case if all services are at `Removed` status.
188        println!("No balances to display");
189        return Ok(());
190    }
191    debug!("Obtaining balances for {} services", services_for_ops.len());
192
193    for node in services_for_ops {
194        let node = node.read().await;
195        // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
196        println!("{}: {}", node.service_name, 0,);
197    }
198    Ok(())
199}
200
201pub async fn remove(
202    keep_directories: bool,
203    peer_ids: Vec<String>,
204    node_registry: NodeRegistryManager,
205    service_names: Vec<String>,
206    verbosity: VerbosityLevel,
207) -> Result<()> {
208    if verbosity != VerbosityLevel::Minimal {
209        print_banner("Remove Antnode Services");
210    }
211    info!(
212        "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
213    );
214
215    refresh_node_registry(
216        node_registry.clone(),
217        &ServiceController {},
218        verbosity != VerbosityLevel::Minimal,
219        false,
220        verbosity,
221    )
222    .await?;
223
224    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
225    if services_for_ops.is_empty() {
226        info!("Services for ops is empty, no services were eligible for removal");
227        // This could be the case if all services are at `Removed` status.
228        if verbosity != VerbosityLevel::Minimal {
229            println!("No services were eligible for removal");
230        }
231        return Ok(());
232    }
233
234    let mut failed_services = Vec::new();
235    for node in &services_for_ops {
236        let service_name = node.read().await.service_name.clone();
237        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
238        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
239        let mut service_manager =
240            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
241        match service_manager.remove(keep_directories).await {
242            Ok(()) => {
243                debug!("Removed service {service_name}");
244                node_registry.save().await?;
245            }
246            Err(err) => {
247                error!("Failed to remove service {service_name}: {err}");
248                failed_services.push((service_name.clone(), err.to_string()))
249            }
250        }
251    }
252
253    summarise_any_failed_ops(failed_services, "remove", verbosity)
254}
255
256pub async fn reset(
257    force: bool,
258    node_registry: NodeRegistryManager,
259    verbosity: VerbosityLevel,
260) -> Result<()> {
261    if verbosity != VerbosityLevel::Minimal {
262        print_banner("Reset Antnode Services");
263    }
264    info!("Resetting all antnode services, with force={force}");
265
266    if !force {
267        println!("WARNING: all antnode services, data, and logs will be removed.");
268        println!("Do you wish to proceed? [y/n]");
269        std::io::stdout().flush()?;
270        let mut input = String::new();
271        std::io::stdin().read_line(&mut input)?;
272        if input.trim().to_lowercase() != "y" {
273            println!("Reset aborted");
274            return Ok(());
275        }
276    }
277
278    stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
279    remove(false, vec![], node_registry, vec![], verbosity).await?;
280
281    // Due the possibility of repeated runs of the `reset` command, we need to check for the
282    // existence of this file before attempting to delete it, since `remove_file` will return an
283    // error if the file doesn't exist. On Windows this has been observed to happen.
284    let node_registry_path = config::get_node_registry_path()?;
285    if node_registry_path.exists() {
286        info!("Removing node registry file: {node_registry_path:?}");
287        std::fs::remove_file(node_registry_path)?;
288    }
289
290    Ok(())
291}
292
293pub async fn start(
294    connection_timeout_s: u64,
295    fixed_interval: Option<u64>,
296    node_registry: NodeRegistryManager,
297    peer_ids: Vec<String>,
298    service_names: Vec<String>,
299    verbosity: VerbosityLevel,
300) -> Result<()> {
301    if verbosity != VerbosityLevel::Minimal {
302        print_banner("Start Antnode Services");
303    }
304    info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
305
306    refresh_node_registry(
307        node_registry.clone(),
308        &ServiceController {},
309        verbosity != VerbosityLevel::Minimal,
310        false,
311        verbosity,
312    )
313    .await?;
314
315    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
316    if services_for_ops.is_empty() {
317        info!("No services are eligible to be started");
318        // This could be the case if all services are at `Removed` status.
319        if verbosity != VerbosityLevel::Minimal {
320            println!("No services were eligible to be started");
321        }
322        return Ok(());
323    }
324
325    let mut failed_services = Vec::new();
326    for node in &services_for_ops {
327        let service_name = node.read().await.service_name.clone();
328
329        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
330        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
331
332        // set dynamic startup delay if fixed_interval is not set
333        let service = if fixed_interval.is_none() {
334            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
335        } else {
336            service
337        };
338
339        let mut service_manager =
340            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
341        if service_manager.service.status().await != ServiceStatus::Running {
342            // It would be possible here to check if the service *is* running and then just
343            // continue without applying the delay. The reason for not doing so is because when
344            // `start` is called below, the user will get a message to say the service was already
345            // started, which I think is useful behaviour to retain.
346            if let Some(interval) = fixed_interval {
347                debug!("Sleeping for {} milliseconds", interval);
348                std::thread::sleep(std::time::Duration::from_millis(interval));
349            }
350        }
351        match service_manager.start().await {
352            Ok(start_duration) => {
353                debug!("Started service {service_name} in {start_duration:?}",);
354
355                node_registry.save().await?;
356            }
357            Err(err) => {
358                error!("Failed to start service {service_name}: {err}");
359                failed_services.push((service_name.clone(), err.to_string()))
360            }
361        }
362    }
363
364    summarise_any_failed_ops(failed_services, "start", verbosity)
365}
366
367pub async fn status(
368    details: bool,
369    fail: bool,
370    json: bool,
371    node_registry: NodeRegistryManager,
372) -> Result<()> {
373    if !node_registry.nodes.read().await.is_empty() {
374        if !json && !details {
375            print_banner("Antnode Services");
376        }
377        status_report(
378            &node_registry,
379            &ServiceController {},
380            details,
381            json,
382            fail,
383            false,
384        )
385        .await?;
386        node_registry.save().await?;
387    }
388    Ok(())
389}
390
391pub async fn stop(
392    interval: Option<u64>,
393    node_registry: NodeRegistryManager,
394    peer_ids: Vec<String>,
395    service_names: Vec<String>,
396    verbosity: VerbosityLevel,
397) -> Result<()> {
398    if verbosity != VerbosityLevel::Minimal {
399        print_banner("Stop Antnode Services");
400    }
401    info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
402
403    refresh_node_registry(
404        node_registry.clone(),
405        &ServiceController {},
406        verbosity != VerbosityLevel::Minimal,
407        false,
408        verbosity,
409    )
410    .await?;
411
412    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
413    if services_for_ops.is_empty() {
414        info!("No services are eligible to be stopped");
415        // This could be the case if all services are at `Removed` status.
416        if verbosity != VerbosityLevel::Minimal {
417            println!("No services were eligible to be stopped");
418        }
419        return Ok(());
420    }
421
422    let mut failed_services = Vec::new();
423    for node in services_for_ops.iter() {
424        let service_name = node.read().await.service_name.clone();
425        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
426        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
427        let mut service_manager =
428            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
429
430        if service_manager.service.status().await == ServiceStatus::Running
431            && let Some(interval) = interval
432        {
433            debug!("Sleeping for {} milliseconds", interval);
434            std::thread::sleep(std::time::Duration::from_millis(interval));
435        }
436        match service_manager.stop().await {
437            Ok(()) => {
438                debug!("Stopped service {service_name}");
439                node_registry.save().await?;
440            }
441            Err(err) => {
442                error!("Failed to stop service {service_name}: {err}");
443                failed_services.push((service_name.clone(), err.to_string()))
444            }
445        }
446    }
447
448    summarise_any_failed_ops(failed_services, "stop", verbosity)
449}
450
451pub async fn upgrade(
452    connection_timeout_s: u64,
453    do_not_start: bool,
454    custom_bin_path: Option<PathBuf>,
455    force: bool,
456    fixed_interval: Option<u64>,
457    node_registry: NodeRegistryManager,
458    peer_ids: Vec<String>,
459    provided_env_variables: Option<Vec<(String, String)>>,
460    service_names: Vec<String>,
461    url: Option<String>,
462    version: Option<String>,
463    verbosity: VerbosityLevel,
464) -> Result<()> {
465    // In the case of a custom binary, we want to force the use of it. Regardless of its version
466    // number, the user has probably built it for some special case. They may have not used the
467    // `--force` flag; if they didn't, we can just do that for them here.
468    let use_force = force || custom_bin_path.is_some();
469
470    if verbosity != VerbosityLevel::Minimal {
471        print_banner("Upgrade Antnode Services");
472    }
473    info!(
474        "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
475    );
476
477    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
478        custom_bin_path.clone(),
479        ReleaseType::AntNode,
480        url,
481        version,
482        verbosity,
483    )
484    .await?;
485
486    refresh_node_registry(
487        node_registry.clone(),
488        &ServiceController {},
489        verbosity != VerbosityLevel::Minimal,
490        false,
491        verbosity,
492    )
493    .await?;
494
495    if let Some(node) = node_registry.nodes.read().await.first() {
496        let node = node.read().await;
497        debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
498    } else {
499        debug!("There are no nodes currently added or active");
500    }
501
502    if !use_force {
503        let mut node_versions = Vec::new();
504
505        for node in node_registry.nodes.read().await.iter() {
506            let node = node.read().await;
507            let version = Version::parse(&node.version)
508                .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
509            node_versions.push(version);
510        }
511
512        let any_nodes_need_upgraded = node_versions
513            .iter()
514            .any(|current_version| current_version < &target_version);
515        if !any_nodes_need_upgraded {
516            info!("All nodes are at the latest version, no upgrade required.");
517            if verbosity != VerbosityLevel::Minimal {
518                println!("{} All nodes are at the latest version", "✓".green());
519            }
520            return Ok(());
521        }
522    }
523
524    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
525    trace!("services_for_ops len: {}", services_for_ops.len());
526    let mut upgrade_summary = Vec::new();
527
528    for node in &services_for_ops {
529        let env_variables = if provided_env_variables.is_some() {
530            provided_env_variables.clone()
531        } else {
532            node_registry.environment_variables.read().await.clone()
533        };
534        let options = UpgradeOptions {
535            auto_restart: false,
536            env_variables: env_variables.clone(),
537            force: use_force,
538            start_service: !do_not_start,
539            target_bin_path: upgrade_bin_path.clone(),
540            target_version: target_version.clone(),
541        };
542        let service_name = node.read().await.service_name.clone();
543
544        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
545        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
546        // set dynamic startup delay if fixed_interval is not set
547        let service = if fixed_interval.is_none() {
548            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
549        } else {
550            service
551        };
552
553        let mut service_manager =
554            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
555
556        match service_manager.upgrade(options).await {
557            Ok(upgrade_result) => {
558                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
559                if upgrade_result != UpgradeResult::NotRequired {
560                    // It doesn't seem useful to apply the interval if there was no upgrade
561                    // required for the previous service.
562                    if let Some(interval) = fixed_interval {
563                        debug!("Sleeping for {interval} milliseconds",);
564                        std::thread::sleep(std::time::Duration::from_millis(interval));
565                    }
566                }
567                upgrade_summary.push((service_name.clone(), upgrade_result));
568                node_registry.save().await?;
569            }
570            Err(err) => {
571                error!("Error upgrading service {service_name}: {err}");
572                upgrade_summary.push((
573                    service_name.clone(),
574                    UpgradeResult::Error(format!("Error: {err}")),
575                ));
576                node_registry.save().await?;
577            }
578        }
579    }
580
581    if verbosity != VerbosityLevel::Minimal {
582        print_upgrade_summary(upgrade_summary.clone());
583    }
584
585    if upgrade_summary.iter().any(|(_, r)| {
586        matches!(r, UpgradeResult::Error(_))
587            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
588    }) {
589        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
590            "For any services that were upgraded but did not start, you can attempt to start them \
591                again using the 'start' command."));
592    }
593
594    Ok(())
595}
596
597/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
598///
599/// The arguments here are mostly mirror those used in `add`.
600pub async fn maintain_n_running_nodes(
601    alpha: bool,
602    auto_restart: bool,
603    auto_set_nat_flags: bool,
604    connection_timeout_s: u64,
605    max_nodes_to_run: u16,
606    data_dir_path: Option<PathBuf>,
607    enable_metrics_server: bool,
608    env_variables: Option<Vec<(String, String)>>,
609    evm_network: Option<EvmNetwork>,
610    log_dir_path: Option<PathBuf>,
611    log_format: Option<LogFormat>,
612    max_archived_log_files: Option<usize>,
613    max_log_files: Option<usize>,
614    metrics_port: Option<PortRange>,
615    network_id: Option<u8>,
616    node_ip: Option<Ipv4Addr>,
617    node_port: Option<PortRange>,
618    node_registry: NodeRegistryManager,
619    peers_args: InitialPeersConfig,
620    relay: bool,
621    rewards_address: RewardsAddress,
622    rpc_address: Option<Ipv4Addr>,
623    rpc_port: Option<PortRange>,
624    src_path: Option<PathBuf>,
625    url: Option<String>,
626    no_upnp: bool,
627    user: Option<String>,
628    version: Option<String>,
629    verbosity: VerbosityLevel,
630    start_node_interval: Option<u64>,
631    write_older_cache_files: bool,
632) -> Result<()> {
633    let mut running_nodes = Vec::new();
634
635    for node in node_registry.nodes.read().await.iter() {
636        let node = node.read().await;
637        if node.status == ServiceStatus::Running {
638            running_nodes.push(node.service_name.clone());
639        }
640    }
641
642    let running_count = running_nodes.len();
643    let target_count = max_nodes_to_run as usize;
644
645    info!(
646        "Current running nodes: {}, Target: {}",
647        running_count, target_count
648    );
649
650    match running_count.cmp(&target_count) {
651        Ordering::Greater => {
652            let to_stop_count = running_count - target_count;
653            let services_to_stop = running_nodes
654                .into_iter()
655                .rev() // Stop the oldest nodes first
656                .take(to_stop_count)
657                .collect::<Vec<_>>();
658
659            info!(
660                "Stopping {} excess nodes: {:?}",
661                to_stop_count, services_to_stop
662            );
663            stop(
664                None,
665                node_registry.clone(),
666                vec![],
667                services_to_stop,
668                verbosity,
669            )
670            .await?;
671        }
672        Ordering::Less => {
673            let to_start_count = target_count - running_count;
674            let mut inactive_nodes = Vec::new();
675            for node in node_registry.nodes.read().await.iter() {
676                let node = node.read().await;
677                if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
678                    inactive_nodes.push(node.service_name.clone());
679                }
680            }
681
682            info!("Inactive nodes available: {}", inactive_nodes.len());
683
684            if to_start_count <= inactive_nodes.len() {
685                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
686                info!(
687                    "Starting {} existing inactive nodes: {:?}",
688                    to_start_count, nodes_to_start
689                );
690                start(
691                    connection_timeout_s,
692                    start_node_interval,
693                    node_registry.clone(),
694                    vec![],
695                    nodes_to_start,
696                    verbosity,
697                )
698                .await?;
699            } else {
700                let to_add_count = to_start_count - inactive_nodes.len();
701                info!(
702                    "Adding {} new nodes and starting all {} inactive nodes",
703                    to_add_count,
704                    inactive_nodes.len()
705                );
706
707                let ports_to_use = match node_port {
708                    Some(PortRange::Single(port)) => vec![port],
709                    Some(PortRange::Range(start, end)) => {
710                        (start..=end).take(to_add_count).collect()
711                    }
712                    None => vec![],
713                };
714
715                for (i, port) in ports_to_use.into_iter().enumerate() {
716                    let added_service = add(
717                        alpha,
718                        auto_restart,
719                        auto_set_nat_flags,
720                        Some(1),
721                        data_dir_path.clone(),
722                        enable_metrics_server,
723                        env_variables.clone(),
724                        evm_network.clone(),
725                        log_dir_path.clone(),
726                        log_format,
727                        max_archived_log_files,
728                        max_log_files,
729                        metrics_port.clone(),
730                        network_id,
731                        node_ip,
732                        Some(PortRange::Single(port)),
733                        node_registry.clone(),
734                        peers_args.clone(),
735                        relay,
736                        rewards_address,
737                        rpc_address,
738                        rpc_port.clone(),
739                        src_path.clone(),
740                        no_upnp,
741                        url.clone(),
742                        user.clone(),
743                        version.clone(),
744                        verbosity,
745                        write_older_cache_files,
746                    )
747                    .await?;
748
749                    if i == 0 {
750                        start(
751                            connection_timeout_s,
752                            start_node_interval,
753                            node_registry.clone(),
754                            vec![],
755                            added_service,
756                            verbosity,
757                        )
758                        .await?;
759                    }
760                }
761
762                if !inactive_nodes.is_empty() {
763                    start(
764                        connection_timeout_s,
765                        start_node_interval,
766                        node_registry.clone(),
767                        vec![],
768                        inactive_nodes,
769                        verbosity,
770                    )
771                    .await?;
772                }
773            }
774        }
775        Ordering::Equal => {
776            info!(
777                "Current node count ({}) matches target ({}). No action needed.",
778                running_count, target_count
779            );
780        }
781    }
782
783    // Verify final state
784    let mut final_running_count = 0;
785    for node in node_registry.nodes.read().await.iter() {
786        let node_read = node.read().await;
787        if node_read.status == ServiceStatus::Running {
788            final_running_count += 1;
789        }
790    }
791
792    if final_running_count != target_count {
793        warn!(
794            "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
795        );
796    }
797
798    Ok(())
799}
800
801async fn get_services_for_ops(
802    node_registry: &NodeRegistryManager,
803    peer_ids: Vec<String>,
804    service_names: Vec<String>,
805) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
806    let mut services = Vec::new();
807
808    if service_names.is_empty() && peer_ids.is_empty() {
809        for node in node_registry.nodes.read().await.iter() {
810            if node.read().await.status != ServiceStatus::Removed {
811                services.push(Arc::clone(node));
812            }
813        }
814    } else {
815        for name in &service_names {
816            let mut found_service_with_name = false;
817            for node in node_registry.nodes.read().await.iter() {
818                let node_read = node.read().await;
819                if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
820                    {
821                        services.push(Arc::clone(node));
822                        found_service_with_name = true;
823                        break;
824                    }
825                }
826            }
827
828            if !found_service_with_name {
829                error!("No service named '{name}'");
830                return Err(eyre!(format!("No service named '{name}'")));
831            }
832        }
833
834        for peer_id_str in &peer_ids {
835            let mut found_service_with_peer_id = false;
836            let given_peer_id = PeerId::from_str(peer_id_str)
837                .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
838            for node in node_registry.nodes.read().await.iter() {
839                let node_read = node.read().await;
840                if let Some(peer_id) = node_read.peer_id
841                    && peer_id == given_peer_id
842                    && node_read.status != ServiceStatus::Removed
843                {
844                    services.push(Arc::clone(node));
845                    found_service_with_peer_id = true;
846                    break;
847                }
848            }
849            if !found_service_with_peer_id {
850                error!("Could not find node with peer id: '{given_peer_id:?}'");
851                return Err(eyre!(format!(
852                    "Could not find node with peer ID '{given_peer_id}'",
853                )));
854            }
855        }
856    }
857
858    Ok(services)
859}
860
861fn summarise_any_failed_ops(
862    failed_services: Vec<(String, String)>,
863    verb: &str,
864    verbosity: VerbosityLevel,
865) -> Result<()> {
866    if !failed_services.is_empty() {
867        if verbosity != VerbosityLevel::Minimal {
868            println!("Failed to {verb} {} service(s):", failed_services.len());
869            for failed in failed_services.iter() {
870                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
871            }
872        }
873
874        error!("Failed to {verb} one or more services");
875        return Err(eyre!("Failed to {verb} one or more services"));
876    }
877    Ok(())
878}