ant_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    ServiceManager, VerbosityLevel,
14    add_services::{
15        add_node,
16        config::{AddNodeServiceOptions, PortRange},
17    },
18    config::{self, is_running_as_root},
19    helpers::{download_and_extract_release, get_bin_version},
20    print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::{Bootstrap, InitialPeersConfig};
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27    NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28    UpgradeOptions, UpgradeResult,
29    control::{ServiceControl, ServiceController},
30    rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use service_manager::RestartPolicy;
37use std::{
38    cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
39};
40use tokio::sync::RwLock;
41use tracing::debug;
42
43/// Returns the added service names
44pub async fn add(
45    alpha: bool,
46    auto_restart: bool,
47    auto_set_nat_flags: bool,
48    count: Option<u16>,
49    data_dir_path: Option<PathBuf>,
50    enable_metrics_server: bool,
51    env_variables: Option<Vec<(String, String)>>,
52    evm_network: Option<EvmNetwork>,
53    log_dir_path: Option<PathBuf>,
54    log_format: Option<LogFormat>,
55    max_archived_log_files: Option<usize>,
56    max_log_files: Option<usize>,
57    metrics_port: Option<PortRange>,
58    network_id: Option<u8>,
59    node_ip: Option<Ipv4Addr>,
60    node_port: Option<PortRange>,
61    node_registry: NodeRegistryManager,
62    mut init_peers_config: InitialPeersConfig,
63    relay: bool,
64    restart_policy: RestartPolicy,
65    rewards_address: RewardsAddress,
66    rpc_address: Option<Ipv4Addr>,
67    rpc_port: Option<PortRange>,
68    src_path: Option<PathBuf>,
69    no_upnp: bool,
70    url: Option<String>,
71    user: Option<String>,
72    version: Option<String>,
73    verbosity: VerbosityLevel,
74    write_older_cache_files: bool,
75) -> Result<Vec<String>> {
76    let user_mode = !is_running_as_root();
77
78    if verbosity != VerbosityLevel::Minimal {
79        print_banner("Add Antnode Services");
80        println!("{} service(s) to be added", count.unwrap_or(1));
81    }
82
83    let service_manager = ServiceController {};
84    let service_user = if user_mode {
85        None
86    } else {
87        let service_user = user.unwrap_or_else(|| "ant".to_string());
88        service_manager.create_service_user(&service_user)?;
89        Some(service_user)
90    };
91
92    let service_data_dir_path =
93        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
94    let service_log_dir_path =
95        config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
96    let bootstrap_cache_dir = if let Some(user) = &service_user {
97        Some(config::get_bootstrap_cache_owner_path(user)?)
98    } else {
99        None
100    };
101
102    let release_repo = <dyn AntReleaseRepoActions>::default_config();
103
104    let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
105        let version = get_bin_version(&path)?;
106        (path, version)
107    } else {
108        download_and_extract_release(
109            ReleaseType::AntNode,
110            url.clone(),
111            version,
112            &*release_repo,
113            verbosity,
114            None,
115        )
116        .await?
117    };
118
119    debug!("Parsing peers from PeersArgs");
120
121    init_peers_config.addrs.extend(Bootstrap::fetch_from_env());
122    init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
123
124    let options = AddNodeServiceOptions {
125        alpha,
126        antnode_dir_path: service_data_dir_path.clone(),
127        antnode_src_path,
128        auto_restart,
129        auto_set_nat_flags,
130        count,
131        delete_antnode_src: src_path.is_none(),
132        enable_metrics_server,
133        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
134        env_variables,
135        init_peers_config,
136        log_format,
137        max_archived_log_files,
138        max_log_files,
139        metrics_port,
140        network_id,
141        no_upnp,
142        node_ip,
143        node_port,
144        relay,
145        restart_policy,
146        rewards_address,
147        rpc_address,
148        rpc_port,
149        service_data_dir_path,
150        service_log_dir_path,
151        user: service_user,
152        user_mode,
153        version,
154        write_older_cache_files,
155    };
156    info!("Adding node service(s)");
157    let added_services_names =
158        add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
159
160    node_registry.save().await?;
161    debug!("Node registry saved");
162
163    Ok(added_services_names)
164}
165
166pub async fn balance(
167    peer_ids: Vec<String>,
168    node_registry: NodeRegistryManager,
169    service_names: Vec<String>,
170    verbosity: VerbosityLevel,
171) -> Result<()> {
172    if verbosity != VerbosityLevel::Minimal {
173        print_banner("Reward Balances");
174    }
175
176    refresh_node_registry(
177        node_registry.clone(),
178        &ServiceController {},
179        verbosity != VerbosityLevel::Minimal,
180        false,
181        verbosity,
182    )
183    .await?;
184
185    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
186    if services_for_ops.is_empty() {
187        info!("Services for ops is empty, cannot obtain the balance");
188        // This could be the case if all services are at `Removed` status.
189        println!("No balances to display");
190        return Ok(());
191    }
192    debug!("Obtaining balances for {} services", services_for_ops.len());
193
194    for node in services_for_ops {
195        let node = node.read().await;
196        // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
197        println!("{}: {}", node.service_name, 0,);
198    }
199    Ok(())
200}
201
202pub async fn remove(
203    keep_directories: bool,
204    peer_ids: Vec<String>,
205    node_registry: NodeRegistryManager,
206    service_names: Vec<String>,
207    verbosity: VerbosityLevel,
208) -> Result<()> {
209    if verbosity != VerbosityLevel::Minimal {
210        print_banner("Remove Antnode Services");
211    }
212    info!(
213        "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
214    );
215
216    refresh_node_registry(
217        node_registry.clone(),
218        &ServiceController {},
219        verbosity != VerbosityLevel::Minimal,
220        false,
221        verbosity,
222    )
223    .await?;
224
225    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
226    if services_for_ops.is_empty() {
227        info!("Services for ops is empty, no services were eligible for removal");
228        // This could be the case if all services are at `Removed` status.
229        if verbosity != VerbosityLevel::Minimal {
230            println!("No services were eligible for removal");
231        }
232        return Ok(());
233    }
234
235    let mut failed_services = Vec::new();
236    for node in &services_for_ops {
237        let service_name = node.read().await.service_name.clone();
238        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
239        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
240        let mut service_manager =
241            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
242        match service_manager.remove(keep_directories).await {
243            Ok(()) => {
244                debug!("Removed service {service_name}");
245                node_registry.save().await?;
246            }
247            Err(err) => {
248                error!("Failed to remove service {service_name}: {err}");
249                failed_services.push((service_name.clone(), err.to_string()))
250            }
251        }
252    }
253
254    summarise_any_failed_ops(failed_services, "remove", verbosity)
255}
256
257pub async fn reset(
258    force: bool,
259    node_registry: NodeRegistryManager,
260    verbosity: VerbosityLevel,
261) -> Result<()> {
262    if verbosity != VerbosityLevel::Minimal {
263        print_banner("Reset Antnode Services");
264    }
265    info!("Resetting all antnode services, with force={force}");
266
267    if !force {
268        println!("WARNING: all antnode services, data, and logs will be removed.");
269        println!("Do you wish to proceed? [y/n]");
270        std::io::stdout().flush()?;
271        let mut input = String::new();
272        std::io::stdin().read_line(&mut input)?;
273        if input.trim().to_lowercase() != "y" {
274            println!("Reset aborted");
275            return Ok(());
276        }
277    }
278
279    stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
280    remove(false, vec![], node_registry, vec![], verbosity).await?;
281
282    // Due the possibility of repeated runs of the `reset` command, we need to check for the
283    // existence of this file before attempting to delete it, since `remove_file` will return an
284    // error if the file doesn't exist. On Windows this has been observed to happen.
285    let node_registry_path = config::get_node_registry_path()?;
286    if node_registry_path.exists() {
287        info!("Removing node registry file: {node_registry_path:?}");
288        std::fs::remove_file(node_registry_path)?;
289    }
290
291    Ok(())
292}
293
294pub async fn start(
295    connection_timeout_s: u64,
296    fixed_interval: Option<u64>,
297    node_registry: NodeRegistryManager,
298    peer_ids: Vec<String>,
299    service_names: Vec<String>,
300    verbosity: VerbosityLevel,
301) -> Result<()> {
302    if verbosity != VerbosityLevel::Minimal {
303        print_banner("Start Antnode Services");
304    }
305    info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
306
307    refresh_node_registry(
308        node_registry.clone(),
309        &ServiceController {},
310        verbosity != VerbosityLevel::Minimal,
311        false,
312        verbosity,
313    )
314    .await?;
315
316    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
317    if services_for_ops.is_empty() {
318        info!("No services are eligible to be started");
319        // This could be the case if all services are at `Removed` status.
320        if verbosity != VerbosityLevel::Minimal {
321            println!("No services were eligible to be started");
322        }
323        return Ok(());
324    }
325
326    let mut failed_services = Vec::new();
327    for node in &services_for_ops {
328        let service_name = node.read().await.service_name.clone();
329
330        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
331        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
332
333        // set dynamic startup delay if fixed_interval is not set
334        let service = if fixed_interval.is_none() {
335            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
336        } else {
337            service
338        };
339
340        let mut service_manager =
341            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
342        if service_manager.service.status().await != ServiceStatus::Running {
343            // It would be possible here to check if the service *is* running and then just
344            // continue without applying the delay. The reason for not doing so is because when
345            // `start` is called below, the user will get a message to say the service was already
346            // started, which I think is useful behaviour to retain.
347            if let Some(interval) = fixed_interval {
348                debug!("Sleeping for {} milliseconds", interval);
349                std::thread::sleep(std::time::Duration::from_millis(interval));
350            }
351        }
352        match service_manager.start().await {
353            Ok(start_duration) => {
354                debug!("Started service {service_name} in {start_duration:?}",);
355
356                node_registry.save().await?;
357            }
358            Err(err) => {
359                error!("Failed to start service {service_name}: {err}");
360                failed_services.push((service_name.clone(), err.to_string()))
361            }
362        }
363    }
364
365    summarise_any_failed_ops(failed_services, "start", verbosity)
366}
367
368pub async fn status(
369    details: bool,
370    fail: bool,
371    json: bool,
372    node_registry: NodeRegistryManager,
373) -> Result<()> {
374    if !node_registry.nodes.read().await.is_empty() {
375        if !json && !details {
376            print_banner("Antnode Services");
377        }
378        status_report(
379            &node_registry,
380            &ServiceController {},
381            details,
382            json,
383            fail,
384            false,
385        )
386        .await?;
387        node_registry.save().await?;
388    }
389    Ok(())
390}
391
392pub async fn stop(
393    interval: Option<u64>,
394    node_registry: NodeRegistryManager,
395    peer_ids: Vec<String>,
396    service_names: Vec<String>,
397    verbosity: VerbosityLevel,
398) -> Result<()> {
399    if verbosity != VerbosityLevel::Minimal {
400        print_banner("Stop Antnode Services");
401    }
402    info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
403
404    refresh_node_registry(
405        node_registry.clone(),
406        &ServiceController {},
407        verbosity != VerbosityLevel::Minimal,
408        false,
409        verbosity,
410    )
411    .await?;
412
413    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
414    if services_for_ops.is_empty() {
415        info!("No services are eligible to be stopped");
416        // This could be the case if all services are at `Removed` status.
417        if verbosity != VerbosityLevel::Minimal {
418            println!("No services were eligible to be stopped");
419        }
420        return Ok(());
421    }
422
423    let mut failed_services = Vec::new();
424    for node in services_for_ops.iter() {
425        let service_name = node.read().await.service_name.clone();
426        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
427        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
428        let mut service_manager =
429            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
430
431        if service_manager.service.status().await == ServiceStatus::Running
432            && let Some(interval) = interval
433        {
434            debug!("Sleeping for {} milliseconds", interval);
435            std::thread::sleep(std::time::Duration::from_millis(interval));
436        }
437        match service_manager.stop().await {
438            Ok(()) => {
439                debug!("Stopped service {service_name}");
440                node_registry.save().await?;
441            }
442            Err(err) => {
443                error!("Failed to stop service {service_name}: {err}");
444                failed_services.push((service_name.clone(), err.to_string()))
445            }
446        }
447    }
448
449    summarise_any_failed_ops(failed_services, "stop", verbosity)
450}
451
452pub async fn upgrade(
453    connection_timeout_s: u64,
454    do_not_start: bool,
455    custom_bin_path: Option<PathBuf>,
456    force: bool,
457    fixed_interval: Option<u64>,
458    node_registry: NodeRegistryManager,
459    peer_ids: Vec<String>,
460    provided_env_variables: Option<Vec<(String, String)>>,
461    service_names: Vec<String>,
462    url: Option<String>,
463    version: Option<String>,
464    verbosity: VerbosityLevel,
465) -> Result<()> {
466    // In the case of a custom binary, we want to force the use of it. Regardless of its version
467    // number, the user has probably built it for some special case. They may have not used the
468    // `--force` flag; if they didn't, we can just do that for them here.
469    let use_force = force || custom_bin_path.is_some();
470
471    if verbosity != VerbosityLevel::Minimal {
472        print_banner("Upgrade Antnode Services");
473    }
474    info!(
475        "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
476    );
477
478    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
479        custom_bin_path.clone(),
480        ReleaseType::AntNode,
481        url,
482        version,
483        verbosity,
484    )
485    .await?;
486
487    refresh_node_registry(
488        node_registry.clone(),
489        &ServiceController {},
490        verbosity != VerbosityLevel::Minimal,
491        false,
492        verbosity,
493    )
494    .await?;
495
496    if let Some(node) = node_registry.nodes.read().await.first() {
497        let node = node.read().await;
498        debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
499    } else {
500        debug!("There are no nodes currently added or active");
501    }
502
503    if !use_force {
504        let mut node_versions = Vec::new();
505
506        for node in node_registry.nodes.read().await.iter() {
507            let node = node.read().await;
508            let version = Version::parse(&node.version)
509                .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
510            node_versions.push(version);
511        }
512
513        let any_nodes_need_upgraded = node_versions
514            .iter()
515            .any(|current_version| current_version < &target_version);
516        if !any_nodes_need_upgraded {
517            info!("All nodes are at the latest version, no upgrade required.");
518            if verbosity != VerbosityLevel::Minimal {
519                println!("{} All nodes are at the latest version", "✓".green());
520            }
521            return Ok(());
522        }
523    }
524
525    let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
526    trace!("services_for_ops len: {}", services_for_ops.len());
527    let mut upgrade_summary = Vec::new();
528
529    for node in &services_for_ops {
530        let env_variables = if provided_env_variables.is_some() {
531            provided_env_variables.clone()
532        } else {
533            node_registry.environment_variables.read().await.clone()
534        };
535        let options = UpgradeOptions {
536            auto_restart: false,
537            env_variables: env_variables.clone(),
538            force: use_force,
539            start_service: !do_not_start,
540            target_bin_path: upgrade_bin_path.clone(),
541            target_version: target_version.clone(),
542        };
543        let service_name = node.read().await.service_name.clone();
544
545        let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
546        let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
547        // set dynamic startup delay if fixed_interval is not set
548        let service = if fixed_interval.is_none() {
549            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
550        } else {
551            service
552        };
553
554        let mut service_manager =
555            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
556
557        match service_manager.upgrade(options).await {
558            Ok(upgrade_result) => {
559                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
560                if upgrade_result != UpgradeResult::NotRequired {
561                    // It doesn't seem useful to apply the interval if there was no upgrade
562                    // required for the previous service.
563                    if let Some(interval) = fixed_interval {
564                        debug!("Sleeping for {interval} milliseconds",);
565                        std::thread::sleep(std::time::Duration::from_millis(interval));
566                    }
567                }
568                upgrade_summary.push((service_name.clone(), upgrade_result));
569                node_registry.save().await?;
570            }
571            Err(err) => {
572                error!("Error upgrading service {service_name}: {err}");
573                upgrade_summary.push((
574                    service_name.clone(),
575                    UpgradeResult::Error(format!("Error: {err}")),
576                ));
577                node_registry.save().await?;
578            }
579        }
580    }
581
582    if verbosity != VerbosityLevel::Minimal {
583        print_upgrade_summary(upgrade_summary.clone());
584    }
585
586    if upgrade_summary.iter().any(|(_, r)| {
587        matches!(r, UpgradeResult::Error(_))
588            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
589    }) {
590        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
591            "For any services that were upgraded but did not start, you can attempt to start them \
592                again using the 'start' command."));
593    }
594
595    Ok(())
596}
597
598/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
599///
600/// The arguments here are mostly mirror those used in `add`.
601pub async fn maintain_n_running_nodes(
602    alpha: bool,
603    auto_restart: bool,
604    auto_set_nat_flags: bool,
605    connection_timeout_s: u64,
606    max_nodes_to_run: u16,
607    data_dir_path: Option<PathBuf>,
608    enable_metrics_server: bool,
609    env_variables: Option<Vec<(String, String)>>,
610    evm_network: Option<EvmNetwork>,
611    log_dir_path: Option<PathBuf>,
612    log_format: Option<LogFormat>,
613    max_archived_log_files: Option<usize>,
614    max_log_files: Option<usize>,
615    metrics_port: Option<PortRange>,
616    network_id: Option<u8>,
617    node_ip: Option<Ipv4Addr>,
618    node_port: Option<PortRange>,
619    node_registry: NodeRegistryManager,
620    peers_args: InitialPeersConfig,
621    relay: bool,
622    rewards_address: RewardsAddress,
623    restart_policy: RestartPolicy,
624    rpc_address: Option<Ipv4Addr>,
625    rpc_port: Option<PortRange>,
626    src_path: Option<PathBuf>,
627    url: Option<String>,
628    no_upnp: bool,
629    user: Option<String>,
630    version: Option<String>,
631    verbosity: VerbosityLevel,
632    start_node_interval: Option<u64>,
633    write_older_cache_files: bool,
634) -> Result<()> {
635    let mut running_nodes = Vec::new();
636
637    for node in node_registry.nodes.read().await.iter() {
638        let node = node.read().await;
639        if node.status == ServiceStatus::Running {
640            running_nodes.push(node.service_name.clone());
641        }
642    }
643
644    let running_count = running_nodes.len();
645    let target_count = max_nodes_to_run as usize;
646
647    info!(
648        "Current running nodes: {}, Target: {}",
649        running_count, target_count
650    );
651
652    match running_count.cmp(&target_count) {
653        Ordering::Greater => {
654            let to_stop_count = running_count - target_count;
655            let services_to_stop = running_nodes
656                .into_iter()
657                .rev() // Stop the oldest nodes first
658                .take(to_stop_count)
659                .collect::<Vec<_>>();
660
661            info!(
662                "Stopping {} excess nodes: {:?}",
663                to_stop_count, services_to_stop
664            );
665            stop(
666                None,
667                node_registry.clone(),
668                vec![],
669                services_to_stop,
670                verbosity,
671            )
672            .await?;
673        }
674        Ordering::Less => {
675            let to_start_count = target_count - running_count;
676            let mut inactive_nodes = Vec::new();
677            for node in node_registry.nodes.read().await.iter() {
678                let node = node.read().await;
679                if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
680                    inactive_nodes.push(node.service_name.clone());
681                }
682            }
683
684            info!("Inactive nodes available: {}", inactive_nodes.len());
685
686            if to_start_count <= inactive_nodes.len() {
687                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
688                info!(
689                    "Starting {} existing inactive nodes: {:?}",
690                    to_start_count, nodes_to_start
691                );
692                start(
693                    connection_timeout_s,
694                    start_node_interval,
695                    node_registry.clone(),
696                    vec![],
697                    nodes_to_start,
698                    verbosity,
699                )
700                .await?;
701            } else {
702                let to_add_count = to_start_count - inactive_nodes.len();
703                info!(
704                    "Adding {} new nodes and starting all {} inactive nodes",
705                    to_add_count,
706                    inactive_nodes.len()
707                );
708
709                let ports_to_use = match node_port {
710                    Some(PortRange::Single(port)) => vec![port],
711                    Some(PortRange::Range(start, end)) => {
712                        (start..=end).take(to_add_count).collect()
713                    }
714                    None => vec![],
715                };
716
717                for (i, port) in ports_to_use.into_iter().enumerate() {
718                    let added_service = add(
719                        alpha,
720                        auto_restart,
721                        auto_set_nat_flags,
722                        Some(1),
723                        data_dir_path.clone(),
724                        enable_metrics_server,
725                        env_variables.clone(),
726                        evm_network.clone(),
727                        log_dir_path.clone(),
728                        log_format,
729                        max_archived_log_files,
730                        max_log_files,
731                        metrics_port.clone(),
732                        network_id,
733                        node_ip,
734                        Some(PortRange::Single(port)),
735                        node_registry.clone(),
736                        peers_args.clone(),
737                        relay,
738                        restart_policy,
739                        rewards_address,
740                        rpc_address,
741                        rpc_port.clone(),
742                        src_path.clone(),
743                        no_upnp,
744                        url.clone(),
745                        user.clone(),
746                        version.clone(),
747                        verbosity,
748                        write_older_cache_files,
749                    )
750                    .await?;
751
752                    if i == 0 {
753                        start(
754                            connection_timeout_s,
755                            start_node_interval,
756                            node_registry.clone(),
757                            vec![],
758                            added_service,
759                            verbosity,
760                        )
761                        .await?;
762                    }
763                }
764
765                if !inactive_nodes.is_empty() {
766                    start(
767                        connection_timeout_s,
768                        start_node_interval,
769                        node_registry.clone(),
770                        vec![],
771                        inactive_nodes,
772                        verbosity,
773                    )
774                    .await?;
775                }
776            }
777        }
778        Ordering::Equal => {
779            info!(
780                "Current node count ({}) matches target ({}). No action needed.",
781                running_count, target_count
782            );
783        }
784    }
785
786    // Verify final state
787    let mut final_running_count = 0;
788    for node in node_registry.nodes.read().await.iter() {
789        let node_read = node.read().await;
790        if node_read.status == ServiceStatus::Running {
791            final_running_count += 1;
792        }
793    }
794
795    if final_running_count != target_count {
796        warn!(
797            "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
798        );
799    }
800
801    Ok(())
802}
803
804async fn get_services_for_ops(
805    node_registry: &NodeRegistryManager,
806    peer_ids: Vec<String>,
807    service_names: Vec<String>,
808) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
809    let mut services = Vec::new();
810
811    if service_names.is_empty() && peer_ids.is_empty() {
812        for node in node_registry.nodes.read().await.iter() {
813            if node.read().await.status != ServiceStatus::Removed {
814                services.push(Arc::clone(node));
815            }
816        }
817    } else {
818        for name in &service_names {
819            let mut found_service_with_name = false;
820            for node in node_registry.nodes.read().await.iter() {
821                let node_read = node.read().await;
822                if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
823                    {
824                        services.push(Arc::clone(node));
825                        found_service_with_name = true;
826                        break;
827                    }
828                }
829            }
830
831            if !found_service_with_name {
832                error!("No service named '{name}'");
833                return Err(eyre!(format!("No service named '{name}'")));
834            }
835        }
836
837        for peer_id_str in &peer_ids {
838            let mut found_service_with_peer_id = false;
839            let given_peer_id = PeerId::from_str(peer_id_str)
840                .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
841            for node in node_registry.nodes.read().await.iter() {
842                let node_read = node.read().await;
843                if let Some(peer_id) = node_read.peer_id
844                    && peer_id == given_peer_id
845                    && node_read.status != ServiceStatus::Removed
846                {
847                    services.push(Arc::clone(node));
848                    found_service_with_peer_id = true;
849                    break;
850                }
851            }
852            if !found_service_with_peer_id {
853                error!("Could not find node with peer id: '{given_peer_id:?}'");
854                return Err(eyre!(format!(
855                    "Could not find node with peer ID '{given_peer_id}'",
856                )));
857            }
858        }
859    }
860
861    Ok(services)
862}
863
864fn summarise_any_failed_ops(
865    failed_services: Vec<(String, String)>,
866    verb: &str,
867    verbosity: VerbosityLevel,
868) -> Result<()> {
869    if !failed_services.is_empty() {
870        if verbosity != VerbosityLevel::Minimal {
871            println!("Failed to {verb} {} service(s):", failed_services.len());
872            for failed in failed_services.iter() {
873                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
874            }
875        }
876
877        error!("Failed to {verb} one or more services");
878        return Err(eyre!("Failed to {verb} one or more services"));
879    }
880    Ok(())
881}