1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 add_services::{
14 add_node,
15 config::{AddNodeServiceOptions, PortRange},
16 },
17 config::{self, is_running_as_root},
18 helpers::{download_and_extract_release, get_bin_version},
19 print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use ant_bootstrap::InitialPeersConfig;
22use ant_evm::{EvmNetwork, RewardsAddress};
23use ant_logging::LogFormat;
24use ant_releases::{AntReleaseRepoActions, ReleaseType};
25use ant_service_management::{
26 control::{ServiceControl, ServiceController},
27 rpc::RpcClient,
28 NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
29 UpgradeOptions, UpgradeResult,
30};
31use color_eyre::{eyre::eyre, Help, Result};
32use colored::Colorize;
33use libp2p_identity::PeerId;
34use semver::Version;
35use std::{
36 cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
37};
38use tokio::sync::RwLock;
39use tracing::debug;
40
41pub async fn add(
43 alpha: bool,
44 auto_restart: bool,
45 auto_set_nat_flags: bool,
46 count: Option<u16>,
47 data_dir_path: Option<PathBuf>,
48 enable_metrics_server: bool,
49 env_variables: Option<Vec<(String, String)>>,
50 evm_network: Option<EvmNetwork>,
51 log_dir_path: Option<PathBuf>,
52 log_format: Option<LogFormat>,
53 max_archived_log_files: Option<usize>,
54 max_log_files: Option<usize>,
55 metrics_port: Option<PortRange>,
56 network_id: Option<u8>,
57 node_ip: Option<Ipv4Addr>,
58 node_port: Option<PortRange>,
59 node_registry: NodeRegistryManager,
60 mut init_peers_config: InitialPeersConfig,
61 relay: bool,
62 rewards_address: RewardsAddress,
63 rpc_address: Option<Ipv4Addr>,
64 rpc_port: Option<PortRange>,
65 src_path: Option<PathBuf>,
66 no_upnp: bool,
67 url: Option<String>,
68 user: Option<String>,
69 version: Option<String>,
70 verbosity: VerbosityLevel,
71 write_older_cache_files: bool,
72) -> Result<Vec<String>> {
73 let user_mode = !is_running_as_root();
74
75 if verbosity != VerbosityLevel::Minimal {
76 print_banner("Add Antnode Services");
77 println!("{} service(s) to be added", count.unwrap_or(1));
78 }
79
80 let service_manager = ServiceController {};
81 let service_user = if user_mode {
82 None
83 } else {
84 let service_user = user.unwrap_or_else(|| "ant".to_string());
85 service_manager.create_service_user(&service_user)?;
86 Some(service_user)
87 };
88
89 let service_data_dir_path =
90 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
91 let service_log_dir_path =
92 config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
93 let bootstrap_cache_dir = if let Some(user) = &service_user {
94 Some(config::get_bootstrap_cache_owner_path(user)?)
95 } else {
96 None
97 };
98
99 let release_repo = <dyn AntReleaseRepoActions>::default_config();
100
101 let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
102 let version = get_bin_version(&path)?;
103 (path, version)
104 } else {
105 download_and_extract_release(
106 ReleaseType::AntNode,
107 url.clone(),
108 version,
109 &*release_repo,
110 verbosity,
111 None,
112 )
113 .await?
114 };
115
116 debug!("Parsing peers from PeersArgs");
117
118 init_peers_config
119 .addrs
120 .extend(InitialPeersConfig::read_bootstrap_addr_from_env());
121 init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
122
123 let options = AddNodeServiceOptions {
124 alpha,
125 auto_restart,
126 auto_set_nat_flags,
127 count,
128 delete_antnode_src: src_path.is_none(),
129 enable_metrics_server,
130 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
131 env_variables,
132 relay,
133 log_format,
134 max_archived_log_files,
135 max_log_files,
136 metrics_port,
137 network_id,
138 node_ip,
139 node_port,
140 init_peers_config,
141 rewards_address,
142 rpc_address,
143 rpc_port,
144 antnode_src_path,
145 antnode_dir_path: service_data_dir_path.clone(),
146 service_data_dir_path,
147 service_log_dir_path,
148 no_upnp,
149 user: service_user,
150 user_mode,
151 version,
152 write_older_cache_files,
153 };
154 info!("Adding node service(s)");
155 let added_services_names =
156 add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
157
158 node_registry.save().await?;
159 debug!("Node registry saved");
160
161 Ok(added_services_names)
162}
163
164pub async fn balance(
165 peer_ids: Vec<String>,
166 node_registry: NodeRegistryManager,
167 service_names: Vec<String>,
168 verbosity: VerbosityLevel,
169) -> Result<()> {
170 if verbosity != VerbosityLevel::Minimal {
171 print_banner("Reward Balances");
172 }
173
174 refresh_node_registry(
175 node_registry.clone(),
176 &ServiceController {},
177 verbosity != VerbosityLevel::Minimal,
178 false,
179 verbosity,
180 )
181 .await?;
182
183 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
184 if services_for_ops.is_empty() {
185 info!("Services for ops is empty, cannot obtain the balance");
186 println!("No balances to display");
188 return Ok(());
189 }
190 debug!("Obtaining balances for {} services", services_for_ops.len());
191
192 for node in services_for_ops {
193 let node = node.read().await;
194 println!("{}: {}", node.service_name, 0,);
196 }
197 Ok(())
198}
199
200pub async fn remove(
201 keep_directories: bool,
202 peer_ids: Vec<String>,
203 node_registry: NodeRegistryManager,
204 service_names: Vec<String>,
205 verbosity: VerbosityLevel,
206) -> Result<()> {
207 if verbosity != VerbosityLevel::Minimal {
208 print_banner("Remove Antnode Services");
209 }
210 info!("Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
211
212 refresh_node_registry(
213 node_registry.clone(),
214 &ServiceController {},
215 verbosity != VerbosityLevel::Minimal,
216 false,
217 verbosity,
218 )
219 .await?;
220
221 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
222 if services_for_ops.is_empty() {
223 info!("Services for ops is empty, no services were eligible for removal");
224 if verbosity != VerbosityLevel::Minimal {
226 println!("No services were eligible for removal");
227 }
228 return Ok(());
229 }
230
231 let mut failed_services = Vec::new();
232 for node in &services_for_ops {
233 let service_name = node.read().await.service_name.clone();
234 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
235 let service = NodeService::new(node.clone(), Box::new(rpc_client));
236 let mut service_manager =
237 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
238 match service_manager.remove(keep_directories).await {
239 Ok(()) => {
240 debug!("Removed service {service_name}");
241 node_registry.save().await?;
242 }
243 Err(err) => {
244 error!("Failed to remove service {service_name}: {err}");
245 failed_services.push((service_name.clone(), err.to_string()))
246 }
247 }
248 }
249
250 summarise_any_failed_ops(failed_services, "remove", verbosity)
251}
252
253pub async fn reset(
254 force: bool,
255 node_registry: NodeRegistryManager,
256 verbosity: VerbosityLevel,
257) -> Result<()> {
258 if verbosity != VerbosityLevel::Minimal {
259 print_banner("Reset Antnode Services");
260 }
261 info!("Resetting all antnode services, with force={force}");
262
263 if !force {
264 println!("WARNING: all antnode services, data, and logs will be removed.");
265 println!("Do you wish to proceed? [y/n]");
266 std::io::stdout().flush()?;
267 let mut input = String::new();
268 std::io::stdin().read_line(&mut input)?;
269 if input.trim().to_lowercase() != "y" {
270 println!("Reset aborted");
271 return Ok(());
272 }
273 }
274
275 stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
276 remove(false, vec![], node_registry, vec![], verbosity).await?;
277
278 let node_registry_path = config::get_node_registry_path()?;
282 if node_registry_path.exists() {
283 info!("Removing node registry file: {node_registry_path:?}");
284 std::fs::remove_file(node_registry_path)?;
285 }
286
287 Ok(())
288}
289
290pub async fn start(
291 connection_timeout_s: u64,
292 fixed_interval: Option<u64>,
293 node_registry: NodeRegistryManager,
294 peer_ids: Vec<String>,
295 service_names: Vec<String>,
296 verbosity: VerbosityLevel,
297) -> Result<()> {
298 if verbosity != VerbosityLevel::Minimal {
299 print_banner("Start Antnode Services");
300 }
301 info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
302
303 refresh_node_registry(
304 node_registry.clone(),
305 &ServiceController {},
306 verbosity != VerbosityLevel::Minimal,
307 false,
308 verbosity,
309 )
310 .await?;
311
312 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
313 if services_for_ops.is_empty() {
314 info!("No services are eligible to be started");
315 if verbosity != VerbosityLevel::Minimal {
317 println!("No services were eligible to be started");
318 }
319 return Ok(());
320 }
321
322 let mut failed_services = Vec::new();
323 for node in &services_for_ops {
324 let service_name = node.read().await.service_name.clone();
325
326 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
327 let service = NodeService::new(node.clone(), Box::new(rpc_client));
328
329 let service = if fixed_interval.is_none() {
331 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
332 } else {
333 service
334 };
335
336 let mut service_manager =
337 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
338 if service_manager.service.status().await != ServiceStatus::Running {
339 if let Some(interval) = fixed_interval {
344 debug!("Sleeping for {} milliseconds", interval);
345 std::thread::sleep(std::time::Duration::from_millis(interval));
346 }
347 }
348 match service_manager.start().await {
349 Ok(start_duration) => {
350 debug!("Started service {service_name} in {start_duration:?}",);
351
352 node_registry.save().await?;
353 }
354 Err(err) => {
355 error!("Failed to start service {service_name}: {err}");
356 failed_services.push((service_name.clone(), err.to_string()))
357 }
358 }
359 }
360
361 summarise_any_failed_ops(failed_services, "start", verbosity)
362}
363
364pub async fn status(
365 details: bool,
366 fail: bool,
367 json: bool,
368 node_registry: NodeRegistryManager,
369) -> Result<()> {
370 if !node_registry.nodes.read().await.is_empty() {
371 if !json && !details {
372 print_banner("Antnode Services");
373 }
374 status_report(
375 &node_registry,
376 &ServiceController {},
377 details,
378 json,
379 fail,
380 false,
381 )
382 .await?;
383 node_registry.save().await?;
384 }
385 Ok(())
386}
387
388pub async fn stop(
389 interval: Option<u64>,
390 node_registry: NodeRegistryManager,
391 peer_ids: Vec<String>,
392 service_names: Vec<String>,
393 verbosity: VerbosityLevel,
394) -> Result<()> {
395 if verbosity != VerbosityLevel::Minimal {
396 print_banner("Stop Antnode Services");
397 }
398 info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
399
400 refresh_node_registry(
401 node_registry.clone(),
402 &ServiceController {},
403 verbosity != VerbosityLevel::Minimal,
404 false,
405 verbosity,
406 )
407 .await?;
408
409 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
410 if services_for_ops.is_empty() {
411 info!("No services are eligible to be stopped");
412 if verbosity != VerbosityLevel::Minimal {
414 println!("No services were eligible to be stopped");
415 }
416 return Ok(());
417 }
418
419 let mut failed_services = Vec::new();
420 for node in services_for_ops.iter() {
421 let service_name = node.read().await.service_name.clone();
422 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
423 let service = NodeService::new(node.clone(), Box::new(rpc_client));
424 let mut service_manager =
425 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
426
427 if service_manager.service.status().await == ServiceStatus::Running {
428 if let Some(interval) = interval {
429 debug!("Sleeping for {} milliseconds", interval);
430 std::thread::sleep(std::time::Duration::from_millis(interval));
431 }
432 }
433 match service_manager.stop().await {
434 Ok(()) => {
435 debug!("Stopped service {service_name}");
436 node_registry.save().await?;
437 }
438 Err(err) => {
439 error!("Failed to stop service {service_name}: {err}");
440 failed_services.push((service_name.clone(), err.to_string()))
441 }
442 }
443 }
444
445 summarise_any_failed_ops(failed_services, "stop", verbosity)
446}
447
448pub async fn upgrade(
449 connection_timeout_s: u64,
450 do_not_start: bool,
451 custom_bin_path: Option<PathBuf>,
452 force: bool,
453 fixed_interval: Option<u64>,
454 node_registry: NodeRegistryManager,
455 peer_ids: Vec<String>,
456 provided_env_variables: Option<Vec<(String, String)>>,
457 service_names: Vec<String>,
458 url: Option<String>,
459 version: Option<String>,
460 verbosity: VerbosityLevel,
461) -> Result<()> {
462 let use_force = force || custom_bin_path.is_some();
466
467 if verbosity != VerbosityLevel::Minimal {
468 print_banner("Upgrade Antnode Services");
469 }
470 info!(
471 "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
472 );
473
474 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
475 custom_bin_path.clone(),
476 ReleaseType::AntNode,
477 url,
478 version,
479 verbosity,
480 )
481 .await?;
482
483 refresh_node_registry(
484 node_registry.clone(),
485 &ServiceController {},
486 verbosity != VerbosityLevel::Minimal,
487 false,
488 verbosity,
489 )
490 .await?;
491
492 if let Some(node) = node_registry.nodes.read().await.first() {
493 let node = node.read().await;
494 debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
495 } else {
496 debug!("There are no nodes currently added or active");
497 }
498
499 if !use_force {
500 let mut node_versions = Vec::new();
501
502 for node in node_registry.nodes.read().await.iter() {
503 let node = node.read().await;
504 let version = Version::parse(&node.version)
505 .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
506 node_versions.push(version);
507 }
508
509 let any_nodes_need_upgraded = node_versions
510 .iter()
511 .any(|current_version| current_version < &target_version);
512 if !any_nodes_need_upgraded {
513 info!("All nodes are at the latest version, no upgrade required.");
514 if verbosity != VerbosityLevel::Minimal {
515 println!("{} All nodes are at the latest version", "✓".green());
516 }
517 return Ok(());
518 }
519 }
520
521 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
522 trace!("services_for_ops len: {}", services_for_ops.len());
523 let mut upgrade_summary = Vec::new();
524
525 for node in &services_for_ops {
526 let env_variables = if provided_env_variables.is_some() {
527 provided_env_variables.clone()
528 } else {
529 node_registry.environment_variables.read().await.clone()
530 };
531 let options = UpgradeOptions {
532 auto_restart: false,
533 env_variables: env_variables.clone(),
534 force: use_force,
535 start_service: !do_not_start,
536 target_bin_path: upgrade_bin_path.clone(),
537 target_version: target_version.clone(),
538 };
539 let service_name = node.read().await.service_name.clone();
540
541 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
542 let service = NodeService::new(node.clone(), Box::new(rpc_client));
543 let service = if fixed_interval.is_none() {
545 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
546 } else {
547 service
548 };
549
550 let mut service_manager =
551 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
552
553 match service_manager.upgrade(options).await {
554 Ok(upgrade_result) => {
555 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
556 if upgrade_result != UpgradeResult::NotRequired {
557 if let Some(interval) = fixed_interval {
560 debug!("Sleeping for {interval} milliseconds",);
561 std::thread::sleep(std::time::Duration::from_millis(interval));
562 }
563 }
564 upgrade_summary.push((service_name.clone(), upgrade_result));
565 node_registry.save().await?;
566 }
567 Err(err) => {
568 error!("Error upgrading service {service_name}: {err}");
569 upgrade_summary.push((
570 service_name.clone(),
571 UpgradeResult::Error(format!("Error: {err}")),
572 ));
573 node_registry.save().await?;
574 }
575 }
576 }
577
578 if verbosity != VerbosityLevel::Minimal {
579 print_upgrade_summary(upgrade_summary.clone());
580 }
581
582 if upgrade_summary.iter().any(|(_, r)| {
583 matches!(r, UpgradeResult::Error(_))
584 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
585 }) {
586 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
587 "For any services that were upgraded but did not start, you can attempt to start them \
588 again using the 'start' command."));
589 }
590
591 Ok(())
592}
593
594pub async fn maintain_n_running_nodes(
598 alpha: bool,
599 auto_restart: bool,
600 auto_set_nat_flags: bool,
601 connection_timeout_s: u64,
602 max_nodes_to_run: u16,
603 data_dir_path: Option<PathBuf>,
604 enable_metrics_server: bool,
605 env_variables: Option<Vec<(String, String)>>,
606 evm_network: Option<EvmNetwork>,
607 log_dir_path: Option<PathBuf>,
608 log_format: Option<LogFormat>,
609 max_archived_log_files: Option<usize>,
610 max_log_files: Option<usize>,
611 metrics_port: Option<PortRange>,
612 network_id: Option<u8>,
613 node_ip: Option<Ipv4Addr>,
614 node_port: Option<PortRange>,
615 node_registry: NodeRegistryManager,
616 peers_args: InitialPeersConfig,
617 relay: bool,
618 rewards_address: RewardsAddress,
619 rpc_address: Option<Ipv4Addr>,
620 rpc_port: Option<PortRange>,
621 src_path: Option<PathBuf>,
622 url: Option<String>,
623 no_upnp: bool,
624 user: Option<String>,
625 version: Option<String>,
626 verbosity: VerbosityLevel,
627 start_node_interval: Option<u64>,
628 write_older_cache_files: bool,
629) -> Result<()> {
630 let mut running_nodes = Vec::new();
631
632 for node in node_registry.nodes.read().await.iter() {
633 let node = node.read().await;
634 if node.status == ServiceStatus::Running {
635 running_nodes.push(node.service_name.clone());
636 }
637 }
638
639 let running_count = running_nodes.len();
640 let target_count = max_nodes_to_run as usize;
641
642 info!(
643 "Current running nodes: {}, Target: {}",
644 running_count, target_count
645 );
646
647 match running_count.cmp(&target_count) {
648 Ordering::Greater => {
649 let to_stop_count = running_count - target_count;
650 let services_to_stop = running_nodes
651 .into_iter()
652 .rev() .take(to_stop_count)
654 .collect::<Vec<_>>();
655
656 info!(
657 "Stopping {} excess nodes: {:?}",
658 to_stop_count, services_to_stop
659 );
660 stop(
661 None,
662 node_registry.clone(),
663 vec![],
664 services_to_stop,
665 verbosity,
666 )
667 .await?;
668 }
669 Ordering::Less => {
670 let to_start_count = target_count - running_count;
671 let mut inactive_nodes = Vec::new();
672 for node in node_registry.nodes.read().await.iter() {
673 let node = node.read().await;
674 if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
675 inactive_nodes.push(node.service_name.clone());
676 }
677 }
678
679 info!("Inactive nodes available: {}", inactive_nodes.len());
680
681 if to_start_count <= inactive_nodes.len() {
682 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
683 info!(
684 "Starting {} existing inactive nodes: {:?}",
685 to_start_count, nodes_to_start
686 );
687 start(
688 connection_timeout_s,
689 start_node_interval,
690 node_registry.clone(),
691 vec![],
692 nodes_to_start,
693 verbosity,
694 )
695 .await?;
696 } else {
697 let to_add_count = to_start_count - inactive_nodes.len();
698 info!(
699 "Adding {} new nodes and starting all {} inactive nodes",
700 to_add_count,
701 inactive_nodes.len()
702 );
703
704 let ports_to_use = match node_port {
705 Some(PortRange::Single(port)) => vec![port],
706 Some(PortRange::Range(start, end)) => {
707 (start..=end).take(to_add_count).collect()
708 }
709 None => vec![],
710 };
711
712 for (i, port) in ports_to_use.into_iter().enumerate() {
713 let added_service = add(
714 alpha,
715 auto_restart,
716 auto_set_nat_flags,
717 Some(1),
718 data_dir_path.clone(),
719 enable_metrics_server,
720 env_variables.clone(),
721 evm_network.clone(),
722 log_dir_path.clone(),
723 log_format,
724 max_archived_log_files,
725 max_log_files,
726 metrics_port.clone(),
727 network_id,
728 node_ip,
729 Some(PortRange::Single(port)),
730 node_registry.clone(),
731 peers_args.clone(),
732 relay,
733 rewards_address,
734 rpc_address,
735 rpc_port.clone(),
736 src_path.clone(),
737 no_upnp,
738 url.clone(),
739 user.clone(),
740 version.clone(),
741 verbosity,
742 write_older_cache_files,
743 )
744 .await?;
745
746 if i == 0 {
747 start(
748 connection_timeout_s,
749 start_node_interval,
750 node_registry.clone(),
751 vec![],
752 added_service,
753 verbosity,
754 )
755 .await?;
756 }
757 }
758
759 if !inactive_nodes.is_empty() {
760 start(
761 connection_timeout_s,
762 start_node_interval,
763 node_registry.clone(),
764 vec![],
765 inactive_nodes,
766 verbosity,
767 )
768 .await?;
769 }
770 }
771 }
772 Ordering::Equal => {
773 info!(
774 "Current node count ({}) matches target ({}). No action needed.",
775 running_count, target_count
776 );
777 }
778 }
779
780 let mut final_running_count = 0;
782 for node in node_registry.nodes.read().await.iter() {
783 let node_read = node.read().await;
784 if node_read.status == ServiceStatus::Running {
785 final_running_count += 1;
786 }
787 }
788
789 if final_running_count != target_count {
790 warn!("Failed to reach target node count. Expected {target_count}, but got {final_running_count}");
791 }
792
793 Ok(())
794}
795
796async fn get_services_for_ops(
797 node_registry: &NodeRegistryManager,
798 peer_ids: Vec<String>,
799 service_names: Vec<String>,
800) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
801 let mut services = Vec::new();
802
803 if service_names.is_empty() && peer_ids.is_empty() {
804 for node in node_registry.nodes.read().await.iter() {
805 if node.read().await.status != ServiceStatus::Removed {
806 services.push(node.clone());
807 }
808 }
809 } else {
810 for name in &service_names {
811 let mut found_service_with_name = false;
812 for node in node_registry.nodes.read().await.iter() {
813 let node_read = node.read().await;
814 if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
815 {
816 services.push(node.clone());
817 found_service_with_name = true;
818 break;
819 }
820 }
821 }
822
823 if !found_service_with_name {
824 error!("No service named '{name}'");
825 return Err(eyre!(format!("No service named '{name}'")));
826 }
827 }
828
829 for peer_id_str in &peer_ids {
830 let mut found_service_with_peer_id = false;
831 let given_peer_id = PeerId::from_str(peer_id_str)
832 .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
833 for node in node_registry.nodes.read().await.iter() {
834 let node_read = node.read().await;
835 if let Some(peer_id) = node_read.peer_id {
836 if peer_id == given_peer_id && node_read.status != ServiceStatus::Removed {
837 services.push(node.clone());
838 found_service_with_peer_id = true;
839 break;
840 }
841 }
842 }
843 if !found_service_with_peer_id {
844 error!("Could not find node with peer id: '{given_peer_id:?}'");
845 return Err(eyre!(format!(
846 "Could not find node with peer ID '{given_peer_id}'",
847 )));
848 }
849 }
850 }
851
852 Ok(services)
853}
854
855fn summarise_any_failed_ops(
856 failed_services: Vec<(String, String)>,
857 verb: &str,
858 verbosity: VerbosityLevel,
859) -> Result<()> {
860 if !failed_services.is_empty() {
861 if verbosity != VerbosityLevel::Minimal {
862 println!("Failed to {verb} {} service(s):", failed_services.len());
863 for failed in failed_services.iter() {
864 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
865 }
866 }
867
868 error!("Failed to {verb} one or more services");
869 return Err(eyre!("Failed to {verb} one or more services"));
870 }
871 Ok(())
872}