1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 ServiceManager, VerbosityLevel,
14 add_services::{
15 add_node,
16 config::{AddNodeServiceOptions, PortRange},
17 },
18 config::{self, is_running_as_root},
19 helpers::{download_and_extract_release, get_bin_version},
20 print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::{Bootstrap, InitialPeersConfig};
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27 NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28 UpgradeOptions, UpgradeResult,
29 control::{ServiceControl, ServiceController},
30 rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use service_manager::RestartPolicy;
37use std::{
38 cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
39};
40use tokio::sync::RwLock;
41use tracing::debug;
42
43pub async fn add(
45 alpha: bool,
46 auto_restart: bool,
47 auto_set_nat_flags: bool,
48 count: Option<u16>,
49 data_dir_path: Option<PathBuf>,
50 enable_metrics_server: bool,
51 env_variables: Option<Vec<(String, String)>>,
52 evm_network: Option<EvmNetwork>,
53 log_dir_path: Option<PathBuf>,
54 log_format: Option<LogFormat>,
55 max_archived_log_files: Option<usize>,
56 max_log_files: Option<usize>,
57 metrics_port: Option<PortRange>,
58 network_id: Option<u8>,
59 node_ip: Option<Ipv4Addr>,
60 node_port: Option<PortRange>,
61 node_registry: NodeRegistryManager,
62 mut init_peers_config: InitialPeersConfig,
63 relay: bool,
64 restart_policy: RestartPolicy,
65 rewards_address: RewardsAddress,
66 rpc_address: Option<Ipv4Addr>,
67 rpc_port: Option<PortRange>,
68 src_path: Option<PathBuf>,
69 no_upnp: bool,
70 url: Option<String>,
71 user: Option<String>,
72 version: Option<String>,
73 verbosity: VerbosityLevel,
74 write_older_cache_files: bool,
75) -> Result<Vec<String>> {
76 let user_mode = !is_running_as_root();
77
78 if verbosity != VerbosityLevel::Minimal {
79 print_banner("Add Antnode Services");
80 println!("{} service(s) to be added", count.unwrap_or(1));
81 }
82
83 let service_manager = ServiceController {};
84 let service_user = if user_mode {
85 None
86 } else {
87 let service_user = user.unwrap_or_else(|| "ant".to_string());
88 service_manager.create_service_user(&service_user)?;
89 Some(service_user)
90 };
91
92 let service_data_dir_path =
93 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
94 let service_log_dir_path =
95 config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
96 let bootstrap_cache_dir = if let Some(user) = &service_user {
97 Some(config::get_bootstrap_cache_owner_path(user)?)
98 } else {
99 None
100 };
101
102 let release_repo = <dyn AntReleaseRepoActions>::default_config();
103
104 let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
105 let version = get_bin_version(&path)?;
106 (path, version)
107 } else {
108 download_and_extract_release(
109 ReleaseType::AntNode,
110 url.clone(),
111 version,
112 &*release_repo,
113 verbosity,
114 None,
115 )
116 .await?
117 };
118
119 debug!("Parsing peers from PeersArgs");
120
121 init_peers_config.addrs.extend(Bootstrap::fetch_from_env());
122 init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
123
124 let options = AddNodeServiceOptions {
125 alpha,
126 antnode_dir_path: service_data_dir_path.clone(),
127 antnode_src_path,
128 auto_restart,
129 auto_set_nat_flags,
130 count,
131 delete_antnode_src: src_path.is_none(),
132 enable_metrics_server,
133 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
134 env_variables,
135 init_peers_config,
136 log_format,
137 max_archived_log_files,
138 max_log_files,
139 metrics_port,
140 network_id,
141 no_upnp,
142 node_ip,
143 node_port,
144 relay,
145 restart_policy,
146 rewards_address,
147 rpc_address,
148 rpc_port,
149 service_data_dir_path,
150 service_log_dir_path,
151 user: service_user,
152 user_mode,
153 version,
154 write_older_cache_files,
155 };
156 info!("Adding node service(s)");
157 let added_services_names =
158 add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
159
160 node_registry.save().await?;
161 debug!("Node registry saved");
162
163 Ok(added_services_names)
164}
165
166pub async fn balance(
167 peer_ids: Vec<String>,
168 node_registry: NodeRegistryManager,
169 service_names: Vec<String>,
170 verbosity: VerbosityLevel,
171) -> Result<()> {
172 if verbosity != VerbosityLevel::Minimal {
173 print_banner("Reward Balances");
174 }
175
176 refresh_node_registry(
177 node_registry.clone(),
178 &ServiceController {},
179 verbosity != VerbosityLevel::Minimal,
180 false,
181 verbosity,
182 )
183 .await?;
184
185 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
186 if services_for_ops.is_empty() {
187 info!("Services for ops is empty, cannot obtain the balance");
188 println!("No balances to display");
190 return Ok(());
191 }
192 debug!("Obtaining balances for {} services", services_for_ops.len());
193
194 for node in services_for_ops {
195 let node = node.read().await;
196 println!("{}: {}", node.service_name, 0,);
198 }
199 Ok(())
200}
201
202pub async fn remove(
203 keep_directories: bool,
204 peer_ids: Vec<String>,
205 node_registry: NodeRegistryManager,
206 service_names: Vec<String>,
207 verbosity: VerbosityLevel,
208) -> Result<()> {
209 if verbosity != VerbosityLevel::Minimal {
210 print_banner("Remove Antnode Services");
211 }
212 info!(
213 "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
214 );
215
216 refresh_node_registry(
217 node_registry.clone(),
218 &ServiceController {},
219 verbosity != VerbosityLevel::Minimal,
220 false,
221 verbosity,
222 )
223 .await?;
224
225 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
226 if services_for_ops.is_empty() {
227 info!("Services for ops is empty, no services were eligible for removal");
228 if verbosity != VerbosityLevel::Minimal {
230 println!("No services were eligible for removal");
231 }
232 return Ok(());
233 }
234
235 let mut failed_services = Vec::new();
236 for node in &services_for_ops {
237 let service_name = node.read().await.service_name.clone();
238 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
239 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
240 let mut service_manager =
241 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
242 match service_manager.remove(keep_directories).await {
243 Ok(()) => {
244 debug!("Removed service {service_name}");
245 node_registry.save().await?;
246 }
247 Err(err) => {
248 error!("Failed to remove service {service_name}: {err}");
249 failed_services.push((service_name.clone(), err.to_string()))
250 }
251 }
252 }
253
254 summarise_any_failed_ops(failed_services, "remove", verbosity)
255}
256
257pub async fn reset(
258 force: bool,
259 node_registry: NodeRegistryManager,
260 verbosity: VerbosityLevel,
261) -> Result<()> {
262 if verbosity != VerbosityLevel::Minimal {
263 print_banner("Reset Antnode Services");
264 }
265 info!("Resetting all antnode services, with force={force}");
266
267 if !force {
268 println!("WARNING: all antnode services, data, and logs will be removed.");
269 println!("Do you wish to proceed? [y/n]");
270 std::io::stdout().flush()?;
271 let mut input = String::new();
272 std::io::stdin().read_line(&mut input)?;
273 if input.trim().to_lowercase() != "y" {
274 println!("Reset aborted");
275 return Ok(());
276 }
277 }
278
279 stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
280 remove(false, vec![], node_registry, vec![], verbosity).await?;
281
282 let node_registry_path = config::get_node_registry_path()?;
286 if node_registry_path.exists() {
287 info!("Removing node registry file: {node_registry_path:?}");
288 std::fs::remove_file(node_registry_path)?;
289 }
290
291 Ok(())
292}
293
294pub async fn start(
295 connection_timeout_s: u64,
296 fixed_interval: Option<u64>,
297 node_registry: NodeRegistryManager,
298 peer_ids: Vec<String>,
299 service_names: Vec<String>,
300 verbosity: VerbosityLevel,
301) -> Result<()> {
302 if verbosity != VerbosityLevel::Minimal {
303 print_banner("Start Antnode Services");
304 }
305 info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
306
307 refresh_node_registry(
308 node_registry.clone(),
309 &ServiceController {},
310 verbosity != VerbosityLevel::Minimal,
311 false,
312 verbosity,
313 )
314 .await?;
315
316 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
317 if services_for_ops.is_empty() {
318 info!("No services are eligible to be started");
319 if verbosity != VerbosityLevel::Minimal {
321 println!("No services were eligible to be started");
322 }
323 return Ok(());
324 }
325
326 let mut failed_services = Vec::new();
327 for node in &services_for_ops {
328 let service_name = node.read().await.service_name.clone();
329
330 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
331 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
332
333 let service = if fixed_interval.is_none() {
335 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
336 } else {
337 service
338 };
339
340 let mut service_manager =
341 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
342 if service_manager.service.status().await != ServiceStatus::Running {
343 if let Some(interval) = fixed_interval {
348 debug!("Sleeping for {} milliseconds", interval);
349 std::thread::sleep(std::time::Duration::from_millis(interval));
350 }
351 }
352 match service_manager.start().await {
353 Ok(start_duration) => {
354 debug!("Started service {service_name} in {start_duration:?}",);
355
356 node_registry.save().await?;
357 }
358 Err(err) => {
359 error!("Failed to start service {service_name}: {err}");
360 failed_services.push((service_name.clone(), err.to_string()))
361 }
362 }
363 }
364
365 summarise_any_failed_ops(failed_services, "start", verbosity)
366}
367
368pub async fn status(
369 details: bool,
370 fail: bool,
371 json: bool,
372 node_registry: NodeRegistryManager,
373) -> Result<()> {
374 if !node_registry.nodes.read().await.is_empty() {
375 if !json && !details {
376 print_banner("Antnode Services");
377 }
378 status_report(
379 &node_registry,
380 &ServiceController {},
381 details,
382 json,
383 fail,
384 false,
385 )
386 .await?;
387 node_registry.save().await?;
388 }
389 Ok(())
390}
391
392pub async fn stop(
393 interval: Option<u64>,
394 node_registry: NodeRegistryManager,
395 peer_ids: Vec<String>,
396 service_names: Vec<String>,
397 verbosity: VerbosityLevel,
398) -> Result<()> {
399 if verbosity != VerbosityLevel::Minimal {
400 print_banner("Stop Antnode Services");
401 }
402 info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
403
404 refresh_node_registry(
405 node_registry.clone(),
406 &ServiceController {},
407 verbosity != VerbosityLevel::Minimal,
408 false,
409 verbosity,
410 )
411 .await?;
412
413 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
414 if services_for_ops.is_empty() {
415 info!("No services are eligible to be stopped");
416 if verbosity != VerbosityLevel::Minimal {
418 println!("No services were eligible to be stopped");
419 }
420 return Ok(());
421 }
422
423 let mut failed_services = Vec::new();
424 for node in services_for_ops.iter() {
425 let service_name = node.read().await.service_name.clone();
426 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
427 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
428 let mut service_manager =
429 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
430
431 if service_manager.service.status().await == ServiceStatus::Running
432 && let Some(interval) = interval
433 {
434 debug!("Sleeping for {} milliseconds", interval);
435 std::thread::sleep(std::time::Duration::from_millis(interval));
436 }
437 match service_manager.stop().await {
438 Ok(()) => {
439 debug!("Stopped service {service_name}");
440 node_registry.save().await?;
441 }
442 Err(err) => {
443 error!("Failed to stop service {service_name}: {err}");
444 failed_services.push((service_name.clone(), err.to_string()))
445 }
446 }
447 }
448
449 summarise_any_failed_ops(failed_services, "stop", verbosity)
450}
451
452pub async fn upgrade(
453 connection_timeout_s: u64,
454 do_not_start: bool,
455 custom_bin_path: Option<PathBuf>,
456 force: bool,
457 fixed_interval: Option<u64>,
458 node_registry: NodeRegistryManager,
459 peer_ids: Vec<String>,
460 provided_env_variables: Option<Vec<(String, String)>>,
461 service_names: Vec<String>,
462 url: Option<String>,
463 version: Option<String>,
464 verbosity: VerbosityLevel,
465) -> Result<()> {
466 let use_force = force || custom_bin_path.is_some();
470
471 if verbosity != VerbosityLevel::Minimal {
472 print_banner("Upgrade Antnode Services");
473 }
474 info!(
475 "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
476 );
477
478 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
479 custom_bin_path.clone(),
480 ReleaseType::AntNode,
481 url,
482 version,
483 verbosity,
484 )
485 .await?;
486
487 refresh_node_registry(
488 node_registry.clone(),
489 &ServiceController {},
490 verbosity != VerbosityLevel::Minimal,
491 false,
492 verbosity,
493 )
494 .await?;
495
496 if let Some(node) = node_registry.nodes.read().await.first() {
497 let node = node.read().await;
498 debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
499 } else {
500 debug!("There are no nodes currently added or active");
501 }
502
503 if !use_force {
504 let mut node_versions = Vec::new();
505
506 for node in node_registry.nodes.read().await.iter() {
507 let node = node.read().await;
508 let version = Version::parse(&node.version)
509 .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
510 node_versions.push(version);
511 }
512
513 let any_nodes_need_upgraded = node_versions
514 .iter()
515 .any(|current_version| current_version < &target_version);
516 if !any_nodes_need_upgraded {
517 info!("All nodes are at the latest version, no upgrade required.");
518 if verbosity != VerbosityLevel::Minimal {
519 println!("{} All nodes are at the latest version", "✓".green());
520 }
521 return Ok(());
522 }
523 }
524
525 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
526 trace!("services_for_ops len: {}", services_for_ops.len());
527 let mut upgrade_summary = Vec::new();
528
529 for node in &services_for_ops {
530 let env_variables = if provided_env_variables.is_some() {
531 provided_env_variables.clone()
532 } else {
533 node_registry.environment_variables.read().await.clone()
534 };
535 let options = UpgradeOptions {
536 auto_restart: false,
537 env_variables: env_variables.clone(),
538 force: use_force,
539 start_service: !do_not_start,
540 target_bin_path: upgrade_bin_path.clone(),
541 target_version: target_version.clone(),
542 };
543 let service_name = node.read().await.service_name.clone();
544
545 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
546 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
547 let service = if fixed_interval.is_none() {
549 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
550 } else {
551 service
552 };
553
554 let mut service_manager =
555 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
556
557 match service_manager.upgrade(options).await {
558 Ok(upgrade_result) => {
559 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
560 if upgrade_result != UpgradeResult::NotRequired {
561 if let Some(interval) = fixed_interval {
564 debug!("Sleeping for {interval} milliseconds",);
565 std::thread::sleep(std::time::Duration::from_millis(interval));
566 }
567 }
568 upgrade_summary.push((service_name.clone(), upgrade_result));
569 node_registry.save().await?;
570 }
571 Err(err) => {
572 error!("Error upgrading service {service_name}: {err}");
573 upgrade_summary.push((
574 service_name.clone(),
575 UpgradeResult::Error(format!("Error: {err}")),
576 ));
577 node_registry.save().await?;
578 }
579 }
580 }
581
582 if verbosity != VerbosityLevel::Minimal {
583 print_upgrade_summary(upgrade_summary.clone());
584 }
585
586 if upgrade_summary.iter().any(|(_, r)| {
587 matches!(r, UpgradeResult::Error(_))
588 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
589 }) {
590 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
591 "For any services that were upgraded but did not start, you can attempt to start them \
592 again using the 'start' command."));
593 }
594
595 Ok(())
596}
597
598pub async fn maintain_n_running_nodes(
602 alpha: bool,
603 auto_restart: bool,
604 auto_set_nat_flags: bool,
605 connection_timeout_s: u64,
606 max_nodes_to_run: u16,
607 data_dir_path: Option<PathBuf>,
608 enable_metrics_server: bool,
609 env_variables: Option<Vec<(String, String)>>,
610 evm_network: Option<EvmNetwork>,
611 log_dir_path: Option<PathBuf>,
612 log_format: Option<LogFormat>,
613 max_archived_log_files: Option<usize>,
614 max_log_files: Option<usize>,
615 metrics_port: Option<PortRange>,
616 network_id: Option<u8>,
617 node_ip: Option<Ipv4Addr>,
618 node_port: Option<PortRange>,
619 node_registry: NodeRegistryManager,
620 peers_args: InitialPeersConfig,
621 relay: bool,
622 rewards_address: RewardsAddress,
623 restart_policy: RestartPolicy,
624 rpc_address: Option<Ipv4Addr>,
625 rpc_port: Option<PortRange>,
626 src_path: Option<PathBuf>,
627 url: Option<String>,
628 no_upnp: bool,
629 user: Option<String>,
630 version: Option<String>,
631 verbosity: VerbosityLevel,
632 start_node_interval: Option<u64>,
633 write_older_cache_files: bool,
634) -> Result<()> {
635 let mut running_nodes = Vec::new();
636
637 for node in node_registry.nodes.read().await.iter() {
638 let node = node.read().await;
639 if node.status == ServiceStatus::Running {
640 running_nodes.push(node.service_name.clone());
641 }
642 }
643
644 let running_count = running_nodes.len();
645 let target_count = max_nodes_to_run as usize;
646
647 info!(
648 "Current running nodes: {}, Target: {}",
649 running_count, target_count
650 );
651
652 match running_count.cmp(&target_count) {
653 Ordering::Greater => {
654 let to_stop_count = running_count - target_count;
655 let services_to_stop = running_nodes
656 .into_iter()
657 .rev() .take(to_stop_count)
659 .collect::<Vec<_>>();
660
661 info!(
662 "Stopping {} excess nodes: {:?}",
663 to_stop_count, services_to_stop
664 );
665 stop(
666 None,
667 node_registry.clone(),
668 vec![],
669 services_to_stop,
670 verbosity,
671 )
672 .await?;
673 }
674 Ordering::Less => {
675 let to_start_count = target_count - running_count;
676 let mut inactive_nodes = Vec::new();
677 for node in node_registry.nodes.read().await.iter() {
678 let node = node.read().await;
679 if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
680 inactive_nodes.push(node.service_name.clone());
681 }
682 }
683
684 info!("Inactive nodes available: {}", inactive_nodes.len());
685
686 if to_start_count <= inactive_nodes.len() {
687 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
688 info!(
689 "Starting {} existing inactive nodes: {:?}",
690 to_start_count, nodes_to_start
691 );
692 start(
693 connection_timeout_s,
694 start_node_interval,
695 node_registry.clone(),
696 vec![],
697 nodes_to_start,
698 verbosity,
699 )
700 .await?;
701 } else {
702 let to_add_count = to_start_count - inactive_nodes.len();
703 info!(
704 "Adding {} new nodes and starting all {} inactive nodes",
705 to_add_count,
706 inactive_nodes.len()
707 );
708
709 let ports_to_use = match node_port {
710 Some(PortRange::Single(port)) => vec![port],
711 Some(PortRange::Range(start, end)) => {
712 (start..=end).take(to_add_count).collect()
713 }
714 None => vec![],
715 };
716
717 for (i, port) in ports_to_use.into_iter().enumerate() {
718 let added_service = add(
719 alpha,
720 auto_restart,
721 auto_set_nat_flags,
722 Some(1),
723 data_dir_path.clone(),
724 enable_metrics_server,
725 env_variables.clone(),
726 evm_network.clone(),
727 log_dir_path.clone(),
728 log_format,
729 max_archived_log_files,
730 max_log_files,
731 metrics_port.clone(),
732 network_id,
733 node_ip,
734 Some(PortRange::Single(port)),
735 node_registry.clone(),
736 peers_args.clone(),
737 relay,
738 restart_policy,
739 rewards_address,
740 rpc_address,
741 rpc_port.clone(),
742 src_path.clone(),
743 no_upnp,
744 url.clone(),
745 user.clone(),
746 version.clone(),
747 verbosity,
748 write_older_cache_files,
749 )
750 .await?;
751
752 if i == 0 {
753 start(
754 connection_timeout_s,
755 start_node_interval,
756 node_registry.clone(),
757 vec![],
758 added_service,
759 verbosity,
760 )
761 .await?;
762 }
763 }
764
765 if !inactive_nodes.is_empty() {
766 start(
767 connection_timeout_s,
768 start_node_interval,
769 node_registry.clone(),
770 vec![],
771 inactive_nodes,
772 verbosity,
773 )
774 .await?;
775 }
776 }
777 }
778 Ordering::Equal => {
779 info!(
780 "Current node count ({}) matches target ({}). No action needed.",
781 running_count, target_count
782 );
783 }
784 }
785
786 let mut final_running_count = 0;
788 for node in node_registry.nodes.read().await.iter() {
789 let node_read = node.read().await;
790 if node_read.status == ServiceStatus::Running {
791 final_running_count += 1;
792 }
793 }
794
795 if final_running_count != target_count {
796 warn!(
797 "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
798 );
799 }
800
801 Ok(())
802}
803
804async fn get_services_for_ops(
805 node_registry: &NodeRegistryManager,
806 peer_ids: Vec<String>,
807 service_names: Vec<String>,
808) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
809 let mut services = Vec::new();
810
811 if service_names.is_empty() && peer_ids.is_empty() {
812 for node in node_registry.nodes.read().await.iter() {
813 if node.read().await.status != ServiceStatus::Removed {
814 services.push(Arc::clone(node));
815 }
816 }
817 } else {
818 for name in &service_names {
819 let mut found_service_with_name = false;
820 for node in node_registry.nodes.read().await.iter() {
821 let node_read = node.read().await;
822 if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
823 {
824 services.push(Arc::clone(node));
825 found_service_with_name = true;
826 break;
827 }
828 }
829 }
830
831 if !found_service_with_name {
832 error!("No service named '{name}'");
833 return Err(eyre!(format!("No service named '{name}'")));
834 }
835 }
836
837 for peer_id_str in &peer_ids {
838 let mut found_service_with_peer_id = false;
839 let given_peer_id = PeerId::from_str(peer_id_str)
840 .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
841 for node in node_registry.nodes.read().await.iter() {
842 let node_read = node.read().await;
843 if let Some(peer_id) = node_read.peer_id
844 && peer_id == given_peer_id
845 && node_read.status != ServiceStatus::Removed
846 {
847 services.push(Arc::clone(node));
848 found_service_with_peer_id = true;
849 break;
850 }
851 }
852 if !found_service_with_peer_id {
853 error!("Could not find node with peer id: '{given_peer_id:?}'");
854 return Err(eyre!(format!(
855 "Could not find node with peer ID '{given_peer_id}'",
856 )));
857 }
858 }
859 }
860
861 Ok(services)
862}
863
864fn summarise_any_failed_ops(
865 failed_services: Vec<(String, String)>,
866 verb: &str,
867 verbosity: VerbosityLevel,
868) -> Result<()> {
869 if !failed_services.is_empty() {
870 if verbosity != VerbosityLevel::Minimal {
871 println!("Failed to {verb} {} service(s):", failed_services.len());
872 for failed in failed_services.iter() {
873 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
874 }
875 }
876
877 error!("Failed to {verb} one or more services");
878 return Err(eyre!("Failed to {verb} one or more services"));
879 }
880 Ok(())
881}