1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 ServiceManager, VerbosityLevel,
14 add_services::{
15 add_node,
16 config::{AddNodeServiceOptions, PortRange},
17 },
18 config::{self, is_running_as_root},
19 helpers::{download_and_extract_release, get_bin_version},
20 print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::{Bootstrap, InitialPeersConfig};
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27 NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28 UpgradeOptions, UpgradeResult,
29 control::{ServiceControl, ServiceController},
30 rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use service_manager::RestartPolicy;
37use std::{
38 cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
39};
40use tokio::sync::RwLock;
41use tracing::debug;
42
43pub async fn add(
45 alpha: bool,
46 auto_restart: bool,
47 auto_set_nat_flags: bool,
48 count: Option<u16>,
49 data_dir_path: Option<PathBuf>,
50 enable_metrics_server: bool,
51 env_variables: Option<Vec<(String, String)>>,
52 evm_network: Option<EvmNetwork>,
53 log_dir_path: Option<PathBuf>,
54 log_format: Option<LogFormat>,
55 max_archived_log_files: Option<usize>,
56 max_log_files: Option<usize>,
57 metrics_port: Option<PortRange>,
58 network_id: Option<u8>,
59 node_ip: Option<Ipv4Addr>,
60 node_port: Option<PortRange>,
61 node_registry: NodeRegistryManager,
62 mut init_peers_config: InitialPeersConfig,
63 relay: bool,
64 restart_policy: RestartPolicy,
65 rewards_address: RewardsAddress,
66 rpc_address: Option<Ipv4Addr>,
67 rpc_port: Option<PortRange>,
68 src_path: Option<PathBuf>,
69 no_upnp: bool,
70 url: Option<String>,
71 user: Option<String>,
72 version: Option<String>,
73 verbosity: VerbosityLevel,
74 write_older_cache_files: bool,
75) -> Result<Vec<String>> {
76 let user_mode = !is_running_as_root();
77
78 if verbosity != VerbosityLevel::Minimal {
79 print_banner("Add Antnode Services");
80 println!("{} service(s) to be added", count.unwrap_or(1));
81 }
82
83 let service_manager = ServiceController {};
84 let service_user = if user_mode {
85 None
86 } else {
87 let service_user = user.unwrap_or_else(|| "ant".to_string());
88 service_manager.create_service_user(&service_user)?;
89 Some(service_user)
90 };
91
92 let service_data_dir_path =
93 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
94 let service_log_dir_path =
95 config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
96 let bootstrap_cache_dir = if let Some(user) = &service_user {
97 Some(config::get_bootstrap_cache_owner_path(user)?)
98 } else {
99 None
100 };
101
102 let release_repo = <dyn AntReleaseRepoActions>::default_config();
103
104 let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
105 let version = get_bin_version(&path)?;
106 (path, version)
107 } else {
108 download_and_extract_release(
109 ReleaseType::AntNode,
110 url.clone(),
111 version,
112 &*release_repo,
113 verbosity,
114 None,
115 )
116 .await?
117 };
118
119 debug!("Parsing peers from PeersArgs");
120
121 init_peers_config.addrs.extend(Bootstrap::fetch_from_env());
122 init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
123
124 let options = AddNodeServiceOptions {
125 alpha,
126 antnode_dir_path: service_data_dir_path.clone(),
127 antnode_src_path,
128 auto_restart,
129 auto_set_nat_flags,
130 count,
131 delete_antnode_src: src_path.is_none(),
132 enable_metrics_server,
133 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
134 env_variables,
135 init_peers_config,
136 log_format,
137 max_archived_log_files,
138 max_log_files,
139 metrics_port,
140 network_id,
141 no_upnp,
142 node_ip,
143 node_port,
144 relay,
145 restart_policy,
146 rewards_address,
147 rpc_address,
148 rpc_port,
149 service_data_dir_path,
150 service_log_dir_path,
151 user: service_user,
152 user_mode,
153 version,
154 write_older_cache_files,
155 };
156 info!("Adding node service(s)");
157 let added_services_names =
158 add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
159
160 node_registry.save().await?;
161 debug!("Node registry saved");
162
163 Ok(added_services_names)
164}
165
166pub async fn balance(
167 peer_ids: Vec<String>,
168 node_registry: NodeRegistryManager,
169 service_names: Vec<String>,
170 verbosity: VerbosityLevel,
171) -> Result<()> {
172 if verbosity != VerbosityLevel::Minimal {
173 print_banner("Reward Balances");
174 }
175
176 refresh_node_registry(
177 node_registry.clone(),
178 &ServiceController {},
179 verbosity != VerbosityLevel::Minimal,
180 false,
181 verbosity,
182 )
183 .await?;
184
185 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
186 if services_for_ops.is_empty() {
187 info!("Services for ops is empty, cannot obtain the balance");
188 println!("No balances to display");
190 return Ok(());
191 }
192 debug!("Obtaining balances for {} services", services_for_ops.len());
193
194 for node in services_for_ops {
195 let node = node.read().await;
196 println!("{}: {}", node.service_name, 0,);
198 }
199 Ok(())
200}
201
202pub async fn remove(
203 keep_directories: bool,
204 peer_ids: Vec<String>,
205 node_registry: NodeRegistryManager,
206 service_names: Vec<String>,
207 verbosity: VerbosityLevel,
208) -> Result<()> {
209 if verbosity != VerbosityLevel::Minimal {
210 print_banner("Remove Antnode Services");
211 }
212 info!(
213 "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
214 );
215
216 refresh_node_registry(
217 node_registry.clone(),
218 &ServiceController {},
219 verbosity != VerbosityLevel::Minimal,
220 false,
221 verbosity,
222 )
223 .await?;
224
225 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
226 if services_for_ops.is_empty() {
227 info!("Services for ops is empty, no services were eligible for removal");
228 if verbosity != VerbosityLevel::Minimal {
230 println!("No services were eligible for removal");
231 }
232 return Ok(());
233 }
234
235 let mut failed_services = Vec::new();
236 for node in &services_for_ops {
237 let service_name = node.read().await.service_name.clone();
238 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
239 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
240 let mut service_manager =
241 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
242 match service_manager.remove(keep_directories).await {
243 Ok(()) => {
244 debug!("Removed service {service_name}");
245 node_registry.save().await?;
246 }
247 Err(err) => {
248 error!("Failed to remove service {service_name}: {err}");
249 failed_services.push((service_name.clone(), err.to_string()))
250 }
251 }
252 }
253
254 summarise_any_failed_ops(failed_services, "remove", verbosity)
255}
256
257pub async fn reset(
258 force: bool,
259 node_registry: NodeRegistryManager,
260 verbosity: VerbosityLevel,
261) -> Result<()> {
262 if verbosity != VerbosityLevel::Minimal {
263 print_banner("Reset Antnode Services");
264 }
265 info!("Resetting all antnode services, with force={force}");
266
267 if !force {
268 println!("WARNING: all antnode services, data, and logs will be removed.");
269 println!("Do you wish to proceed? [y/n]");
270 std::io::stdout().flush()?;
271 let mut input = String::new();
272 std::io::stdin().read_line(&mut input)?;
273 if input.trim().to_lowercase() != "y" {
274 println!("Reset aborted");
275 return Ok(());
276 }
277 }
278
279 stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
280 remove(false, vec![], node_registry.clone(), vec![], verbosity).await?;
281
282 let node_registry_path = config::get_node_registry_path()?;
286 if node_registry_path.exists() {
287 info!("Removing node registry file: {node_registry_path:?}");
288 std::fs::remove_file(node_registry_path)?;
289 }
290 info!("Resetting NodeRegistryManager in memory");
291 node_registry.reset().await;
292
293 Ok(())
294}
295
296pub async fn start(
297 connection_timeout_s: u64,
298 fixed_interval: Option<u64>,
299 node_registry: NodeRegistryManager,
300 peer_ids: Vec<String>,
301 service_names: Vec<String>,
302 verbosity: VerbosityLevel,
303) -> Result<()> {
304 if verbosity != VerbosityLevel::Minimal {
305 print_banner("Start Antnode Services");
306 }
307 info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
308
309 refresh_node_registry(
310 node_registry.clone(),
311 &ServiceController {},
312 verbosity != VerbosityLevel::Minimal,
313 false,
314 verbosity,
315 )
316 .await?;
317
318 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
319 if services_for_ops.is_empty() {
320 info!("No services are eligible to be started");
321 if verbosity != VerbosityLevel::Minimal {
323 println!("No services were eligible to be started");
324 }
325 return Ok(());
326 }
327
328 let mut failed_services = Vec::new();
329 for node in &services_for_ops {
330 let service_name = node.read().await.service_name.clone();
331
332 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
333 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
334
335 let service = if fixed_interval.is_none() {
337 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
338 } else {
339 service
340 };
341
342 let mut service_manager =
343 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
344 if service_manager.service.status().await != ServiceStatus::Running {
345 if let Some(interval) = fixed_interval {
350 debug!("Sleeping for {} milliseconds", interval);
351 std::thread::sleep(std::time::Duration::from_millis(interval));
352 }
353 }
354 match service_manager.start().await {
355 Ok(start_duration) => {
356 debug!("Started service {service_name} in {start_duration:?}",);
357
358 node_registry.save().await?;
359 }
360 Err(err) => {
361 error!("Failed to start service {service_name}: {err}");
362 failed_services.push((service_name.clone(), err.to_string()))
363 }
364 }
365 }
366
367 summarise_any_failed_ops(failed_services, "start", verbosity)
368}
369
370pub async fn status(
371 details: bool,
372 fail: bool,
373 json: bool,
374 node_registry: NodeRegistryManager,
375) -> Result<()> {
376 if !node_registry.nodes.read().await.is_empty() {
377 if !json && !details {
378 print_banner("Antnode Services");
379 }
380 status_report(
381 &node_registry,
382 &ServiceController {},
383 details,
384 json,
385 fail,
386 false,
387 )
388 .await?;
389 node_registry.save().await?;
390 }
391 Ok(())
392}
393
394pub async fn stop(
395 interval: Option<u64>,
396 node_registry: NodeRegistryManager,
397 peer_ids: Vec<String>,
398 service_names: Vec<String>,
399 verbosity: VerbosityLevel,
400) -> Result<()> {
401 if verbosity != VerbosityLevel::Minimal {
402 print_banner("Stop Antnode Services");
403 }
404 info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
405
406 refresh_node_registry(
407 node_registry.clone(),
408 &ServiceController {},
409 verbosity != VerbosityLevel::Minimal,
410 false,
411 verbosity,
412 )
413 .await?;
414
415 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
416 if services_for_ops.is_empty() {
417 info!("No services are eligible to be stopped");
418 if verbosity != VerbosityLevel::Minimal {
420 println!("No services were eligible to be stopped");
421 }
422 return Ok(());
423 }
424
425 let mut failed_services = Vec::new();
426 for node in services_for_ops.iter() {
427 let service_name = node.read().await.service_name.clone();
428 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
429 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
430 let mut service_manager =
431 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
432
433 if service_manager.service.status().await == ServiceStatus::Running
434 && let Some(interval) = interval
435 {
436 debug!("Sleeping for {} milliseconds", interval);
437 std::thread::sleep(std::time::Duration::from_millis(interval));
438 }
439 match service_manager.stop().await {
440 Ok(()) => {
441 debug!("Stopped service {service_name}");
442 node_registry.save().await?;
443 }
444 Err(err) => {
445 error!("Failed to stop service {service_name}: {err}");
446 failed_services.push((service_name.clone(), err.to_string()))
447 }
448 }
449 }
450
451 summarise_any_failed_ops(failed_services, "stop", verbosity)
452}
453
454pub async fn upgrade(
455 connection_timeout_s: u64,
456 do_not_start: bool,
457 custom_bin_path: Option<PathBuf>,
458 force: bool,
459 fixed_interval: Option<u64>,
460 node_registry: NodeRegistryManager,
461 peer_ids: Vec<String>,
462 provided_env_variables: Option<Vec<(String, String)>>,
463 service_names: Vec<String>,
464 url: Option<String>,
465 version: Option<String>,
466 verbosity: VerbosityLevel,
467) -> Result<()> {
468 let use_force = force || custom_bin_path.is_some();
472
473 if verbosity != VerbosityLevel::Minimal {
474 print_banner("Upgrade Antnode Services");
475 }
476 info!(
477 "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
478 );
479
480 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
481 custom_bin_path.clone(),
482 ReleaseType::AntNode,
483 url,
484 version,
485 verbosity,
486 )
487 .await?;
488
489 refresh_node_registry(
490 node_registry.clone(),
491 &ServiceController {},
492 verbosity != VerbosityLevel::Minimal,
493 false,
494 verbosity,
495 )
496 .await?;
497
498 if let Some(node) = node_registry.nodes.read().await.first() {
499 let node = node.read().await;
500 debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
501 } else {
502 debug!("There are no nodes currently added or active");
503 }
504
505 if !use_force {
506 let mut node_versions = Vec::new();
507
508 for node in node_registry.nodes.read().await.iter() {
509 let node = node.read().await;
510 let version = Version::parse(&node.version)
511 .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
512 node_versions.push(version);
513 }
514
515 let any_nodes_need_upgraded = node_versions
516 .iter()
517 .any(|current_version| current_version < &target_version);
518 if !any_nodes_need_upgraded {
519 info!("All nodes are at the latest version, no upgrade required.");
520 if verbosity != VerbosityLevel::Minimal {
521 println!("{} All nodes are at the latest version", "✓".green());
522 }
523 return Ok(());
524 }
525 }
526
527 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
528 trace!("services_for_ops len: {}", services_for_ops.len());
529 let mut upgrade_summary = Vec::new();
530
531 for node in &services_for_ops {
532 let env_variables = if provided_env_variables.is_some() {
533 provided_env_variables.clone()
534 } else {
535 node_registry.environment_variables.read().await.clone()
536 };
537 let options = UpgradeOptions {
538 auto_restart: false,
539 env_variables: env_variables.clone(),
540 force: use_force,
541 start_service: !do_not_start,
542 target_bin_path: upgrade_bin_path.clone(),
543 target_version: target_version.clone(),
544 };
545 let service_name = node.read().await.service_name.clone();
546
547 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
548 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
549 let service = if fixed_interval.is_none() {
551 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
552 } else {
553 service
554 };
555
556 let mut service_manager =
557 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
558
559 match service_manager.upgrade(options).await {
560 Ok(upgrade_result) => {
561 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
562 if upgrade_result != UpgradeResult::NotRequired {
563 if let Some(interval) = fixed_interval {
566 debug!("Sleeping for {interval} milliseconds",);
567 std::thread::sleep(std::time::Duration::from_millis(interval));
568 }
569 }
570 upgrade_summary.push((service_name.clone(), upgrade_result));
571 node_registry.save().await?;
572 }
573 Err(err) => {
574 error!("Error upgrading service {service_name}: {err}");
575 upgrade_summary.push((
576 service_name.clone(),
577 UpgradeResult::Error(format!("Error: {err}")),
578 ));
579 node_registry.save().await?;
580 }
581 }
582 }
583
584 if verbosity != VerbosityLevel::Minimal {
585 print_upgrade_summary(upgrade_summary.clone());
586 }
587
588 if upgrade_summary.iter().any(|(_, r)| {
589 matches!(r, UpgradeResult::Error(_))
590 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
591 }) {
592 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
593 "For any services that were upgraded but did not start, you can attempt to start them \
594 again using the 'start' command."));
595 }
596
597 Ok(())
598}
599
600pub async fn maintain_n_running_nodes(
604 alpha: bool,
605 auto_restart: bool,
606 auto_set_nat_flags: bool,
607 connection_timeout_s: u64,
608 max_nodes_to_run: u16,
609 data_dir_path: Option<PathBuf>,
610 enable_metrics_server: bool,
611 env_variables: Option<Vec<(String, String)>>,
612 evm_network: Option<EvmNetwork>,
613 log_dir_path: Option<PathBuf>,
614 log_format: Option<LogFormat>,
615 max_archived_log_files: Option<usize>,
616 max_log_files: Option<usize>,
617 metrics_port: Option<PortRange>,
618 network_id: Option<u8>,
619 node_ip: Option<Ipv4Addr>,
620 node_port: Option<PortRange>,
621 node_registry: NodeRegistryManager,
622 peers_args: InitialPeersConfig,
623 relay: bool,
624 rewards_address: RewardsAddress,
625 restart_policy: RestartPolicy,
626 rpc_address: Option<Ipv4Addr>,
627 rpc_port: Option<PortRange>,
628 src_path: Option<PathBuf>,
629 url: Option<String>,
630 no_upnp: bool,
631 user: Option<String>,
632 version: Option<String>,
633 verbosity: VerbosityLevel,
634 start_node_interval: Option<u64>,
635 write_older_cache_files: bool,
636) -> Result<()> {
637 let mut running_nodes = Vec::new();
638
639 for node in node_registry.nodes.read().await.iter() {
640 let node = node.read().await;
641 if node.status == ServiceStatus::Running {
642 running_nodes.push(node.service_name.clone());
643 }
644 }
645
646 let running_count = running_nodes.len();
647 let target_count = max_nodes_to_run as usize;
648
649 info!(
650 "Current running nodes: {}, Target: {}",
651 running_count, target_count
652 );
653
654 match running_count.cmp(&target_count) {
655 Ordering::Greater => {
656 let to_stop_count = running_count - target_count;
657 let services_to_stop = running_nodes
658 .into_iter()
659 .rev() .take(to_stop_count)
661 .collect::<Vec<_>>();
662
663 info!(
664 "Stopping {} excess nodes: {:?}",
665 to_stop_count, services_to_stop
666 );
667 stop(
668 None,
669 node_registry.clone(),
670 vec![],
671 services_to_stop,
672 verbosity,
673 )
674 .await?;
675 }
676 Ordering::Less => {
677 let to_start_count = target_count - running_count;
678 let mut inactive_nodes = Vec::new();
679 for node in node_registry.nodes.read().await.iter() {
680 let node = node.read().await;
681 if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
682 inactive_nodes.push(node.service_name.clone());
683 }
684 }
685
686 info!("Inactive nodes available: {}", inactive_nodes.len());
687
688 if to_start_count <= inactive_nodes.len() {
689 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
690 info!(
691 "Starting {} existing inactive nodes: {:?}",
692 to_start_count, nodes_to_start
693 );
694 start(
695 connection_timeout_s,
696 start_node_interval,
697 node_registry.clone(),
698 vec![],
699 nodes_to_start,
700 verbosity,
701 )
702 .await?;
703 } else {
704 let to_add_count = to_start_count - inactive_nodes.len();
705 info!(
706 "Adding {} new nodes and starting all {} inactive nodes",
707 to_add_count,
708 inactive_nodes.len()
709 );
710
711 let ports_to_use = match node_port {
712 Some(PortRange::Single(port)) => vec![port],
713 Some(PortRange::Range(start, end)) => {
714 (start..=end).take(to_add_count).collect()
715 }
716 None => vec![],
717 };
718
719 info!("Ports to use for new nodes: {:?}", ports_to_use);
720
721 for (i, port) in ports_to_use.into_iter().enumerate() {
722 let added_service = add(
723 alpha,
724 auto_restart,
725 auto_set_nat_flags,
726 Some(1),
727 data_dir_path.clone(),
728 enable_metrics_server,
729 env_variables.clone(),
730 evm_network.clone(),
731 log_dir_path.clone(),
732 log_format,
733 max_archived_log_files,
734 max_log_files,
735 metrics_port.clone(),
736 network_id,
737 node_ip,
738 Some(PortRange::Single(port)),
739 node_registry.clone(),
740 peers_args.clone(),
741 relay,
742 restart_policy,
743 rewards_address,
744 rpc_address,
745 rpc_port.clone(),
746 src_path.clone(),
747 no_upnp,
748 url.clone(),
749 user.clone(),
750 version.clone(),
751 verbosity,
752 write_older_cache_files,
753 )
754 .await?;
755
756 if i == 0 {
757 start(
758 connection_timeout_s,
759 start_node_interval,
760 node_registry.clone(),
761 vec![],
762 added_service,
763 verbosity,
764 )
765 .await?;
766 }
767 }
768
769 if !inactive_nodes.is_empty() {
770 start(
771 connection_timeout_s,
772 start_node_interval,
773 node_registry.clone(),
774 vec![],
775 inactive_nodes,
776 verbosity,
777 )
778 .await?;
779 }
780 }
781 }
782 Ordering::Equal => {
783 info!(
784 "Current node count ({}) matches target ({}). No action needed.",
785 running_count, target_count
786 );
787 }
788 }
789
790 let mut final_running_count = 0;
792 for node in node_registry.nodes.read().await.iter() {
793 let node_read = node.read().await;
794 if node_read.status == ServiceStatus::Running {
795 final_running_count += 1;
796 }
797 }
798
799 if final_running_count != target_count {
800 warn!(
801 "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
802 );
803 }
804
805 Ok(())
806}
807
808async fn get_services_for_ops(
809 node_registry: &NodeRegistryManager,
810 peer_ids: Vec<String>,
811 service_names: Vec<String>,
812) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
813 let mut services = Vec::new();
814
815 if service_names.is_empty() && peer_ids.is_empty() {
816 for node in node_registry.nodes.read().await.iter() {
817 if node.read().await.status != ServiceStatus::Removed {
818 services.push(Arc::clone(node));
819 }
820 }
821 } else {
822 for name in &service_names {
823 let mut found_service_with_name = false;
824 for node in node_registry.nodes.read().await.iter() {
825 let node_read = node.read().await;
826 if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
827 {
828 services.push(Arc::clone(node));
829 found_service_with_name = true;
830 break;
831 }
832 }
833 }
834
835 if !found_service_with_name {
836 error!("No service named '{name}'");
837 return Err(eyre!(format!("No service named '{name}'")));
838 }
839 }
840
841 for peer_id_str in &peer_ids {
842 let mut found_service_with_peer_id = false;
843 let given_peer_id = PeerId::from_str(peer_id_str)
844 .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
845 for node in node_registry.nodes.read().await.iter() {
846 let node_read = node.read().await;
847 if let Some(peer_id) = node_read.peer_id
848 && peer_id == given_peer_id
849 && node_read.status != ServiceStatus::Removed
850 {
851 services.push(Arc::clone(node));
852 found_service_with_peer_id = true;
853 break;
854 }
855 }
856 if !found_service_with_peer_id {
857 error!("Could not find node with peer id: '{given_peer_id:?}'");
858 return Err(eyre!(format!(
859 "Could not find node with peer ID '{given_peer_id}'",
860 )));
861 }
862 }
863 }
864
865 Ok(services)
866}
867
868fn summarise_any_failed_ops(
869 failed_services: Vec<(String, String)>,
870 verb: &str,
871 verbosity: VerbosityLevel,
872) -> Result<()> {
873 if !failed_services.is_empty() {
874 if verbosity != VerbosityLevel::Minimal {
875 println!("Failed to {verb} {} service(s):", failed_services.len());
876 for failed in failed_services.iter() {
877 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
878 }
879 }
880
881 error!("Failed to {verb} one or more services");
882 return Err(eyre!("Failed to {verb} one or more services"));
883 }
884 Ok(())
885}