sn_node_manager/cmd/
node.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13    add_services::{
14        add_node,
15        config::{AddNodeServiceOptions, PortRange},
16    },
17    config::{self, is_running_as_root},
18    helpers::{download_and_extract_release, get_bin_version},
19    print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use color_eyre::{eyre::eyre, Help, Result};
22use colored::Colorize;
23use libp2p_identity::PeerId;
24use semver::Version;
25use sn_evm::{EvmNetwork, RewardsAddress};
26use sn_logging::LogFormat;
27use sn_peers_acquisition::PeersArgs;
28use sn_releases::{ReleaseType, SafeReleaseRepoActions};
29use sn_service_management::{
30    control::{ServiceControl, ServiceController},
31    rpc::RpcClient,
32    NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
33};
34use sn_transfers::HotWallet;
35use std::{cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};
36use tracing::debug;
37
38/// Returns the added service names
39pub async fn add(
40    auto_restart: bool,
41    auto_set_nat_flags: bool,
42    count: Option<u16>,
43    data_dir_path: Option<PathBuf>,
44    enable_metrics_server: bool,
45    env_variables: Option<Vec<(String, String)>>,
46    evm_network: Option<EvmNetwork>,
47    home_network: bool,
48    local: bool,
49    log_dir_path: Option<PathBuf>,
50    log_format: Option<LogFormat>,
51    max_archived_log_files: Option<usize>,
52    max_log_files: Option<usize>,
53    metrics_port: Option<PortRange>,
54    node_ip: Option<Ipv4Addr>,
55    node_port: Option<PortRange>,
56    owner: Option<String>,
57    peers_args: PeersArgs,
58    rewards_address: RewardsAddress,
59    rpc_address: Option<Ipv4Addr>,
60    rpc_port: Option<PortRange>,
61    src_path: Option<PathBuf>,
62    upnp: bool,
63    url: Option<String>,
64    user: Option<String>,
65    version: Option<String>,
66    verbosity: VerbosityLevel,
67) -> Result<Vec<String>> {
68    let user_mode = !is_running_as_root();
69
70    if verbosity != VerbosityLevel::Minimal {
71        print_banner("Add Safenode Services");
72        println!("{} service(s) to be added", count.unwrap_or(1));
73    }
74
75    let service_manager = ServiceController {};
76    let service_user = if user_mode {
77        None
78    } else {
79        let service_user = user.unwrap_or_else(|| "safe".to_string());
80        service_manager.create_service_user(&service_user)?;
81        Some(service_user)
82    };
83
84    let service_data_dir_path =
85        config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
86    let service_log_dir_path = config::get_service_log_dir_path(
87        ReleaseType::Safenode,
88        log_dir_path,
89        service_user.clone(),
90    )?;
91
92    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
93    let release_repo = <dyn SafeReleaseRepoActions>::default_config();
94
95    let (safenode_src_path, version) = if let Some(path) = src_path.clone() {
96        let version = get_bin_version(&path)?;
97        (path, version)
98    } else {
99        download_and_extract_release(
100            ReleaseType::Safenode,
101            url.clone(),
102            version,
103            &*release_repo,
104            verbosity,
105            None,
106        )
107        .await?
108    };
109
110    debug!("Parsing peers from PeersArgs");
111
112    // Handle the `PeersNotObtained` error to make the `--peer` argument optional for the node
113    // manager.
114    //
115    // Since any application making use of the node manager can enable the `network-contacts` feature on
116    // sn_peers_acquisition, we might end up getting having a huge peer list, and that's problematic for
117    // service definition files.
118    // Thus make use of get_peers_exclude_network_contacts() instead of get_peers() to make sure we only
119    // parse the --peers and SAFE_PEERS env var.
120
121    // If the `safenode` binary we're using has `network-contacts` enabled (which is the case for released binaries),
122    // it's fine if the service definition doesn't call `safenode` with a `--peer` argument.
123    let is_first = peers_args.first;
124    let bootstrap_peers = match peers_args.get_peers_exclude_network_contacts().await {
125        Ok(peers) => {
126            info!("Obtained peers of length {}", peers.len());
127            peers
128        }
129        Err(err) => match err {
130            sn_peers_acquisition::error::Error::PeersNotObtained => {
131                info!("No bootstrap peers obtained, setting empty vec.");
132                Vec::new()
133            }
134            _ => {
135                error!("Error obtaining peers: {err:?}");
136                return Err(err.into());
137            }
138        },
139    };
140
141    let options = AddNodeServiceOptions {
142        auto_restart,
143        auto_set_nat_flags,
144        bootstrap_peers,
145        count,
146        delete_safenode_src: src_path.is_none(),
147        enable_metrics_server,
148        evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
149        env_variables,
150        genesis: is_first,
151        home_network,
152        local,
153        log_format,
154        max_archived_log_files,
155        max_log_files,
156        metrics_port,
157        node_ip,
158        node_port,
159        owner,
160        rewards_address,
161        rpc_address,
162        rpc_port,
163        safenode_src_path,
164        safenode_dir_path: service_data_dir_path.clone(),
165        service_data_dir_path,
166        service_log_dir_path,
167        upnp,
168        user: service_user,
169        user_mode,
170        version,
171    };
172    info!("Adding node service(s)");
173    let added_services_names =
174        add_node(options, &mut node_registry, &service_manager, verbosity).await?;
175
176    node_registry.save()?;
177    debug!("Node registry saved");
178
179    Ok(added_services_names)
180}
181
182pub async fn balance(
183    peer_ids: Vec<String>,
184    service_names: Vec<String>,
185    verbosity: VerbosityLevel,
186) -> Result<()> {
187    if verbosity != VerbosityLevel::Minimal {
188        print_banner("Reward Balances");
189    }
190
191    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
192    refresh_node_registry(
193        &mut node_registry,
194        &ServiceController {},
195        verbosity != VerbosityLevel::Minimal,
196        false,
197        false,
198    )
199    .await?;
200
201    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
202    if service_indices.is_empty() {
203        info!("Service indices is empty, cannot obtain the balance");
204        // This could be the case if all services are at `Removed` status.
205        println!("No balances to display");
206        return Ok(());
207    }
208    debug!("Obtaining balances for {} services", service_indices.len());
209
210    for &index in &service_indices {
211        let node = &mut node_registry.nodes[index];
212        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
213        let service = NodeService::new(node, Box::new(rpc_client));
214        let wallet = HotWallet::load_from(&service.service_data.data_dir_path)
215            .inspect_err(|err| error!("Error while loading hot wallet: {err:?}"))?;
216        println!(
217            "{}: {}",
218            service.service_data.service_name,
219            wallet.balance()
220        );
221    }
222    Ok(())
223}
224
225pub async fn remove(
226    keep_directories: bool,
227    peer_ids: Vec<String>,
228    service_names: Vec<String>,
229    verbosity: VerbosityLevel,
230) -> Result<()> {
231    if verbosity != VerbosityLevel::Minimal {
232        print_banner("Remove Safenode Services");
233    }
234    info!("Removing safe node services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
235
236    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
237    refresh_node_registry(
238        &mut node_registry,
239        &ServiceController {},
240        verbosity != VerbosityLevel::Minimal,
241        false,
242        false,
243    )
244    .await?;
245
246    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
247    if service_indices.is_empty() {
248        info!("Service indices is empty, no services were eligible for removal");
249        // This could be the case if all services are at `Removed` status.
250        if verbosity != VerbosityLevel::Minimal {
251            println!("No services were eligible for removal");
252        }
253        return Ok(());
254    }
255
256    let mut failed_services = Vec::new();
257    for &index in &service_indices {
258        let node = &mut node_registry.nodes[index];
259        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
260        let service = NodeService::new(node, Box::new(rpc_client));
261        let mut service_manager =
262            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
263        match service_manager.remove(keep_directories).await {
264            Ok(()) => {
265                debug!("Removed service {}", node.service_name);
266                node_registry.save()?;
267            }
268            Err(err) => {
269                error!("Failed to remove service {}: {err}", node.service_name);
270                failed_services.push((node.service_name.clone(), err.to_string()))
271            }
272        }
273    }
274
275    summarise_any_failed_ops(failed_services, "remove", verbosity)
276}
277
278pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> {
279    if verbosity != VerbosityLevel::Minimal {
280        print_banner("Reset Safenode Services");
281    }
282    info!("Resetting all safenode services, with force={force}");
283
284    if !force {
285        println!("WARNING: all safenode services, data, and logs will be removed.");
286        println!("Do you wish to proceed? [y/n]");
287        std::io::stdout().flush()?;
288        let mut input = String::new();
289        std::io::stdin().read_line(&mut input)?;
290        if input.trim().to_lowercase() != "y" {
291            println!("Reset aborted");
292            return Ok(());
293        }
294    }
295
296    stop(None, vec![], vec![], verbosity).await?;
297    remove(false, vec![], vec![], verbosity).await?;
298
299    // Due the possibility of repeated runs of the `reset` command, we need to check for the
300    // existence of this file before attempting to delete it, since `remove_file` will return an
301    // error if the file doesn't exist. On Windows this has been observed to happen.
302    let node_registry_path = config::get_node_registry_path()?;
303    if node_registry_path.exists() {
304        info!("Removing node registry file: {node_registry_path:?}");
305        std::fs::remove_file(node_registry_path)?;
306    }
307
308    Ok(())
309}
310
311pub async fn start(
312    connection_timeout_s: u64,
313    fixed_interval: Option<u64>,
314    peer_ids: Vec<String>,
315    service_names: Vec<String>,
316    verbosity: VerbosityLevel,
317) -> Result<()> {
318    if verbosity != VerbosityLevel::Minimal {
319        print_banner("Start Safenode Services");
320    }
321    info!("Starting safenode services for: {peer_ids:?}, {service_names:?}");
322
323    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
324    refresh_node_registry(
325        &mut node_registry,
326        &ServiceController {},
327        verbosity != VerbosityLevel::Minimal,
328        false,
329        false,
330    )
331    .await?;
332
333    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
334    if service_indices.is_empty() {
335        info!("No services are eligible to be started");
336        // This could be the case if all services are at `Removed` status.
337        if verbosity != VerbosityLevel::Minimal {
338            println!("No services were eligible to be started");
339        }
340        return Ok(());
341    }
342
343    let mut failed_services = Vec::new();
344    for &index in &service_indices {
345        let node = &mut node_registry.nodes[index];
346        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
347
348        let service = NodeService::new(node, Box::new(rpc_client));
349
350        // set dynamic startup delay if fixed_interval is not set
351        let service = if fixed_interval.is_none() {
352            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
353        } else {
354            service
355        };
356
357        let mut service_manager =
358            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
359        if service_manager.service.status() != ServiceStatus::Running {
360            // It would be possible here to check if the service *is* running and then just
361            // continue without applying the delay. The reason for not doing so is because when
362            // `start` is called below, the user will get a message to say the service was already
363            // started, which I think is useful behaviour to retain.
364            if let Some(interval) = fixed_interval {
365                debug!("Sleeping for {} milliseconds", interval);
366                std::thread::sleep(std::time::Duration::from_millis(interval));
367            }
368        }
369        match service_manager.start().await {
370            Ok(start_duration) => {
371                debug!(
372                    "Started service {} in {start_duration:?}",
373                    node.service_name
374                );
375
376                node_registry.save()?;
377            }
378            Err(err) => {
379                error!("Failed to start service {}: {err}", node.service_name);
380                failed_services.push((node.service_name.clone(), err.to_string()))
381            }
382        }
383    }
384
385    summarise_any_failed_ops(failed_services, "start", verbosity)
386}
387
388pub async fn status(details: bool, fail: bool, json: bool) -> Result<()> {
389    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
390    if !node_registry.nodes.is_empty() {
391        if !json && !details {
392            print_banner("Safenode Services");
393        }
394        status_report(
395            &mut node_registry,
396            &ServiceController {},
397            details,
398            json,
399            fail,
400            false,
401        )
402        .await?;
403        node_registry.save()?;
404    }
405    Ok(())
406}
407
408pub async fn stop(
409    interval: Option<u64>,
410    peer_ids: Vec<String>,
411    service_names: Vec<String>,
412    verbosity: VerbosityLevel,
413) -> Result<()> {
414    if verbosity != VerbosityLevel::Minimal {
415        print_banner("Stop Safenode Services");
416    }
417    info!("Stopping safenode services for: {peer_ids:?}, {service_names:?}");
418
419    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
420    refresh_node_registry(
421        &mut node_registry,
422        &ServiceController {},
423        verbosity != VerbosityLevel::Minimal,
424        false,
425        false,
426    )
427    .await?;
428
429    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
430    if service_indices.is_empty() {
431        info!("Service indices is empty, no services were eligible to be stopped");
432        // This could be the case if all services are at `Removed` status.
433        if verbosity != VerbosityLevel::Minimal {
434            println!("No services were eligible to be stopped");
435        }
436        return Ok(());
437    }
438
439    let mut failed_services = Vec::new();
440    for &index in &service_indices {
441        let node = &mut node_registry.nodes[index];
442        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
443        let service = NodeService::new(node, Box::new(rpc_client));
444        let mut service_manager =
445            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
446
447        if service_manager.service.status() == ServiceStatus::Running {
448            if let Some(interval) = interval {
449                debug!("Sleeping for {} milliseconds", interval);
450                std::thread::sleep(std::time::Duration::from_millis(interval));
451            }
452        }
453        match service_manager.stop().await {
454            Ok(()) => {
455                debug!("Stopped service {}", node.service_name);
456                node_registry.save()?;
457            }
458            Err(err) => {
459                error!("Failed to stop service {}: {err}", node.service_name);
460                failed_services.push((node.service_name.clone(), err.to_string()))
461            }
462        }
463    }
464
465    summarise_any_failed_ops(failed_services, "stop", verbosity)
466}
467
468pub async fn upgrade(
469    connection_timeout_s: u64,
470    do_not_start: bool,
471    custom_bin_path: Option<PathBuf>,
472    force: bool,
473    fixed_interval: Option<u64>,
474    peer_ids: Vec<String>,
475    provided_env_variables: Option<Vec<(String, String)>>,
476    service_names: Vec<String>,
477    url: Option<String>,
478    version: Option<String>,
479    verbosity: VerbosityLevel,
480) -> Result<()> {
481    // In the case of a custom binary, we want to force the use of it. Regardless of its version
482    // number, the user has probably built it for some special case. They may have not used the
483    // `--force` flag; if they didn't, we can just do that for them here.
484    let use_force = force || custom_bin_path.is_some();
485
486    if verbosity != VerbosityLevel::Minimal {
487        print_banner("Upgrade Safenode Services");
488    }
489    info!(
490        "Upgrading safenode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
491    );
492
493    let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
494        custom_bin_path.clone(),
495        ReleaseType::Safenode,
496        url,
497        version,
498        verbosity,
499    )
500    .await?;
501
502    let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
503    refresh_node_registry(
504        &mut node_registry,
505        &ServiceController {},
506        verbosity != VerbosityLevel::Minimal,
507        false,
508        false,
509    )
510    .await?;
511
512    debug!(
513        "listen addresses for nodes[0]: {:?}",
514        node_registry.nodes[0].listen_addr
515    );
516    if !use_force {
517        let node_versions = node_registry
518            .nodes
519            .iter()
520            .map(|n| Version::parse(&n.version).map_err(|_| eyre!("Failed to parse Version")))
521            .collect::<Result<Vec<Version>>>()?;
522        let any_nodes_need_upgraded = node_versions
523            .iter()
524            .any(|current_version| current_version < &target_version);
525        if !any_nodes_need_upgraded {
526            info!("All nodes are at the latest version, no upgrade required.");
527            if verbosity != VerbosityLevel::Minimal {
528                println!("{} All nodes are at the latest version", "✓".green());
529            }
530            return Ok(());
531        }
532    }
533
534    let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
535    trace!("service_indices len: {}", service_indices.len());
536    let mut upgrade_summary = Vec::new();
537
538    for &index in &service_indices {
539        let node = &mut node_registry.nodes[index];
540        let env_variables = if provided_env_variables.is_some() {
541            &provided_env_variables
542        } else {
543            &node_registry.environment_variables
544        };
545        let options = UpgradeOptions {
546            auto_restart: false,
547            bootstrap_peers: node_registry.bootstrap_peers.clone(),
548            env_variables: env_variables.clone(),
549            force: use_force,
550            start_service: !do_not_start,
551            target_bin_path: upgrade_bin_path.clone(),
552            target_version: target_version.clone(),
553        };
554        let service_name = node.service_name.clone();
555
556        let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
557        let service = NodeService::new(node, Box::new(rpc_client));
558        // set dynamic startup delay if fixed_interval is not set
559        let service = if fixed_interval.is_none() {
560            service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
561        } else {
562            service
563        };
564
565        let mut service_manager =
566            ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
567
568        match service_manager.upgrade(options).await {
569            Ok(upgrade_result) => {
570                info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
571                if upgrade_result != UpgradeResult::NotRequired {
572                    // It doesn't seem useful to apply the interval if there was no upgrade
573                    // required for the previous service.
574                    if let Some(interval) = fixed_interval {
575                        debug!("Sleeping for {interval} milliseconds",);
576                        std::thread::sleep(std::time::Duration::from_millis(interval));
577                    }
578                }
579                upgrade_summary.push((
580                    service_manager.service.service_data.service_name.clone(),
581                    upgrade_result,
582                ));
583                node_registry.save()?;
584            }
585            Err(err) => {
586                error!("Error upgrading service {service_name}: {err}");
587                upgrade_summary.push((
588                    node.service_name.clone(),
589                    UpgradeResult::Error(format!("Error: {err}")),
590                ));
591                node_registry.save()?;
592            }
593        }
594    }
595
596    if verbosity != VerbosityLevel::Minimal {
597        print_upgrade_summary(upgrade_summary.clone());
598    }
599
600    if upgrade_summary.iter().any(|(_, r)| {
601        matches!(r, UpgradeResult::Error(_))
602            || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
603    }) {
604        return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
605            "For any services that were upgraded but did not start, you can attempt to start them \
606                again using the 'start' command."));
607    }
608
609    Ok(())
610}
611
612/// Ensure n nodes are running by stopping nodes or by adding and starting nodes if required.
613///
614/// The arguments here are mostly mirror those used in `add`.
615pub async fn maintain_n_running_nodes(
616    auto_restart: bool,
617    auto_set_nat_flags: bool,
618    connection_timeout_s: u64,
619    max_nodes_to_run: u16,
620    data_dir_path: Option<PathBuf>,
621    enable_metrics_server: bool,
622    env_variables: Option<Vec<(String, String)>>,
623    evm_network: Option<EvmNetwork>,
624    home_network: bool,
625    local: bool,
626    log_dir_path: Option<PathBuf>,
627    log_format: Option<LogFormat>,
628    max_archived_log_files: Option<usize>,
629    max_log_files: Option<usize>,
630    metrics_port: Option<PortRange>,
631    node_ip: Option<Ipv4Addr>,
632    node_port: Option<PortRange>,
633    owner: Option<String>,
634    peers: PeersArgs,
635    rewards_address: RewardsAddress,
636    rpc_address: Option<Ipv4Addr>,
637    rpc_port: Option<PortRange>,
638    src_path: Option<PathBuf>,
639    url: Option<String>,
640    upnp: bool,
641    user: Option<String>,
642    version: Option<String>,
643    verbosity: VerbosityLevel,
644    start_node_interval: Option<u64>,
645) -> Result<()> {
646    let node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
647    let running_nodes = node_registry
648        .nodes
649        .iter()
650        .filter(|node| node.status == ServiceStatus::Running)
651        .map(|node| node.service_name.clone())
652        .collect::<Vec<_>>();
653
654    let running_count = running_nodes.len();
655    let target_count = max_nodes_to_run as usize;
656
657    info!(
658        "Current running nodes: {}, Target: {}",
659        running_count, target_count
660    );
661
662    match running_count.cmp(&target_count) {
663        Ordering::Greater => {
664            let to_stop_count = running_count - target_count;
665            let services_to_stop = running_nodes
666                .into_iter()
667                .rev() // Stop the oldest nodes first
668                .take(to_stop_count)
669                .collect::<Vec<_>>();
670
671            info!(
672                "Stopping {} excess nodes: {:?}",
673                to_stop_count, services_to_stop
674            );
675            stop(None, vec![], services_to_stop, verbosity).await?;
676        }
677        Ordering::Less => {
678            let to_start_count = target_count - running_count;
679            let inactive_nodes = node_registry
680                .nodes
681                .iter()
682                .filter(|node| {
683                    node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added
684                })
685                .map(|node| node.service_name.clone())
686                .collect::<Vec<_>>();
687
688            info!("Inactive nodes available: {}", inactive_nodes.len());
689
690            if to_start_count <= inactive_nodes.len() {
691                let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
692                info!(
693                    "Starting {} existing inactive nodes: {:?}",
694                    to_start_count, nodes_to_start
695                );
696                start(
697                    connection_timeout_s,
698                    start_node_interval,
699                    vec![],
700                    nodes_to_start,
701                    verbosity,
702                )
703                .await?;
704            } else {
705                let to_add_count = to_start_count - inactive_nodes.len();
706                info!(
707                    "Adding {} new nodes and starting all {} inactive nodes",
708                    to_add_count,
709                    inactive_nodes.len()
710                );
711
712                let ports_to_use = match node_port {
713                    Some(PortRange::Single(port)) => vec![port],
714                    Some(PortRange::Range(start, end)) => {
715                        (start..=end).take(to_add_count).collect()
716                    }
717                    None => vec![],
718                };
719
720                for (i, port) in ports_to_use.into_iter().enumerate() {
721                    let added_service = add(
722                        auto_restart,
723                        auto_set_nat_flags,
724                        Some(1),
725                        data_dir_path.clone(),
726                        enable_metrics_server,
727                        env_variables.clone(),
728                        evm_network.clone(),
729                        home_network,
730                        local,
731                        log_dir_path.clone(),
732                        log_format,
733                        max_archived_log_files,
734                        max_log_files,
735                        metrics_port.clone(),
736                        node_ip,
737                        Some(PortRange::Single(port)),
738                        owner.clone(),
739                        peers.clone(),
740                        rewards_address,
741                        rpc_address,
742                        rpc_port.clone(),
743                        src_path.clone(),
744                        upnp,
745                        url.clone(),
746                        user.clone(),
747                        version.clone(),
748                        verbosity,
749                    )
750                    .await?;
751
752                    if i == 0 {
753                        start(
754                            connection_timeout_s,
755                            start_node_interval,
756                            vec![],
757                            added_service,
758                            verbosity,
759                        )
760                        .await?;
761                    }
762                }
763
764                if !inactive_nodes.is_empty() {
765                    start(
766                        connection_timeout_s,
767                        start_node_interval,
768                        vec![],
769                        inactive_nodes,
770                        verbosity,
771                    )
772                    .await?;
773                }
774            }
775        }
776        Ordering::Equal => {
777            info!(
778                "Current node count ({}) matches target ({}). No action needed.",
779                running_count, target_count
780            );
781        }
782    }
783
784    // Verify final state
785    let final_node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
786    let final_running_count = final_node_registry
787        .nodes
788        .iter()
789        .filter(|node| node.status == ServiceStatus::Running)
790        .count();
791
792    info!("Final running node count: {}", final_running_count);
793    if final_running_count != target_count {
794        warn!(
795            "Failed to reach target node count. Expected {}, but got {}",
796            target_count, final_running_count
797        );
798    }
799
800    Ok(())
801}
802
803fn get_services_for_ops(
804    node_registry: &NodeRegistry,
805    peer_ids: Vec<String>,
806    service_names: Vec<String>,
807) -> Result<Vec<usize>> {
808    let mut service_indices = Vec::new();
809
810    if service_names.is_empty() && peer_ids.is_empty() {
811        for node in node_registry.nodes.iter() {
812            if let Some(index) = node_registry.nodes.iter().position(|x| {
813                x.service_name == node.service_name && x.status != ServiceStatus::Removed
814            }) {
815                service_indices.push(index);
816            }
817        }
818    } else {
819        for name in &service_names {
820            if let Some(index) = node_registry
821                .nodes
822                .iter()
823                .position(|x| x.service_name == *name && x.status != ServiceStatus::Removed)
824            {
825                service_indices.push(index);
826            } else {
827                error!("No service named '{name}'");
828                return Err(eyre!(format!("No service named '{name}'")));
829            }
830        }
831
832        for peer_id_str in &peer_ids {
833            let peer_id = PeerId::from_str(peer_id_str)
834                .inspect_err(|err| error!("Error parsing PeerId: {err:?}"))?;
835            if let Some(index) = node_registry
836                .nodes
837                .iter()
838                .position(|x| x.peer_id == Some(peer_id) && x.status != ServiceStatus::Removed)
839            {
840                service_indices.push(index);
841            } else {
842                error!("Could not find node with peer id: '{peer_id:?}'");
843                return Err(eyre!(format!(
844                    "Could not find node with peer ID '{peer_id}'",
845                )));
846            }
847        }
848    }
849
850    Ok(service_indices)
851}
852
853fn summarise_any_failed_ops(
854    failed_services: Vec<(String, String)>,
855    verb: &str,
856    verbosity: VerbosityLevel,
857) -> Result<()> {
858    if !failed_services.is_empty() {
859        if verbosity != VerbosityLevel::Minimal {
860            println!("Failed to {verb} {} service(s):", failed_services.len());
861            for failed in failed_services.iter() {
862                println!("{} {}: {}", "✕".red(), failed.0, failed.1);
863            }
864        }
865
866        error!("Failed to {verb} one or more services");
867        return Err(eyre!("Failed to {verb} one or more services"));
868    }
869    Ok(())
870}