1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 add_services::{
14 add_node,
15 config::{AddNodeServiceOptions, PortRange},
16 },
17 config::{self, is_running_as_root},
18 helpers::{download_and_extract_release, get_bin_version},
19 print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use ant_bootstrap::InitialPeersConfig;
22use ant_evm::{EvmNetwork, RewardsAddress};
23use ant_logging::LogFormat;
24use ant_releases::{AntReleaseRepoActions, ReleaseType};
25use ant_service_management::{
26 control::{ServiceControl, ServiceController},
27 rpc::RpcClient,
28 NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
29};
30use color_eyre::{eyre::eyre, Help, Result};
31use colored::Colorize;
32use libp2p_identity::PeerId;
33use semver::Version;
34use std::{cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};
35use tracing::debug;
36
37pub async fn add(
39 alpha: bool,
40 auto_restart: bool,
41 auto_set_nat_flags: bool,
42 count: Option<u16>,
43 data_dir_path: Option<PathBuf>,
44 enable_metrics_server: bool,
45 env_variables: Option<Vec<(String, String)>>,
46 evm_network: Option<EvmNetwork>,
47 log_dir_path: Option<PathBuf>,
48 log_format: Option<LogFormat>,
49 max_archived_log_files: Option<usize>,
50 max_log_files: Option<usize>,
51 metrics_port: Option<PortRange>,
52 network_id: Option<u8>,
53 node_ip: Option<Ipv4Addr>,
54 node_port: Option<PortRange>,
55 mut init_peers_config: InitialPeersConfig,
56 relay: bool,
57 rewards_address: RewardsAddress,
58 rpc_address: Option<Ipv4Addr>,
59 rpc_port: Option<PortRange>,
60 src_path: Option<PathBuf>,
61 no_upnp: bool,
62 url: Option<String>,
63 user: Option<String>,
64 version: Option<String>,
65 verbosity: VerbosityLevel,
66) -> Result<Vec<String>> {
67 let user_mode = !is_running_as_root();
68
69 if verbosity != VerbosityLevel::Minimal {
70 print_banner("Add Antnode Services");
71 println!("{} service(s) to be added", count.unwrap_or(1));
72 }
73
74 let service_manager = ServiceController {};
75 let service_user = if user_mode {
76 None
77 } else {
78 let service_user = user.unwrap_or_else(|| "ant".to_string());
79 service_manager.create_service_user(&service_user)?;
80 Some(service_user)
81 };
82
83 let service_data_dir_path =
84 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
85 let service_log_dir_path =
86 config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
87 let bootstrap_cache_dir = if let Some(user) = &service_user {
88 Some(config::get_bootstrap_cache_owner_path(user)?)
89 } else {
90 None
91 };
92
93 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
94 let release_repo = <dyn AntReleaseRepoActions>::default_config();
95
96 let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
97 let version = get_bin_version(&path)?;
98 (path, version)
99 } else {
100 download_and_extract_release(
101 ReleaseType::AntNode,
102 url.clone(),
103 version,
104 &*release_repo,
105 verbosity,
106 None,
107 )
108 .await?
109 };
110
111 debug!("Parsing peers from PeersArgs");
112
113 init_peers_config
114 .addrs
115 .extend(InitialPeersConfig::read_addr_from_env());
116 init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
117
118 let options = AddNodeServiceOptions {
119 alpha,
120 auto_restart,
121 auto_set_nat_flags,
122 count,
123 delete_antnode_src: src_path.is_none(),
124 enable_metrics_server,
125 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
126 env_variables,
127 relay,
128 log_format,
129 max_archived_log_files,
130 max_log_files,
131 metrics_port,
132 network_id,
133 node_ip,
134 node_port,
135 init_peers_config,
136 rewards_address,
137 rpc_address,
138 rpc_port,
139 antnode_src_path,
140 antnode_dir_path: service_data_dir_path.clone(),
141 service_data_dir_path,
142 service_log_dir_path,
143 no_upnp,
144 user: service_user,
145 user_mode,
146 version,
147 };
148 info!("Adding node service(s)");
149 let added_services_names =
150 add_node(options, &mut node_registry, &service_manager, verbosity).await?;
151
152 node_registry.save()?;
153 debug!("Node registry saved");
154
155 Ok(added_services_names)
156}
157
158pub async fn balance(
159 peer_ids: Vec<String>,
160 service_names: Vec<String>,
161 verbosity: VerbosityLevel,
162) -> Result<()> {
163 if verbosity != VerbosityLevel::Minimal {
164 print_banner("Reward Balances");
165 }
166
167 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
168 refresh_node_registry(
169 &mut node_registry,
170 &ServiceController {},
171 verbosity != VerbosityLevel::Minimal,
172 false,
173 false,
174 )
175 .await?;
176
177 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
178 if service_indices.is_empty() {
179 info!("Service indices is empty, cannot obtain the balance");
180 println!("No balances to display");
182 return Ok(());
183 }
184 debug!("Obtaining balances for {} services", service_indices.len());
185
186 for &index in &service_indices {
187 let node = &mut node_registry.nodes[index];
188 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
189 let service = NodeService::new(node, Box::new(rpc_client));
190 println!("{}: {}", service.service_data.service_name, 0,);
192 }
193 Ok(())
194}
195
196pub async fn remove(
197 keep_directories: bool,
198 peer_ids: Vec<String>,
199 service_names: Vec<String>,
200 verbosity: VerbosityLevel,
201) -> Result<()> {
202 if verbosity != VerbosityLevel::Minimal {
203 print_banner("Remove Antnode Services");
204 }
205 info!("Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
206
207 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
208 refresh_node_registry(
209 &mut node_registry,
210 &ServiceController {},
211 verbosity != VerbosityLevel::Minimal,
212 false,
213 false,
214 )
215 .await?;
216
217 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
218 if service_indices.is_empty() {
219 info!("Service indices is empty, no services were eligible for removal");
220 if verbosity != VerbosityLevel::Minimal {
222 println!("No services were eligible for removal");
223 }
224 return Ok(());
225 }
226
227 let mut failed_services = Vec::new();
228 for &index in &service_indices {
229 let node = &mut node_registry.nodes[index];
230 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
231 let service = NodeService::new(node, Box::new(rpc_client));
232 let mut service_manager =
233 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
234 match service_manager.remove(keep_directories).await {
235 Ok(()) => {
236 debug!("Removed service {}", node.service_name);
237 node_registry.save()?;
238 }
239 Err(err) => {
240 error!("Failed to remove service {}: {err}", node.service_name);
241 failed_services.push((node.service_name.clone(), err.to_string()))
242 }
243 }
244 }
245
246 summarise_any_failed_ops(failed_services, "remove", verbosity)
247}
248
249pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> {
250 if verbosity != VerbosityLevel::Minimal {
251 print_banner("Reset Antnode Services");
252 }
253 info!("Resetting all antnode services, with force={force}");
254
255 if !force {
256 println!("WARNING: all antnode services, data, and logs will be removed.");
257 println!("Do you wish to proceed? [y/n]");
258 std::io::stdout().flush()?;
259 let mut input = String::new();
260 std::io::stdin().read_line(&mut input)?;
261 if input.trim().to_lowercase() != "y" {
262 println!("Reset aborted");
263 return Ok(());
264 }
265 }
266
267 stop(None, vec![], vec![], verbosity).await?;
268 remove(false, vec![], vec![], verbosity).await?;
269
270 let node_registry_path = config::get_node_registry_path()?;
274 if node_registry_path.exists() {
275 info!("Removing node registry file: {node_registry_path:?}");
276 std::fs::remove_file(node_registry_path)?;
277 }
278
279 Ok(())
280}
281
282pub async fn start(
283 connection_timeout_s: u64,
284 fixed_interval: Option<u64>,
285 peer_ids: Vec<String>,
286 service_names: Vec<String>,
287 verbosity: VerbosityLevel,
288) -> Result<()> {
289 if verbosity != VerbosityLevel::Minimal {
290 print_banner("Start Antnode Services");
291 }
292 info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
293
294 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
295 refresh_node_registry(
296 &mut node_registry,
297 &ServiceController {},
298 verbosity != VerbosityLevel::Minimal,
299 false,
300 false,
301 )
302 .await?;
303
304 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
305 if service_indices.is_empty() {
306 info!("No services are eligible to be started");
307 if verbosity != VerbosityLevel::Minimal {
309 println!("No services were eligible to be started");
310 }
311 return Ok(());
312 }
313
314 let mut failed_services = Vec::new();
315 for &index in &service_indices {
316 let node = &mut node_registry.nodes[index];
317 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
318
319 let service = NodeService::new(node, Box::new(rpc_client));
320
321 let service = if fixed_interval.is_none() {
323 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
324 } else {
325 service
326 };
327
328 let mut service_manager =
329 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
330 if service_manager.service.status() != ServiceStatus::Running {
331 if let Some(interval) = fixed_interval {
336 debug!("Sleeping for {} milliseconds", interval);
337 std::thread::sleep(std::time::Duration::from_millis(interval));
338 }
339 }
340 match service_manager.start().await {
341 Ok(start_duration) => {
342 debug!(
343 "Started service {} in {start_duration:?}",
344 node.service_name
345 );
346
347 node_registry.save()?;
348 }
349 Err(err) => {
350 error!("Failed to start service {}: {err}", node.service_name);
351 failed_services.push((node.service_name.clone(), err.to_string()))
352 }
353 }
354 }
355
356 summarise_any_failed_ops(failed_services, "start", verbosity)
357}
358
359pub async fn status(details: bool, fail: bool, json: bool) -> Result<()> {
360 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
361 if !node_registry.nodes.is_empty() {
362 if !json && !details {
363 print_banner("Antnode Services");
364 }
365 status_report(
366 &mut node_registry,
367 &ServiceController {},
368 details,
369 json,
370 fail,
371 false,
372 )
373 .await?;
374 node_registry.save()?;
375 }
376 Ok(())
377}
378
379pub async fn stop(
380 interval: Option<u64>,
381 peer_ids: Vec<String>,
382 service_names: Vec<String>,
383 verbosity: VerbosityLevel,
384) -> Result<()> {
385 if verbosity != VerbosityLevel::Minimal {
386 print_banner("Stop Antnode Services");
387 }
388 info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
389
390 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
391 refresh_node_registry(
392 &mut node_registry,
393 &ServiceController {},
394 verbosity != VerbosityLevel::Minimal,
395 false,
396 false,
397 )
398 .await?;
399
400 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
401 if service_indices.is_empty() {
402 info!("Service indices is empty, no services were eligible to be stopped");
403 if verbosity != VerbosityLevel::Minimal {
405 println!("No services were eligible to be stopped");
406 }
407 return Ok(());
408 }
409
410 let mut failed_services = Vec::new();
411 for &index in &service_indices {
412 let node = &mut node_registry.nodes[index];
413 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
414 let service = NodeService::new(node, Box::new(rpc_client));
415 let mut service_manager =
416 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
417
418 if service_manager.service.status() == ServiceStatus::Running {
419 if let Some(interval) = interval {
420 debug!("Sleeping for {} milliseconds", interval);
421 std::thread::sleep(std::time::Duration::from_millis(interval));
422 }
423 }
424 match service_manager.stop().await {
425 Ok(()) => {
426 debug!("Stopped service {}", node.service_name);
427 node_registry.save()?;
428 }
429 Err(err) => {
430 error!("Failed to stop service {}: {err}", node.service_name);
431 failed_services.push((node.service_name.clone(), err.to_string()))
432 }
433 }
434 }
435
436 summarise_any_failed_ops(failed_services, "stop", verbosity)
437}
438
439pub async fn upgrade(
440 connection_timeout_s: u64,
441 do_not_start: bool,
442 custom_bin_path: Option<PathBuf>,
443 force: bool,
444 fixed_interval: Option<u64>,
445 peer_ids: Vec<String>,
446 provided_env_variables: Option<Vec<(String, String)>>,
447 service_names: Vec<String>,
448 url: Option<String>,
449 version: Option<String>,
450 verbosity: VerbosityLevel,
451) -> Result<()> {
452 let use_force = force || custom_bin_path.is_some();
456
457 if verbosity != VerbosityLevel::Minimal {
458 print_banner("Upgrade Antnode Services");
459 }
460 info!(
461 "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
462 );
463
464 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
465 custom_bin_path.clone(),
466 ReleaseType::AntNode,
467 url,
468 version,
469 verbosity,
470 )
471 .await?;
472
473 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
474 refresh_node_registry(
475 &mut node_registry,
476 &ServiceController {},
477 verbosity != VerbosityLevel::Minimal,
478 false,
479 false,
480 )
481 .await?;
482
483 if let Some(node) = node_registry.nodes.first() {
484 debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
485 } else {
486 debug!("There are no nodes currently added or active");
487 }
488
489 if !use_force {
490 let node_versions = node_registry
491 .nodes
492 .iter()
493 .map(|n| Version::parse(&n.version).map_err(|_| eyre!("Failed to parse Version")))
494 .collect::<Result<Vec<Version>>>()?;
495 let any_nodes_need_upgraded = node_versions
496 .iter()
497 .any(|current_version| current_version < &target_version);
498 if !any_nodes_need_upgraded {
499 info!("All nodes are at the latest version, no upgrade required.");
500 if verbosity != VerbosityLevel::Minimal {
501 println!("{} All nodes are at the latest version", "✓".green());
502 }
503 return Ok(());
504 }
505 }
506
507 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
508 trace!("service_indices len: {}", service_indices.len());
509 let mut upgrade_summary = Vec::new();
510
511 for &index in &service_indices {
512 let node = &mut node_registry.nodes[index];
513 let env_variables = if provided_env_variables.is_some() {
514 &provided_env_variables
515 } else {
516 &node_registry.environment_variables
517 };
518 let options = UpgradeOptions {
519 auto_restart: false,
520 env_variables: env_variables.clone(),
521 force: use_force,
522 start_service: !do_not_start,
523 target_bin_path: upgrade_bin_path.clone(),
524 target_version: target_version.clone(),
525 };
526 let service_name = node.service_name.clone();
527
528 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
529 let service = NodeService::new(node, Box::new(rpc_client));
530 let service = if fixed_interval.is_none() {
532 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
533 } else {
534 service
535 };
536
537 let mut service_manager =
538 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
539
540 match service_manager.upgrade(options).await {
541 Ok(upgrade_result) => {
542 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
543 if upgrade_result != UpgradeResult::NotRequired {
544 if let Some(interval) = fixed_interval {
547 debug!("Sleeping for {interval} milliseconds",);
548 std::thread::sleep(std::time::Duration::from_millis(interval));
549 }
550 }
551 upgrade_summary.push((
552 service_manager.service.service_data.service_name.clone(),
553 upgrade_result,
554 ));
555 node_registry.save()?;
556 }
557 Err(err) => {
558 error!("Error upgrading service {service_name}: {err}");
559 upgrade_summary.push((
560 node.service_name.clone(),
561 UpgradeResult::Error(format!("Error: {err}")),
562 ));
563 node_registry.save()?;
564 }
565 }
566 }
567
568 if verbosity != VerbosityLevel::Minimal {
569 print_upgrade_summary(upgrade_summary.clone());
570 }
571
572 if upgrade_summary.iter().any(|(_, r)| {
573 matches!(r, UpgradeResult::Error(_))
574 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
575 }) {
576 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
577 "For any services that were upgraded but did not start, you can attempt to start them \
578 again using the 'start' command."));
579 }
580
581 Ok(())
582}
583
584pub async fn maintain_n_running_nodes(
588 alpha: bool,
589 auto_restart: bool,
590 auto_set_nat_flags: bool,
591 connection_timeout_s: u64,
592 max_nodes_to_run: u16,
593 data_dir_path: Option<PathBuf>,
594 enable_metrics_server: bool,
595 env_variables: Option<Vec<(String, String)>>,
596 evm_network: Option<EvmNetwork>,
597 log_dir_path: Option<PathBuf>,
598 log_format: Option<LogFormat>,
599 max_archived_log_files: Option<usize>,
600 max_log_files: Option<usize>,
601 metrics_port: Option<PortRange>,
602 network_id: Option<u8>,
603 node_ip: Option<Ipv4Addr>,
604 node_port: Option<PortRange>,
605 peers_args: InitialPeersConfig,
606 relay: bool,
607 rewards_address: RewardsAddress,
608 rpc_address: Option<Ipv4Addr>,
609 rpc_port: Option<PortRange>,
610 src_path: Option<PathBuf>,
611 url: Option<String>,
612 no_upnp: bool,
613 user: Option<String>,
614 version: Option<String>,
615 verbosity: VerbosityLevel,
616 start_node_interval: Option<u64>,
617) -> Result<()> {
618 let node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
619 let running_nodes = node_registry
620 .nodes
621 .iter()
622 .filter(|node| node.status == ServiceStatus::Running)
623 .map(|node| node.service_name.clone())
624 .collect::<Vec<_>>();
625
626 let running_count = running_nodes.len();
627 let target_count = max_nodes_to_run as usize;
628
629 info!(
630 "Current running nodes: {}, Target: {}",
631 running_count, target_count
632 );
633
634 match running_count.cmp(&target_count) {
635 Ordering::Greater => {
636 let to_stop_count = running_count - target_count;
637 let services_to_stop = running_nodes
638 .into_iter()
639 .rev() .take(to_stop_count)
641 .collect::<Vec<_>>();
642
643 info!(
644 "Stopping {} excess nodes: {:?}",
645 to_stop_count, services_to_stop
646 );
647 stop(None, vec![], services_to_stop, verbosity).await?;
648 }
649 Ordering::Less => {
650 let to_start_count = target_count - running_count;
651 let inactive_nodes = node_registry
652 .nodes
653 .iter()
654 .filter(|node| {
655 node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added
656 })
657 .map(|node| node.service_name.clone())
658 .collect::<Vec<_>>();
659
660 info!("Inactive nodes available: {}", inactive_nodes.len());
661
662 if to_start_count <= inactive_nodes.len() {
663 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
664 info!(
665 "Starting {} existing inactive nodes: {:?}",
666 to_start_count, nodes_to_start
667 );
668 start(
669 connection_timeout_s,
670 start_node_interval,
671 vec![],
672 nodes_to_start,
673 verbosity,
674 )
675 .await?;
676 } else {
677 let to_add_count = to_start_count - inactive_nodes.len();
678 info!(
679 "Adding {} new nodes and starting all {} inactive nodes",
680 to_add_count,
681 inactive_nodes.len()
682 );
683
684 let ports_to_use = match node_port {
685 Some(PortRange::Single(port)) => vec![port],
686 Some(PortRange::Range(start, end)) => {
687 (start..=end).take(to_add_count).collect()
688 }
689 None => vec![],
690 };
691
692 for (i, port) in ports_to_use.into_iter().enumerate() {
693 let added_service = add(
694 alpha,
695 auto_restart,
696 auto_set_nat_flags,
697 Some(1),
698 data_dir_path.clone(),
699 enable_metrics_server,
700 env_variables.clone(),
701 evm_network.clone(),
702 log_dir_path.clone(),
703 log_format,
704 max_archived_log_files,
705 max_log_files,
706 metrics_port.clone(),
707 network_id,
708 node_ip,
709 Some(PortRange::Single(port)),
710 peers_args.clone(),
711 relay,
712 rewards_address,
713 rpc_address,
714 rpc_port.clone(),
715 src_path.clone(),
716 no_upnp,
717 url.clone(),
718 user.clone(),
719 version.clone(),
720 verbosity,
721 )
722 .await?;
723
724 if i == 0 {
725 start(
726 connection_timeout_s,
727 start_node_interval,
728 vec![],
729 added_service,
730 verbosity,
731 )
732 .await?;
733 }
734 }
735
736 if !inactive_nodes.is_empty() {
737 start(
738 connection_timeout_s,
739 start_node_interval,
740 vec![],
741 inactive_nodes,
742 verbosity,
743 )
744 .await?;
745 }
746 }
747 }
748 Ordering::Equal => {
749 info!(
750 "Current node count ({}) matches target ({}). No action needed.",
751 running_count, target_count
752 );
753 }
754 }
755
756 let final_node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
758 let final_running_count = final_node_registry
759 .nodes
760 .iter()
761 .filter(|node| node.status == ServiceStatus::Running)
762 .count();
763
764 info!("Final running node count: {}", final_running_count);
765 if final_running_count != target_count {
766 warn!(
767 "Failed to reach target node count. Expected {}, but got {}",
768 target_count, final_running_count
769 );
770 }
771
772 Ok(())
773}
774
775fn get_services_for_ops(
776 node_registry: &NodeRegistry,
777 peer_ids: Vec<String>,
778 service_names: Vec<String>,
779) -> Result<Vec<usize>> {
780 let mut service_indices = Vec::new();
781
782 if service_names.is_empty() && peer_ids.is_empty() {
783 for node in node_registry.nodes.iter() {
784 if let Some(index) = node_registry.nodes.iter().position(|x| {
785 x.service_name == node.service_name && x.status != ServiceStatus::Removed
786 }) {
787 service_indices.push(index);
788 }
789 }
790 } else {
791 for name in &service_names {
792 if let Some(index) = node_registry
793 .nodes
794 .iter()
795 .position(|x| x.service_name == *name && x.status != ServiceStatus::Removed)
796 {
797 service_indices.push(index);
798 } else {
799 error!("No service named '{name}'");
800 return Err(eyre!(format!("No service named '{name}'")));
801 }
802 }
803
804 for peer_id_str in &peer_ids {
805 let peer_id = PeerId::from_str(peer_id_str)
806 .inspect_err(|err| error!("Error parsing PeerId: {err:?}"))?;
807 if let Some(index) = node_registry
808 .nodes
809 .iter()
810 .position(|x| x.peer_id == Some(peer_id) && x.status != ServiceStatus::Removed)
811 {
812 service_indices.push(index);
813 } else {
814 error!("Could not find node with peer id: '{peer_id:?}'");
815 return Err(eyre!(format!(
816 "Could not find node with peer ID '{peer_id}'",
817 )));
818 }
819 }
820 }
821
822 Ok(service_indices)
823}
824
825fn summarise_any_failed_ops(
826 failed_services: Vec<(String, String)>,
827 verb: &str,
828 verbosity: VerbosityLevel,
829) -> Result<()> {
830 if !failed_services.is_empty() {
831 if verbosity != VerbosityLevel::Minimal {
832 println!("Failed to {verb} {} service(s):", failed_services.len());
833 for failed in failed_services.iter() {
834 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
835 }
836 }
837
838 error!("Failed to {verb} one or more services");
839 return Err(eyre!("Failed to {verb} one or more services"));
840 }
841 Ok(())
842}