1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 add_services::{
14 add_node,
15 config::{AddNodeServiceOptions, PortRange},
16 },
17 config::{self, is_running_as_root},
18 helpers::{download_and_extract_release, get_bin_version},
19 print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use ant_bootstrap::InitialPeersConfig;
22use ant_evm::{EvmNetwork, RewardsAddress};
23use ant_logging::LogFormat;
24use ant_releases::{AntReleaseRepoActions, ReleaseType};
25use ant_service_management::{
26 control::{ServiceControl, ServiceController},
27 rpc::RpcClient,
28 NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
29};
30use color_eyre::{eyre::eyre, Help, Result};
31use colored::Colorize;
32use libp2p_identity::PeerId;
33use semver::Version;
34use std::{cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};
35use tracing::debug;
36
37pub async fn add(
39 auto_restart: bool,
40 auto_set_nat_flags: bool,
41 count: Option<u16>,
42 data_dir_path: Option<PathBuf>,
43 enable_metrics_server: bool,
44 env_variables: Option<Vec<(String, String)>>,
45 evm_network: Option<EvmNetwork>,
46 relay: bool,
47 log_dir_path: Option<PathBuf>,
48 log_format: Option<LogFormat>,
49 max_archived_log_files: Option<usize>,
50 max_log_files: Option<usize>,
51 metrics_port: Option<PortRange>,
52 network_id: Option<u8>,
53 node_ip: Option<Ipv4Addr>,
54 node_port: Option<PortRange>,
55 mut init_peers_config: InitialPeersConfig,
56 rewards_address: RewardsAddress,
57 rpc_address: Option<Ipv4Addr>,
58 rpc_port: Option<PortRange>,
59 src_path: Option<PathBuf>,
60 no_upnp: bool,
61 url: Option<String>,
62 user: Option<String>,
63 version: Option<String>,
64 verbosity: VerbosityLevel,
65) -> Result<Vec<String>> {
66 let user_mode = !is_running_as_root();
67
68 if verbosity != VerbosityLevel::Minimal {
69 print_banner("Add Antnode Services");
70 println!("{} service(s) to be added", count.unwrap_or(1));
71 }
72
73 let service_manager = ServiceController {};
74 let service_user = if user_mode {
75 None
76 } else {
77 let service_user = user.unwrap_or_else(|| "ant".to_string());
78 service_manager.create_service_user(&service_user)?;
79 Some(service_user)
80 };
81
82 let service_data_dir_path =
83 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
84 let service_log_dir_path =
85 config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
86 let bootstrap_cache_dir = if let Some(user) = &service_user {
87 Some(config::get_bootstrap_cache_owner_path(user)?)
88 } else {
89 None
90 };
91
92 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
93 let release_repo = <dyn AntReleaseRepoActions>::default_config();
94
95 let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
96 let version = get_bin_version(&path)?;
97 (path, version)
98 } else {
99 download_and_extract_release(
100 ReleaseType::AntNode,
101 url.clone(),
102 version,
103 &*release_repo,
104 verbosity,
105 None,
106 )
107 .await?
108 };
109
110 debug!("Parsing peers from PeersArgs");
111
112 init_peers_config
113 .addrs
114 .extend(InitialPeersConfig::read_addr_from_env());
115 init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
116
117 let options = AddNodeServiceOptions {
118 auto_restart,
119 auto_set_nat_flags,
120 count,
121 delete_antnode_src: src_path.is_none(),
122 enable_metrics_server,
123 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
124 env_variables,
125 relay,
126 log_format,
127 max_archived_log_files,
128 max_log_files,
129 metrics_port,
130 network_id,
131 node_ip,
132 node_port,
133 init_peers_config,
134 rewards_address,
135 rpc_address,
136 rpc_port,
137 antnode_src_path,
138 antnode_dir_path: service_data_dir_path.clone(),
139 service_data_dir_path,
140 service_log_dir_path,
141 no_upnp,
142 user: service_user,
143 user_mode,
144 version,
145 };
146 info!("Adding node service(s)");
147 let added_services_names =
148 add_node(options, &mut node_registry, &service_manager, verbosity).await?;
149
150 node_registry.save()?;
151 debug!("Node registry saved");
152
153 Ok(added_services_names)
154}
155
156pub async fn balance(
157 peer_ids: Vec<String>,
158 service_names: Vec<String>,
159 verbosity: VerbosityLevel,
160) -> Result<()> {
161 if verbosity != VerbosityLevel::Minimal {
162 print_banner("Reward Balances");
163 }
164
165 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
166 refresh_node_registry(
167 &mut node_registry,
168 &ServiceController {},
169 verbosity != VerbosityLevel::Minimal,
170 false,
171 false,
172 )
173 .await?;
174
175 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
176 if service_indices.is_empty() {
177 info!("Service indices is empty, cannot obtain the balance");
178 println!("No balances to display");
180 return Ok(());
181 }
182 debug!("Obtaining balances for {} services", service_indices.len());
183
184 for &index in &service_indices {
185 let node = &mut node_registry.nodes[index];
186 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
187 let service = NodeService::new(node, Box::new(rpc_client));
188 println!("{}: {}", service.service_data.service_name, 0,);
190 }
191 Ok(())
192}
193
194pub async fn remove(
195 keep_directories: bool,
196 peer_ids: Vec<String>,
197 service_names: Vec<String>,
198 verbosity: VerbosityLevel,
199) -> Result<()> {
200 if verbosity != VerbosityLevel::Minimal {
201 print_banner("Remove Antnode Services");
202 }
203 info!("Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
204
205 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
206 refresh_node_registry(
207 &mut node_registry,
208 &ServiceController {},
209 verbosity != VerbosityLevel::Minimal,
210 false,
211 false,
212 )
213 .await?;
214
215 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
216 if service_indices.is_empty() {
217 info!("Service indices is empty, no services were eligible for removal");
218 if verbosity != VerbosityLevel::Minimal {
220 println!("No services were eligible for removal");
221 }
222 return Ok(());
223 }
224
225 let mut failed_services = Vec::new();
226 for &index in &service_indices {
227 let node = &mut node_registry.nodes[index];
228 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
229 let service = NodeService::new(node, Box::new(rpc_client));
230 let mut service_manager =
231 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
232 match service_manager.remove(keep_directories).await {
233 Ok(()) => {
234 debug!("Removed service {}", node.service_name);
235 node_registry.save()?;
236 }
237 Err(err) => {
238 error!("Failed to remove service {}: {err}", node.service_name);
239 failed_services.push((node.service_name.clone(), err.to_string()))
240 }
241 }
242 }
243
244 summarise_any_failed_ops(failed_services, "remove", verbosity)
245}
246
247pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> {
248 if verbosity != VerbosityLevel::Minimal {
249 print_banner("Reset Antnode Services");
250 }
251 info!("Resetting all antnode services, with force={force}");
252
253 if !force {
254 println!("WARNING: all antnode services, data, and logs will be removed.");
255 println!("Do you wish to proceed? [y/n]");
256 std::io::stdout().flush()?;
257 let mut input = String::new();
258 std::io::stdin().read_line(&mut input)?;
259 if input.trim().to_lowercase() != "y" {
260 println!("Reset aborted");
261 return Ok(());
262 }
263 }
264
265 stop(None, vec![], vec![], verbosity).await?;
266 remove(false, vec![], vec![], verbosity).await?;
267
268 let node_registry_path = config::get_node_registry_path()?;
272 if node_registry_path.exists() {
273 info!("Removing node registry file: {node_registry_path:?}");
274 std::fs::remove_file(node_registry_path)?;
275 }
276
277 Ok(())
278}
279
280pub async fn start(
281 connection_timeout_s: u64,
282 fixed_interval: Option<u64>,
283 peer_ids: Vec<String>,
284 service_names: Vec<String>,
285 verbosity: VerbosityLevel,
286) -> Result<()> {
287 if verbosity != VerbosityLevel::Minimal {
288 print_banner("Start Antnode Services");
289 }
290 info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
291
292 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
293 refresh_node_registry(
294 &mut node_registry,
295 &ServiceController {},
296 verbosity != VerbosityLevel::Minimal,
297 false,
298 false,
299 )
300 .await?;
301
302 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
303 if service_indices.is_empty() {
304 info!("No services are eligible to be started");
305 if verbosity != VerbosityLevel::Minimal {
307 println!("No services were eligible to be started");
308 }
309 return Ok(());
310 }
311
312 let mut failed_services = Vec::new();
313 for &index in &service_indices {
314 let node = &mut node_registry.nodes[index];
315 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
316
317 let service = NodeService::new(node, Box::new(rpc_client));
318
319 let service = if fixed_interval.is_none() {
321 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
322 } else {
323 service
324 };
325
326 let mut service_manager =
327 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
328 if service_manager.service.status() != ServiceStatus::Running {
329 if let Some(interval) = fixed_interval {
334 debug!("Sleeping for {} milliseconds", interval);
335 std::thread::sleep(std::time::Duration::from_millis(interval));
336 }
337 }
338 match service_manager.start().await {
339 Ok(start_duration) => {
340 debug!(
341 "Started service {} in {start_duration:?}",
342 node.service_name
343 );
344
345 node_registry.save()?;
346 }
347 Err(err) => {
348 error!("Failed to start service {}: {err}", node.service_name);
349 failed_services.push((node.service_name.clone(), err.to_string()))
350 }
351 }
352 }
353
354 summarise_any_failed_ops(failed_services, "start", verbosity)
355}
356
357pub async fn status(details: bool, fail: bool, json: bool) -> Result<()> {
358 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
359 if !node_registry.nodes.is_empty() {
360 if !json && !details {
361 print_banner("Antnode Services");
362 }
363 status_report(
364 &mut node_registry,
365 &ServiceController {},
366 details,
367 json,
368 fail,
369 false,
370 )
371 .await?;
372 node_registry.save()?;
373 }
374 Ok(())
375}
376
377pub async fn stop(
378 interval: Option<u64>,
379 peer_ids: Vec<String>,
380 service_names: Vec<String>,
381 verbosity: VerbosityLevel,
382) -> Result<()> {
383 if verbosity != VerbosityLevel::Minimal {
384 print_banner("Stop Antnode Services");
385 }
386 info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
387
388 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
389 refresh_node_registry(
390 &mut node_registry,
391 &ServiceController {},
392 verbosity != VerbosityLevel::Minimal,
393 false,
394 false,
395 )
396 .await?;
397
398 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
399 if service_indices.is_empty() {
400 info!("Service indices is empty, no services were eligible to be stopped");
401 if verbosity != VerbosityLevel::Minimal {
403 println!("No services were eligible to be stopped");
404 }
405 return Ok(());
406 }
407
408 let mut failed_services = Vec::new();
409 for &index in &service_indices {
410 let node = &mut node_registry.nodes[index];
411 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
412 let service = NodeService::new(node, Box::new(rpc_client));
413 let mut service_manager =
414 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
415
416 if service_manager.service.status() == ServiceStatus::Running {
417 if let Some(interval) = interval {
418 debug!("Sleeping for {} milliseconds", interval);
419 std::thread::sleep(std::time::Duration::from_millis(interval));
420 }
421 }
422 match service_manager.stop().await {
423 Ok(()) => {
424 debug!("Stopped service {}", node.service_name);
425 node_registry.save()?;
426 }
427 Err(err) => {
428 error!("Failed to stop service {}: {err}", node.service_name);
429 failed_services.push((node.service_name.clone(), err.to_string()))
430 }
431 }
432 }
433
434 summarise_any_failed_ops(failed_services, "stop", verbosity)
435}
436
437pub async fn upgrade(
438 connection_timeout_s: u64,
439 do_not_start: bool,
440 custom_bin_path: Option<PathBuf>,
441 force: bool,
442 fixed_interval: Option<u64>,
443 peer_ids: Vec<String>,
444 provided_env_variables: Option<Vec<(String, String)>>,
445 service_names: Vec<String>,
446 url: Option<String>,
447 version: Option<String>,
448 verbosity: VerbosityLevel,
449) -> Result<()> {
450 let use_force = force || custom_bin_path.is_some();
454
455 if verbosity != VerbosityLevel::Minimal {
456 print_banner("Upgrade Antnode Services");
457 }
458 info!(
459 "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
460 );
461
462 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
463 custom_bin_path.clone(),
464 ReleaseType::AntNode,
465 url,
466 version,
467 verbosity,
468 )
469 .await?;
470
471 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
472 refresh_node_registry(
473 &mut node_registry,
474 &ServiceController {},
475 verbosity != VerbosityLevel::Minimal,
476 false,
477 false,
478 )
479 .await?;
480
481 if let Some(node) = node_registry.nodes.first() {
482 debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
483 } else {
484 debug!("There are no nodes currently added or active");
485 }
486
487 if !use_force {
488 let node_versions = node_registry
489 .nodes
490 .iter()
491 .map(|n| Version::parse(&n.version).map_err(|_| eyre!("Failed to parse Version")))
492 .collect::<Result<Vec<Version>>>()?;
493 let any_nodes_need_upgraded = node_versions
494 .iter()
495 .any(|current_version| current_version < &target_version);
496 if !any_nodes_need_upgraded {
497 info!("All nodes are at the latest version, no upgrade required.");
498 if verbosity != VerbosityLevel::Minimal {
499 println!("{} All nodes are at the latest version", "✓".green());
500 }
501 return Ok(());
502 }
503 }
504
505 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
506 trace!("service_indices len: {}", service_indices.len());
507 let mut upgrade_summary = Vec::new();
508
509 for &index in &service_indices {
510 let node = &mut node_registry.nodes[index];
511 let env_variables = if provided_env_variables.is_some() {
512 &provided_env_variables
513 } else {
514 &node_registry.environment_variables
515 };
516 let options = UpgradeOptions {
517 auto_restart: false,
518 env_variables: env_variables.clone(),
519 force: use_force,
520 start_service: !do_not_start,
521 target_bin_path: upgrade_bin_path.clone(),
522 target_version: target_version.clone(),
523 };
524 let service_name = node.service_name.clone();
525
526 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
527 let service = NodeService::new(node, Box::new(rpc_client));
528 let service = if fixed_interval.is_none() {
530 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
531 } else {
532 service
533 };
534
535 let mut service_manager =
536 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
537
538 match service_manager.upgrade(options).await {
539 Ok(upgrade_result) => {
540 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
541 if upgrade_result != UpgradeResult::NotRequired {
542 if let Some(interval) = fixed_interval {
545 debug!("Sleeping for {interval} milliseconds",);
546 std::thread::sleep(std::time::Duration::from_millis(interval));
547 }
548 }
549 upgrade_summary.push((
550 service_manager.service.service_data.service_name.clone(),
551 upgrade_result,
552 ));
553 node_registry.save()?;
554 }
555 Err(err) => {
556 error!("Error upgrading service {service_name}: {err}");
557 upgrade_summary.push((
558 node.service_name.clone(),
559 UpgradeResult::Error(format!("Error: {err}")),
560 ));
561 node_registry.save()?;
562 }
563 }
564 }
565
566 if verbosity != VerbosityLevel::Minimal {
567 print_upgrade_summary(upgrade_summary.clone());
568 }
569
570 if upgrade_summary.iter().any(|(_, r)| {
571 matches!(r, UpgradeResult::Error(_))
572 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
573 }) {
574 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
575 "For any services that were upgraded but did not start, you can attempt to start them \
576 again using the 'start' command."));
577 }
578
579 Ok(())
580}
581
582pub async fn maintain_n_running_nodes(
586 auto_restart: bool,
587 auto_set_nat_flags: bool,
588 connection_timeout_s: u64,
589 max_nodes_to_run: u16,
590 data_dir_path: Option<PathBuf>,
591 enable_metrics_server: bool,
592 env_variables: Option<Vec<(String, String)>>,
593 evm_network: Option<EvmNetwork>,
594 home_network: bool,
595 log_dir_path: Option<PathBuf>,
596 log_format: Option<LogFormat>,
597 max_archived_log_files: Option<usize>,
598 max_log_files: Option<usize>,
599 metrics_port: Option<PortRange>,
600 network_id: Option<u8>,
601 node_ip: Option<Ipv4Addr>,
602 node_port: Option<PortRange>,
603 peers_args: InitialPeersConfig,
604 rewards_address: RewardsAddress,
605 rpc_address: Option<Ipv4Addr>,
606 rpc_port: Option<PortRange>,
607 src_path: Option<PathBuf>,
608 url: Option<String>,
609 no_upnp: bool,
610 user: Option<String>,
611 version: Option<String>,
612 verbosity: VerbosityLevel,
613 start_node_interval: Option<u64>,
614) -> Result<()> {
615 let node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
616 let running_nodes = node_registry
617 .nodes
618 .iter()
619 .filter(|node| node.status == ServiceStatus::Running)
620 .map(|node| node.service_name.clone())
621 .collect::<Vec<_>>();
622
623 let running_count = running_nodes.len();
624 let target_count = max_nodes_to_run as usize;
625
626 info!(
627 "Current running nodes: {}, Target: {}",
628 running_count, target_count
629 );
630
631 match running_count.cmp(&target_count) {
632 Ordering::Greater => {
633 let to_stop_count = running_count - target_count;
634 let services_to_stop = running_nodes
635 .into_iter()
636 .rev() .take(to_stop_count)
638 .collect::<Vec<_>>();
639
640 info!(
641 "Stopping {} excess nodes: {:?}",
642 to_stop_count, services_to_stop
643 );
644 stop(None, vec![], services_to_stop, verbosity).await?;
645 }
646 Ordering::Less => {
647 let to_start_count = target_count - running_count;
648 let inactive_nodes = node_registry
649 .nodes
650 .iter()
651 .filter(|node| {
652 node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added
653 })
654 .map(|node| node.service_name.clone())
655 .collect::<Vec<_>>();
656
657 info!("Inactive nodes available: {}", inactive_nodes.len());
658
659 if to_start_count <= inactive_nodes.len() {
660 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
661 info!(
662 "Starting {} existing inactive nodes: {:?}",
663 to_start_count, nodes_to_start
664 );
665 start(
666 connection_timeout_s,
667 start_node_interval,
668 vec![],
669 nodes_to_start,
670 verbosity,
671 )
672 .await?;
673 } else {
674 let to_add_count = to_start_count - inactive_nodes.len();
675 info!(
676 "Adding {} new nodes and starting all {} inactive nodes",
677 to_add_count,
678 inactive_nodes.len()
679 );
680
681 let ports_to_use = match node_port {
682 Some(PortRange::Single(port)) => vec![port],
683 Some(PortRange::Range(start, end)) => {
684 (start..=end).take(to_add_count).collect()
685 }
686 None => vec![],
687 };
688
689 for (i, port) in ports_to_use.into_iter().enumerate() {
690 let added_service = add(
691 auto_restart,
692 auto_set_nat_flags,
693 Some(1),
694 data_dir_path.clone(),
695 enable_metrics_server,
696 env_variables.clone(),
697 evm_network.clone(),
698 home_network,
699 log_dir_path.clone(),
700 log_format,
701 max_archived_log_files,
702 max_log_files,
703 metrics_port.clone(),
704 network_id,
705 node_ip,
706 Some(PortRange::Single(port)),
707 peers_args.clone(),
708 rewards_address,
709 rpc_address,
710 rpc_port.clone(),
711 src_path.clone(),
712 no_upnp,
713 url.clone(),
714 user.clone(),
715 version.clone(),
716 verbosity,
717 )
718 .await?;
719
720 if i == 0 {
721 start(
722 connection_timeout_s,
723 start_node_interval,
724 vec![],
725 added_service,
726 verbosity,
727 )
728 .await?;
729 }
730 }
731
732 if !inactive_nodes.is_empty() {
733 start(
734 connection_timeout_s,
735 start_node_interval,
736 vec![],
737 inactive_nodes,
738 verbosity,
739 )
740 .await?;
741 }
742 }
743 }
744 Ordering::Equal => {
745 info!(
746 "Current node count ({}) matches target ({}). No action needed.",
747 running_count, target_count
748 );
749 }
750 }
751
752 let final_node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
754 let final_running_count = final_node_registry
755 .nodes
756 .iter()
757 .filter(|node| node.status == ServiceStatus::Running)
758 .count();
759
760 info!("Final running node count: {}", final_running_count);
761 if final_running_count != target_count {
762 warn!(
763 "Failed to reach target node count. Expected {}, but got {}",
764 target_count, final_running_count
765 );
766 }
767
768 Ok(())
769}
770
771fn get_services_for_ops(
772 node_registry: &NodeRegistry,
773 peer_ids: Vec<String>,
774 service_names: Vec<String>,
775) -> Result<Vec<usize>> {
776 let mut service_indices = Vec::new();
777
778 if service_names.is_empty() && peer_ids.is_empty() {
779 for node in node_registry.nodes.iter() {
780 if let Some(index) = node_registry.nodes.iter().position(|x| {
781 x.service_name == node.service_name && x.status != ServiceStatus::Removed
782 }) {
783 service_indices.push(index);
784 }
785 }
786 } else {
787 for name in &service_names {
788 if let Some(index) = node_registry
789 .nodes
790 .iter()
791 .position(|x| x.service_name == *name && x.status != ServiceStatus::Removed)
792 {
793 service_indices.push(index);
794 } else {
795 error!("No service named '{name}'");
796 return Err(eyre!(format!("No service named '{name}'")));
797 }
798 }
799
800 for peer_id_str in &peer_ids {
801 let peer_id = PeerId::from_str(peer_id_str)
802 .inspect_err(|err| error!("Error parsing PeerId: {err:?}"))?;
803 if let Some(index) = node_registry
804 .nodes
805 .iter()
806 .position(|x| x.peer_id == Some(peer_id) && x.status != ServiceStatus::Removed)
807 {
808 service_indices.push(index);
809 } else {
810 error!("Could not find node with peer id: '{peer_id:?}'");
811 return Err(eyre!(format!(
812 "Could not find node with peer ID '{peer_id}'",
813 )));
814 }
815 }
816 }
817
818 Ok(service_indices)
819}
820
821fn summarise_any_failed_ops(
822 failed_services: Vec<(String, String)>,
823 verb: &str,
824 verbosity: VerbosityLevel,
825) -> Result<()> {
826 if !failed_services.is_empty() {
827 if verbosity != VerbosityLevel::Minimal {
828 println!("Failed to {verb} {} service(s):", failed_services.len());
829 for failed in failed_services.iter() {
830 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
831 }
832 }
833
834 error!("Failed to {verb} one or more services");
835 return Err(eyre!("Failed to {verb} one or more services"));
836 }
837 Ok(())
838}