1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 ServiceManager, VerbosityLevel,
14 add_services::{
15 add_node,
16 config::{AddNodeServiceOptions, PortRange},
17 },
18 config::{self, is_running_as_root},
19 helpers::{download_and_extract_release, get_bin_version},
20 print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::{Bootstrap, InitialPeersConfig};
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27 NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28 UpgradeOptions, UpgradeResult,
29 control::{ServiceControl, ServiceController},
30 rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use std::{
37 cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
38};
39use tokio::sync::RwLock;
40use tracing::debug;
41
42pub async fn add(
44 alpha: bool,
45 auto_restart: bool,
46 auto_set_nat_flags: bool,
47 count: Option<u16>,
48 data_dir_path: Option<PathBuf>,
49 enable_metrics_server: bool,
50 env_variables: Option<Vec<(String, String)>>,
51 evm_network: Option<EvmNetwork>,
52 log_dir_path: Option<PathBuf>,
53 log_format: Option<LogFormat>,
54 max_archived_log_files: Option<usize>,
55 max_log_files: Option<usize>,
56 metrics_port: Option<PortRange>,
57 network_id: Option<u8>,
58 node_ip: Option<Ipv4Addr>,
59 node_port: Option<PortRange>,
60 node_registry: NodeRegistryManager,
61 mut init_peers_config: InitialPeersConfig,
62 relay: bool,
63 rewards_address: RewardsAddress,
64 rpc_address: Option<Ipv4Addr>,
65 rpc_port: Option<PortRange>,
66 src_path: Option<PathBuf>,
67 no_upnp: bool,
68 url: Option<String>,
69 user: Option<String>,
70 version: Option<String>,
71 verbosity: VerbosityLevel,
72 write_older_cache_files: bool,
73) -> Result<Vec<String>> {
74 let user_mode = !is_running_as_root();
75
76 if verbosity != VerbosityLevel::Minimal {
77 print_banner("Add Antnode Services");
78 println!("{} service(s) to be added", count.unwrap_or(1));
79 }
80
81 let service_manager = ServiceController {};
82 let service_user = if user_mode {
83 None
84 } else {
85 let service_user = user.unwrap_or_else(|| "ant".to_string());
86 service_manager.create_service_user(&service_user)?;
87 Some(service_user)
88 };
89
90 let service_data_dir_path =
91 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
92 let service_log_dir_path =
93 config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
94 let bootstrap_cache_dir = if let Some(user) = &service_user {
95 Some(config::get_bootstrap_cache_owner_path(user)?)
96 } else {
97 None
98 };
99
100 let release_repo = <dyn AntReleaseRepoActions>::default_config();
101
102 let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
103 let version = get_bin_version(&path)?;
104 (path, version)
105 } else {
106 download_and_extract_release(
107 ReleaseType::AntNode,
108 url.clone(),
109 version,
110 &*release_repo,
111 verbosity,
112 None,
113 )
114 .await?
115 };
116
117 debug!("Parsing peers from PeersArgs");
118
119 init_peers_config.addrs.extend(Bootstrap::fetch_from_env());
120 init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
121
122 let options = AddNodeServiceOptions {
123 alpha,
124 auto_restart,
125 auto_set_nat_flags,
126 count,
127 delete_antnode_src: src_path.is_none(),
128 enable_metrics_server,
129 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
130 env_variables,
131 relay,
132 log_format,
133 max_archived_log_files,
134 max_log_files,
135 metrics_port,
136 network_id,
137 node_ip,
138 node_port,
139 init_peers_config,
140 rewards_address,
141 rpc_address,
142 rpc_port,
143 antnode_src_path,
144 antnode_dir_path: service_data_dir_path.clone(),
145 service_data_dir_path,
146 service_log_dir_path,
147 no_upnp,
148 user: service_user,
149 user_mode,
150 version,
151 write_older_cache_files,
152 };
153 info!("Adding node service(s)");
154 let added_services_names =
155 add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
156
157 node_registry.save().await?;
158 debug!("Node registry saved");
159
160 Ok(added_services_names)
161}
162
163pub async fn balance(
164 peer_ids: Vec<String>,
165 node_registry: NodeRegistryManager,
166 service_names: Vec<String>,
167 verbosity: VerbosityLevel,
168) -> Result<()> {
169 if verbosity != VerbosityLevel::Minimal {
170 print_banner("Reward Balances");
171 }
172
173 refresh_node_registry(
174 node_registry.clone(),
175 &ServiceController {},
176 verbosity != VerbosityLevel::Minimal,
177 false,
178 verbosity,
179 )
180 .await?;
181
182 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
183 if services_for_ops.is_empty() {
184 info!("Services for ops is empty, cannot obtain the balance");
185 println!("No balances to display");
187 return Ok(());
188 }
189 debug!("Obtaining balances for {} services", services_for_ops.len());
190
191 for node in services_for_ops {
192 let node = node.read().await;
193 println!("{}: {}", node.service_name, 0,);
195 }
196 Ok(())
197}
198
199pub async fn remove(
200 keep_directories: bool,
201 peer_ids: Vec<String>,
202 node_registry: NodeRegistryManager,
203 service_names: Vec<String>,
204 verbosity: VerbosityLevel,
205) -> Result<()> {
206 if verbosity != VerbosityLevel::Minimal {
207 print_banner("Remove Antnode Services");
208 }
209 info!(
210 "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
211 );
212
213 refresh_node_registry(
214 node_registry.clone(),
215 &ServiceController {},
216 verbosity != VerbosityLevel::Minimal,
217 false,
218 verbosity,
219 )
220 .await?;
221
222 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
223 if services_for_ops.is_empty() {
224 info!("Services for ops is empty, no services were eligible for removal");
225 if verbosity != VerbosityLevel::Minimal {
227 println!("No services were eligible for removal");
228 }
229 return Ok(());
230 }
231
232 let mut failed_services = Vec::new();
233 for node in &services_for_ops {
234 let service_name = node.read().await.service_name.clone();
235 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
236 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
237 let mut service_manager =
238 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
239 match service_manager.remove(keep_directories).await {
240 Ok(()) => {
241 debug!("Removed service {service_name}");
242 node_registry.save().await?;
243 }
244 Err(err) => {
245 error!("Failed to remove service {service_name}: {err}");
246 failed_services.push((service_name.clone(), err.to_string()))
247 }
248 }
249 }
250
251 summarise_any_failed_ops(failed_services, "remove", verbosity)
252}
253
254pub async fn reset(
255 force: bool,
256 node_registry: NodeRegistryManager,
257 verbosity: VerbosityLevel,
258) -> Result<()> {
259 if verbosity != VerbosityLevel::Minimal {
260 print_banner("Reset Antnode Services");
261 }
262 info!("Resetting all antnode services, with force={force}");
263
264 if !force {
265 println!("WARNING: all antnode services, data, and logs will be removed.");
266 println!("Do you wish to proceed? [y/n]");
267 std::io::stdout().flush()?;
268 let mut input = String::new();
269 std::io::stdin().read_line(&mut input)?;
270 if input.trim().to_lowercase() != "y" {
271 println!("Reset aborted");
272 return Ok(());
273 }
274 }
275
276 stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
277 remove(false, vec![], node_registry, vec![], verbosity).await?;
278
279 let node_registry_path = config::get_node_registry_path()?;
283 if node_registry_path.exists() {
284 info!("Removing node registry file: {node_registry_path:?}");
285 std::fs::remove_file(node_registry_path)?;
286 }
287
288 Ok(())
289}
290
291pub async fn start(
292 connection_timeout_s: u64,
293 fixed_interval: Option<u64>,
294 node_registry: NodeRegistryManager,
295 peer_ids: Vec<String>,
296 service_names: Vec<String>,
297 verbosity: VerbosityLevel,
298) -> Result<()> {
299 if verbosity != VerbosityLevel::Minimal {
300 print_banner("Start Antnode Services");
301 }
302 info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
303
304 refresh_node_registry(
305 node_registry.clone(),
306 &ServiceController {},
307 verbosity != VerbosityLevel::Minimal,
308 false,
309 verbosity,
310 )
311 .await?;
312
313 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
314 if services_for_ops.is_empty() {
315 info!("No services are eligible to be started");
316 if verbosity != VerbosityLevel::Minimal {
318 println!("No services were eligible to be started");
319 }
320 return Ok(());
321 }
322
323 let mut failed_services = Vec::new();
324 for node in &services_for_ops {
325 let service_name = node.read().await.service_name.clone();
326
327 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
328 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
329
330 let service = if fixed_interval.is_none() {
332 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
333 } else {
334 service
335 };
336
337 let mut service_manager =
338 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
339 if service_manager.service.status().await != ServiceStatus::Running {
340 if let Some(interval) = fixed_interval {
345 debug!("Sleeping for {} milliseconds", interval);
346 std::thread::sleep(std::time::Duration::from_millis(interval));
347 }
348 }
349 match service_manager.start().await {
350 Ok(start_duration) => {
351 debug!("Started service {service_name} in {start_duration:?}",);
352
353 node_registry.save().await?;
354 }
355 Err(err) => {
356 error!("Failed to start service {service_name}: {err}");
357 failed_services.push((service_name.clone(), err.to_string()))
358 }
359 }
360 }
361
362 summarise_any_failed_ops(failed_services, "start", verbosity)
363}
364
365pub async fn status(
366 details: bool,
367 fail: bool,
368 json: bool,
369 node_registry: NodeRegistryManager,
370) -> Result<()> {
371 if !node_registry.nodes.read().await.is_empty() {
372 if !json && !details {
373 print_banner("Antnode Services");
374 }
375 status_report(
376 &node_registry,
377 &ServiceController {},
378 details,
379 json,
380 fail,
381 false,
382 )
383 .await?;
384 node_registry.save().await?;
385 }
386 Ok(())
387}
388
389pub async fn stop(
390 interval: Option<u64>,
391 node_registry: NodeRegistryManager,
392 peer_ids: Vec<String>,
393 service_names: Vec<String>,
394 verbosity: VerbosityLevel,
395) -> Result<()> {
396 if verbosity != VerbosityLevel::Minimal {
397 print_banner("Stop Antnode Services");
398 }
399 info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
400
401 refresh_node_registry(
402 node_registry.clone(),
403 &ServiceController {},
404 verbosity != VerbosityLevel::Minimal,
405 false,
406 verbosity,
407 )
408 .await?;
409
410 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
411 if services_for_ops.is_empty() {
412 info!("No services are eligible to be stopped");
413 if verbosity != VerbosityLevel::Minimal {
415 println!("No services were eligible to be stopped");
416 }
417 return Ok(());
418 }
419
420 let mut failed_services = Vec::new();
421 for node in services_for_ops.iter() {
422 let service_name = node.read().await.service_name.clone();
423 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
424 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
425 let mut service_manager =
426 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
427
428 if service_manager.service.status().await == ServiceStatus::Running
429 && let Some(interval) = interval
430 {
431 debug!("Sleeping for {} milliseconds", interval);
432 std::thread::sleep(std::time::Duration::from_millis(interval));
433 }
434 match service_manager.stop().await {
435 Ok(()) => {
436 debug!("Stopped service {service_name}");
437 node_registry.save().await?;
438 }
439 Err(err) => {
440 error!("Failed to stop service {service_name}: {err}");
441 failed_services.push((service_name.clone(), err.to_string()))
442 }
443 }
444 }
445
446 summarise_any_failed_ops(failed_services, "stop", verbosity)
447}
448
449pub async fn upgrade(
450 connection_timeout_s: u64,
451 do_not_start: bool,
452 custom_bin_path: Option<PathBuf>,
453 force: bool,
454 fixed_interval: Option<u64>,
455 node_registry: NodeRegistryManager,
456 peer_ids: Vec<String>,
457 provided_env_variables: Option<Vec<(String, String)>>,
458 service_names: Vec<String>,
459 url: Option<String>,
460 version: Option<String>,
461 verbosity: VerbosityLevel,
462) -> Result<()> {
463 let use_force = force || custom_bin_path.is_some();
467
468 if verbosity != VerbosityLevel::Minimal {
469 print_banner("Upgrade Antnode Services");
470 }
471 info!(
472 "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
473 );
474
475 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
476 custom_bin_path.clone(),
477 ReleaseType::AntNode,
478 url,
479 version,
480 verbosity,
481 )
482 .await?;
483
484 refresh_node_registry(
485 node_registry.clone(),
486 &ServiceController {},
487 verbosity != VerbosityLevel::Minimal,
488 false,
489 verbosity,
490 )
491 .await?;
492
493 if let Some(node) = node_registry.nodes.read().await.first() {
494 let node = node.read().await;
495 debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
496 } else {
497 debug!("There are no nodes currently added or active");
498 }
499
500 if !use_force {
501 let mut node_versions = Vec::new();
502
503 for node in node_registry.nodes.read().await.iter() {
504 let node = node.read().await;
505 let version = Version::parse(&node.version)
506 .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
507 node_versions.push(version);
508 }
509
510 let any_nodes_need_upgraded = node_versions
511 .iter()
512 .any(|current_version| current_version < &target_version);
513 if !any_nodes_need_upgraded {
514 info!("All nodes are at the latest version, no upgrade required.");
515 if verbosity != VerbosityLevel::Minimal {
516 println!("{} All nodes are at the latest version", "✓".green());
517 }
518 return Ok(());
519 }
520 }
521
522 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
523 trace!("services_for_ops len: {}", services_for_ops.len());
524 let mut upgrade_summary = Vec::new();
525
526 for node in &services_for_ops {
527 let env_variables = if provided_env_variables.is_some() {
528 provided_env_variables.clone()
529 } else {
530 node_registry.environment_variables.read().await.clone()
531 };
532 let options = UpgradeOptions {
533 auto_restart: false,
534 env_variables: env_variables.clone(),
535 force: use_force,
536 start_service: !do_not_start,
537 target_bin_path: upgrade_bin_path.clone(),
538 target_version: target_version.clone(),
539 };
540 let service_name = node.read().await.service_name.clone();
541
542 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
543 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
544 let service = if fixed_interval.is_none() {
546 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
547 } else {
548 service
549 };
550
551 let mut service_manager =
552 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
553
554 match service_manager.upgrade(options).await {
555 Ok(upgrade_result) => {
556 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
557 if upgrade_result != UpgradeResult::NotRequired {
558 if let Some(interval) = fixed_interval {
561 debug!("Sleeping for {interval} milliseconds",);
562 std::thread::sleep(std::time::Duration::from_millis(interval));
563 }
564 }
565 upgrade_summary.push((service_name.clone(), upgrade_result));
566 node_registry.save().await?;
567 }
568 Err(err) => {
569 error!("Error upgrading service {service_name}: {err}");
570 upgrade_summary.push((
571 service_name.clone(),
572 UpgradeResult::Error(format!("Error: {err}")),
573 ));
574 node_registry.save().await?;
575 }
576 }
577 }
578
579 if verbosity != VerbosityLevel::Minimal {
580 print_upgrade_summary(upgrade_summary.clone());
581 }
582
583 if upgrade_summary.iter().any(|(_, r)| {
584 matches!(r, UpgradeResult::Error(_))
585 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
586 }) {
587 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
588 "For any services that were upgraded but did not start, you can attempt to start them \
589 again using the 'start' command."));
590 }
591
592 Ok(())
593}
594
595pub async fn maintain_n_running_nodes(
599 alpha: bool,
600 auto_restart: bool,
601 auto_set_nat_flags: bool,
602 connection_timeout_s: u64,
603 max_nodes_to_run: u16,
604 data_dir_path: Option<PathBuf>,
605 enable_metrics_server: bool,
606 env_variables: Option<Vec<(String, String)>>,
607 evm_network: Option<EvmNetwork>,
608 log_dir_path: Option<PathBuf>,
609 log_format: Option<LogFormat>,
610 max_archived_log_files: Option<usize>,
611 max_log_files: Option<usize>,
612 metrics_port: Option<PortRange>,
613 network_id: Option<u8>,
614 node_ip: Option<Ipv4Addr>,
615 node_port: Option<PortRange>,
616 node_registry: NodeRegistryManager,
617 peers_args: InitialPeersConfig,
618 relay: bool,
619 rewards_address: RewardsAddress,
620 rpc_address: Option<Ipv4Addr>,
621 rpc_port: Option<PortRange>,
622 src_path: Option<PathBuf>,
623 url: Option<String>,
624 no_upnp: bool,
625 user: Option<String>,
626 version: Option<String>,
627 verbosity: VerbosityLevel,
628 start_node_interval: Option<u64>,
629 write_older_cache_files: bool,
630) -> Result<()> {
631 let mut running_nodes = Vec::new();
632
633 for node in node_registry.nodes.read().await.iter() {
634 let node = node.read().await;
635 if node.status == ServiceStatus::Running {
636 running_nodes.push(node.service_name.clone());
637 }
638 }
639
640 let running_count = running_nodes.len();
641 let target_count = max_nodes_to_run as usize;
642
643 info!(
644 "Current running nodes: {}, Target: {}",
645 running_count, target_count
646 );
647
648 match running_count.cmp(&target_count) {
649 Ordering::Greater => {
650 let to_stop_count = running_count - target_count;
651 let services_to_stop = running_nodes
652 .into_iter()
653 .rev() .take(to_stop_count)
655 .collect::<Vec<_>>();
656
657 info!(
658 "Stopping {} excess nodes: {:?}",
659 to_stop_count, services_to_stop
660 );
661 stop(
662 None,
663 node_registry.clone(),
664 vec![],
665 services_to_stop,
666 verbosity,
667 )
668 .await?;
669 }
670 Ordering::Less => {
671 let to_start_count = target_count - running_count;
672 let mut inactive_nodes = Vec::new();
673 for node in node_registry.nodes.read().await.iter() {
674 let node = node.read().await;
675 if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
676 inactive_nodes.push(node.service_name.clone());
677 }
678 }
679
680 info!("Inactive nodes available: {}", inactive_nodes.len());
681
682 if to_start_count <= inactive_nodes.len() {
683 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
684 info!(
685 "Starting {} existing inactive nodes: {:?}",
686 to_start_count, nodes_to_start
687 );
688 start(
689 connection_timeout_s,
690 start_node_interval,
691 node_registry.clone(),
692 vec![],
693 nodes_to_start,
694 verbosity,
695 )
696 .await?;
697 } else {
698 let to_add_count = to_start_count - inactive_nodes.len();
699 info!(
700 "Adding {} new nodes and starting all {} inactive nodes",
701 to_add_count,
702 inactive_nodes.len()
703 );
704
705 let ports_to_use = match node_port {
706 Some(PortRange::Single(port)) => vec![port],
707 Some(PortRange::Range(start, end)) => {
708 (start..=end).take(to_add_count).collect()
709 }
710 None => vec![],
711 };
712
713 for (i, port) in ports_to_use.into_iter().enumerate() {
714 let added_service = add(
715 alpha,
716 auto_restart,
717 auto_set_nat_flags,
718 Some(1),
719 data_dir_path.clone(),
720 enable_metrics_server,
721 env_variables.clone(),
722 evm_network.clone(),
723 log_dir_path.clone(),
724 log_format,
725 max_archived_log_files,
726 max_log_files,
727 metrics_port.clone(),
728 network_id,
729 node_ip,
730 Some(PortRange::Single(port)),
731 node_registry.clone(),
732 peers_args.clone(),
733 relay,
734 rewards_address,
735 rpc_address,
736 rpc_port.clone(),
737 src_path.clone(),
738 no_upnp,
739 url.clone(),
740 user.clone(),
741 version.clone(),
742 verbosity,
743 write_older_cache_files,
744 )
745 .await?;
746
747 if i == 0 {
748 start(
749 connection_timeout_s,
750 start_node_interval,
751 node_registry.clone(),
752 vec![],
753 added_service,
754 verbosity,
755 )
756 .await?;
757 }
758 }
759
760 if !inactive_nodes.is_empty() {
761 start(
762 connection_timeout_s,
763 start_node_interval,
764 node_registry.clone(),
765 vec![],
766 inactive_nodes,
767 verbosity,
768 )
769 .await?;
770 }
771 }
772 }
773 Ordering::Equal => {
774 info!(
775 "Current node count ({}) matches target ({}). No action needed.",
776 running_count, target_count
777 );
778 }
779 }
780
781 let mut final_running_count = 0;
783 for node in node_registry.nodes.read().await.iter() {
784 let node_read = node.read().await;
785 if node_read.status == ServiceStatus::Running {
786 final_running_count += 1;
787 }
788 }
789
790 if final_running_count != target_count {
791 warn!(
792 "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
793 );
794 }
795
796 Ok(())
797}
798
799async fn get_services_for_ops(
800 node_registry: &NodeRegistryManager,
801 peer_ids: Vec<String>,
802 service_names: Vec<String>,
803) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
804 let mut services = Vec::new();
805
806 if service_names.is_empty() && peer_ids.is_empty() {
807 for node in node_registry.nodes.read().await.iter() {
808 if node.read().await.status != ServiceStatus::Removed {
809 services.push(Arc::clone(node));
810 }
811 }
812 } else {
813 for name in &service_names {
814 let mut found_service_with_name = false;
815 for node in node_registry.nodes.read().await.iter() {
816 let node_read = node.read().await;
817 if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
818 {
819 services.push(Arc::clone(node));
820 found_service_with_name = true;
821 break;
822 }
823 }
824 }
825
826 if !found_service_with_name {
827 error!("No service named '{name}'");
828 return Err(eyre!(format!("No service named '{name}'")));
829 }
830 }
831
832 for peer_id_str in &peer_ids {
833 let mut found_service_with_peer_id = false;
834 let given_peer_id = PeerId::from_str(peer_id_str)
835 .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
836 for node in node_registry.nodes.read().await.iter() {
837 let node_read = node.read().await;
838 if let Some(peer_id) = node_read.peer_id
839 && peer_id == given_peer_id
840 && node_read.status != ServiceStatus::Removed
841 {
842 services.push(Arc::clone(node));
843 found_service_with_peer_id = true;
844 break;
845 }
846 }
847 if !found_service_with_peer_id {
848 error!("Could not find node with peer id: '{given_peer_id:?}'");
849 return Err(eyre!(format!(
850 "Could not find node with peer ID '{given_peer_id}'",
851 )));
852 }
853 }
854 }
855
856 Ok(services)
857}
858
859fn summarise_any_failed_ops(
860 failed_services: Vec<(String, String)>,
861 verb: &str,
862 verbosity: VerbosityLevel,
863) -> Result<()> {
864 if !failed_services.is_empty() {
865 if verbosity != VerbosityLevel::Minimal {
866 println!("Failed to {verb} {} service(s):", failed_services.len());
867 for failed in failed_services.iter() {
868 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
869 }
870 }
871
872 error!("Failed to {verb} one or more services");
873 return Err(eyre!("Failed to {verb} one or more services"));
874 }
875 Ok(())
876}