1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 ServiceManager, VerbosityLevel,
14 add_services::{
15 add_node,
16 config::{AddNodeServiceOptions, PortRange},
17 },
18 config::{self, is_running_as_root},
19 helpers::{download_and_extract_release, get_bin_version},
20 print_banner, refresh_node_registry, status_report,
21};
22use ant_bootstrap::InitialPeersConfig;
23use ant_evm::{EvmNetwork, RewardsAddress};
24use ant_logging::LogFormat;
25use ant_releases::{AntReleaseRepoActions, ReleaseType};
26use ant_service_management::{
27 NodeRegistryManager, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus,
28 UpgradeOptions, UpgradeResult,
29 control::{ServiceControl, ServiceController},
30 rpc::RpcClient,
31};
32use color_eyre::{Help, Result, eyre::eyre};
33use colored::Colorize;
34use libp2p_identity::PeerId;
35use semver::Version;
36use std::{
37 cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc, time::Duration,
38};
39use tokio::sync::RwLock;
40use tracing::debug;
41
42pub async fn add(
44 alpha: bool,
45 auto_restart: bool,
46 auto_set_nat_flags: bool,
47 count: Option<u16>,
48 data_dir_path: Option<PathBuf>,
49 enable_metrics_server: bool,
50 env_variables: Option<Vec<(String, String)>>,
51 evm_network: Option<EvmNetwork>,
52 log_dir_path: Option<PathBuf>,
53 log_format: Option<LogFormat>,
54 max_archived_log_files: Option<usize>,
55 max_log_files: Option<usize>,
56 metrics_port: Option<PortRange>,
57 network_id: Option<u8>,
58 node_ip: Option<Ipv4Addr>,
59 node_port: Option<PortRange>,
60 node_registry: NodeRegistryManager,
61 mut init_peers_config: InitialPeersConfig,
62 relay: bool,
63 rewards_address: RewardsAddress,
64 rpc_address: Option<Ipv4Addr>,
65 rpc_port: Option<PortRange>,
66 src_path: Option<PathBuf>,
67 no_upnp: bool,
68 url: Option<String>,
69 user: Option<String>,
70 version: Option<String>,
71 verbosity: VerbosityLevel,
72 write_older_cache_files: bool,
73) -> Result<Vec<String>> {
74 let user_mode = !is_running_as_root();
75
76 if verbosity != VerbosityLevel::Minimal {
77 print_banner("Add Antnode Services");
78 println!("{} service(s) to be added", count.unwrap_or(1));
79 }
80
81 let service_manager = ServiceController {};
82 let service_user = if user_mode {
83 None
84 } else {
85 let service_user = user.unwrap_or_else(|| "ant".to_string());
86 service_manager.create_service_user(&service_user)?;
87 Some(service_user)
88 };
89
90 let service_data_dir_path =
91 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
92 let service_log_dir_path =
93 config::get_service_log_dir_path(ReleaseType::AntNode, log_dir_path, service_user.clone())?;
94 let bootstrap_cache_dir = if let Some(user) = &service_user {
95 Some(config::get_bootstrap_cache_owner_path(user)?)
96 } else {
97 None
98 };
99
100 let release_repo = <dyn AntReleaseRepoActions>::default_config();
101
102 let (antnode_src_path, version) = if let Some(path) = src_path.clone() {
103 let version = get_bin_version(&path)?;
104 (path, version)
105 } else {
106 download_and_extract_release(
107 ReleaseType::AntNode,
108 url.clone(),
109 version,
110 &*release_repo,
111 verbosity,
112 None,
113 )
114 .await?
115 };
116
117 debug!("Parsing peers from PeersArgs");
118
119 init_peers_config
120 .addrs
121 .extend(InitialPeersConfig::read_bootstrap_addr_from_env());
122 init_peers_config.bootstrap_cache_dir = bootstrap_cache_dir;
123
124 let options = AddNodeServiceOptions {
125 alpha,
126 auto_restart,
127 auto_set_nat_flags,
128 count,
129 delete_antnode_src: src_path.is_none(),
130 enable_metrics_server,
131 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
132 env_variables,
133 relay,
134 log_format,
135 max_archived_log_files,
136 max_log_files,
137 metrics_port,
138 network_id,
139 node_ip,
140 node_port,
141 init_peers_config,
142 rewards_address,
143 rpc_address,
144 rpc_port,
145 antnode_src_path,
146 antnode_dir_path: service_data_dir_path.clone(),
147 service_data_dir_path,
148 service_log_dir_path,
149 no_upnp,
150 user: service_user,
151 user_mode,
152 version,
153 write_older_cache_files,
154 };
155 info!("Adding node service(s)");
156 let added_services_names =
157 add_node(options, node_registry.clone(), &service_manager, verbosity).await?;
158
159 node_registry.save().await?;
160 debug!("Node registry saved");
161
162 Ok(added_services_names)
163}
164
165pub async fn balance(
166 peer_ids: Vec<String>,
167 node_registry: NodeRegistryManager,
168 service_names: Vec<String>,
169 verbosity: VerbosityLevel,
170) -> Result<()> {
171 if verbosity != VerbosityLevel::Minimal {
172 print_banner("Reward Balances");
173 }
174
175 refresh_node_registry(
176 node_registry.clone(),
177 &ServiceController {},
178 verbosity != VerbosityLevel::Minimal,
179 false,
180 verbosity,
181 )
182 .await?;
183
184 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
185 if services_for_ops.is_empty() {
186 info!("Services for ops is empty, cannot obtain the balance");
187 println!("No balances to display");
189 return Ok(());
190 }
191 debug!("Obtaining balances for {} services", services_for_ops.len());
192
193 for node in services_for_ops {
194 let node = node.read().await;
195 println!("{}: {}", node.service_name, 0,);
197 }
198 Ok(())
199}
200
201pub async fn remove(
202 keep_directories: bool,
203 peer_ids: Vec<String>,
204 node_registry: NodeRegistryManager,
205 service_names: Vec<String>,
206 verbosity: VerbosityLevel,
207) -> Result<()> {
208 if verbosity != VerbosityLevel::Minimal {
209 print_banner("Remove Antnode Services");
210 }
211 info!(
212 "Removing antnode services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}"
213 );
214
215 refresh_node_registry(
216 node_registry.clone(),
217 &ServiceController {},
218 verbosity != VerbosityLevel::Minimal,
219 false,
220 verbosity,
221 )
222 .await?;
223
224 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
225 if services_for_ops.is_empty() {
226 info!("Services for ops is empty, no services were eligible for removal");
227 if verbosity != VerbosityLevel::Minimal {
229 println!("No services were eligible for removal");
230 }
231 return Ok(());
232 }
233
234 let mut failed_services = Vec::new();
235 for node in &services_for_ops {
236 let service_name = node.read().await.service_name.clone();
237 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
238 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
239 let mut service_manager =
240 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
241 match service_manager.remove(keep_directories).await {
242 Ok(()) => {
243 debug!("Removed service {service_name}");
244 node_registry.save().await?;
245 }
246 Err(err) => {
247 error!("Failed to remove service {service_name}: {err}");
248 failed_services.push((service_name.clone(), err.to_string()))
249 }
250 }
251 }
252
253 summarise_any_failed_ops(failed_services, "remove", verbosity)
254}
255
256pub async fn reset(
257 force: bool,
258 node_registry: NodeRegistryManager,
259 verbosity: VerbosityLevel,
260) -> Result<()> {
261 if verbosity != VerbosityLevel::Minimal {
262 print_banner("Reset Antnode Services");
263 }
264 info!("Resetting all antnode services, with force={force}");
265
266 if !force {
267 println!("WARNING: all antnode services, data, and logs will be removed.");
268 println!("Do you wish to proceed? [y/n]");
269 std::io::stdout().flush()?;
270 let mut input = String::new();
271 std::io::stdin().read_line(&mut input)?;
272 if input.trim().to_lowercase() != "y" {
273 println!("Reset aborted");
274 return Ok(());
275 }
276 }
277
278 stop(None, node_registry.clone(), vec![], vec![], verbosity).await?;
279 remove(false, vec![], node_registry, vec![], verbosity).await?;
280
281 let node_registry_path = config::get_node_registry_path()?;
285 if node_registry_path.exists() {
286 info!("Removing node registry file: {node_registry_path:?}");
287 std::fs::remove_file(node_registry_path)?;
288 }
289
290 Ok(())
291}
292
293pub async fn start(
294 connection_timeout_s: u64,
295 fixed_interval: Option<u64>,
296 node_registry: NodeRegistryManager,
297 peer_ids: Vec<String>,
298 service_names: Vec<String>,
299 verbosity: VerbosityLevel,
300) -> Result<()> {
301 if verbosity != VerbosityLevel::Minimal {
302 print_banner("Start Antnode Services");
303 }
304 info!("Starting antnode services for: {peer_ids:?}, {service_names:?}");
305
306 refresh_node_registry(
307 node_registry.clone(),
308 &ServiceController {},
309 verbosity != VerbosityLevel::Minimal,
310 false,
311 verbosity,
312 )
313 .await?;
314
315 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
316 if services_for_ops.is_empty() {
317 info!("No services are eligible to be started");
318 if verbosity != VerbosityLevel::Minimal {
320 println!("No services were eligible to be started");
321 }
322 return Ok(());
323 }
324
325 let mut failed_services = Vec::new();
326 for node in &services_for_ops {
327 let service_name = node.read().await.service_name.clone();
328
329 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
330 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
331
332 let service = if fixed_interval.is_none() {
334 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
335 } else {
336 service
337 };
338
339 let mut service_manager =
340 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
341 if service_manager.service.status().await != ServiceStatus::Running {
342 if let Some(interval) = fixed_interval {
347 debug!("Sleeping for {} milliseconds", interval);
348 std::thread::sleep(std::time::Duration::from_millis(interval));
349 }
350 }
351 match service_manager.start().await {
352 Ok(start_duration) => {
353 debug!("Started service {service_name} in {start_duration:?}",);
354
355 node_registry.save().await?;
356 }
357 Err(err) => {
358 error!("Failed to start service {service_name}: {err}");
359 failed_services.push((service_name.clone(), err.to_string()))
360 }
361 }
362 }
363
364 summarise_any_failed_ops(failed_services, "start", verbosity)
365}
366
367pub async fn status(
368 details: bool,
369 fail: bool,
370 json: bool,
371 node_registry: NodeRegistryManager,
372) -> Result<()> {
373 if !node_registry.nodes.read().await.is_empty() {
374 if !json && !details {
375 print_banner("Antnode Services");
376 }
377 status_report(
378 &node_registry,
379 &ServiceController {},
380 details,
381 json,
382 fail,
383 false,
384 )
385 .await?;
386 node_registry.save().await?;
387 }
388 Ok(())
389}
390
391pub async fn stop(
392 interval: Option<u64>,
393 node_registry: NodeRegistryManager,
394 peer_ids: Vec<String>,
395 service_names: Vec<String>,
396 verbosity: VerbosityLevel,
397) -> Result<()> {
398 if verbosity != VerbosityLevel::Minimal {
399 print_banner("Stop Antnode Services");
400 }
401 info!("Stopping antnode services for: {peer_ids:?}, {service_names:?}");
402
403 refresh_node_registry(
404 node_registry.clone(),
405 &ServiceController {},
406 verbosity != VerbosityLevel::Minimal,
407 false,
408 verbosity,
409 )
410 .await?;
411
412 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
413 if services_for_ops.is_empty() {
414 info!("No services are eligible to be stopped");
415 if verbosity != VerbosityLevel::Minimal {
417 println!("No services were eligible to be stopped");
418 }
419 return Ok(());
420 }
421
422 let mut failed_services = Vec::new();
423 for node in services_for_ops.iter() {
424 let service_name = node.read().await.service_name.clone();
425 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
426 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
427 let mut service_manager =
428 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
429
430 if service_manager.service.status().await == ServiceStatus::Running
431 && let Some(interval) = interval
432 {
433 debug!("Sleeping for {} milliseconds", interval);
434 std::thread::sleep(std::time::Duration::from_millis(interval));
435 }
436 match service_manager.stop().await {
437 Ok(()) => {
438 debug!("Stopped service {service_name}");
439 node_registry.save().await?;
440 }
441 Err(err) => {
442 error!("Failed to stop service {service_name}: {err}");
443 failed_services.push((service_name.clone(), err.to_string()))
444 }
445 }
446 }
447
448 summarise_any_failed_ops(failed_services, "stop", verbosity)
449}
450
451pub async fn upgrade(
452 connection_timeout_s: u64,
453 do_not_start: bool,
454 custom_bin_path: Option<PathBuf>,
455 force: bool,
456 fixed_interval: Option<u64>,
457 node_registry: NodeRegistryManager,
458 peer_ids: Vec<String>,
459 provided_env_variables: Option<Vec<(String, String)>>,
460 service_names: Vec<String>,
461 url: Option<String>,
462 version: Option<String>,
463 verbosity: VerbosityLevel,
464) -> Result<()> {
465 let use_force = force || custom_bin_path.is_some();
469
470 if verbosity != VerbosityLevel::Minimal {
471 print_banner("Upgrade Antnode Services");
472 }
473 info!(
474 "Upgrading antnode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
475 );
476
477 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
478 custom_bin_path.clone(),
479 ReleaseType::AntNode,
480 url,
481 version,
482 verbosity,
483 )
484 .await?;
485
486 refresh_node_registry(
487 node_registry.clone(),
488 &ServiceController {},
489 verbosity != VerbosityLevel::Minimal,
490 false,
491 verbosity,
492 )
493 .await?;
494
495 if let Some(node) = node_registry.nodes.read().await.first() {
496 let node = node.read().await;
497 debug!("listen addresses for nodes[0]: {:?}", node.listen_addr);
498 } else {
499 debug!("There are no nodes currently added or active");
500 }
501
502 if !use_force {
503 let mut node_versions = Vec::new();
504
505 for node in node_registry.nodes.read().await.iter() {
506 let node = node.read().await;
507 let version = Version::parse(&node.version)
508 .map_err(|_| eyre!("Failed to parse Version for node {}", node.service_name))?;
509 node_versions.push(version);
510 }
511
512 let any_nodes_need_upgraded = node_versions
513 .iter()
514 .any(|current_version| current_version < &target_version);
515 if !any_nodes_need_upgraded {
516 info!("All nodes are at the latest version, no upgrade required.");
517 if verbosity != VerbosityLevel::Minimal {
518 println!("{} All nodes are at the latest version", "✓".green());
519 }
520 return Ok(());
521 }
522 }
523
524 let services_for_ops = get_services_for_ops(&node_registry, peer_ids, service_names).await?;
525 trace!("services_for_ops len: {}", services_for_ops.len());
526 let mut upgrade_summary = Vec::new();
527
528 for node in &services_for_ops {
529 let env_variables = if provided_env_variables.is_some() {
530 provided_env_variables.clone()
531 } else {
532 node_registry.environment_variables.read().await.clone()
533 };
534 let options = UpgradeOptions {
535 auto_restart: false,
536 env_variables: env_variables.clone(),
537 force: use_force,
538 start_service: !do_not_start,
539 target_bin_path: upgrade_bin_path.clone(),
540 target_version: target_version.clone(),
541 };
542 let service_name = node.read().await.service_name.clone();
543
544 let rpc_client = RpcClient::from_socket_addr(node.read().await.rpc_socket_addr);
545 let service = NodeService::new(Arc::clone(node), Box::new(rpc_client));
546 let service = if fixed_interval.is_none() {
548 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
549 } else {
550 service
551 };
552
553 let mut service_manager =
554 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
555
556 match service_manager.upgrade(options).await {
557 Ok(upgrade_result) => {
558 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
559 if upgrade_result != UpgradeResult::NotRequired {
560 if let Some(interval) = fixed_interval {
563 debug!("Sleeping for {interval} milliseconds",);
564 std::thread::sleep(std::time::Duration::from_millis(interval));
565 }
566 }
567 upgrade_summary.push((service_name.clone(), upgrade_result));
568 node_registry.save().await?;
569 }
570 Err(err) => {
571 error!("Error upgrading service {service_name}: {err}");
572 upgrade_summary.push((
573 service_name.clone(),
574 UpgradeResult::Error(format!("Error: {err}")),
575 ));
576 node_registry.save().await?;
577 }
578 }
579 }
580
581 if verbosity != VerbosityLevel::Minimal {
582 print_upgrade_summary(upgrade_summary.clone());
583 }
584
585 if upgrade_summary.iter().any(|(_, r)| {
586 matches!(r, UpgradeResult::Error(_))
587 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
588 }) {
589 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
590 "For any services that were upgraded but did not start, you can attempt to start them \
591 again using the 'start' command."));
592 }
593
594 Ok(())
595}
596
597pub async fn maintain_n_running_nodes(
601 alpha: bool,
602 auto_restart: bool,
603 auto_set_nat_flags: bool,
604 connection_timeout_s: u64,
605 max_nodes_to_run: u16,
606 data_dir_path: Option<PathBuf>,
607 enable_metrics_server: bool,
608 env_variables: Option<Vec<(String, String)>>,
609 evm_network: Option<EvmNetwork>,
610 log_dir_path: Option<PathBuf>,
611 log_format: Option<LogFormat>,
612 max_archived_log_files: Option<usize>,
613 max_log_files: Option<usize>,
614 metrics_port: Option<PortRange>,
615 network_id: Option<u8>,
616 node_ip: Option<Ipv4Addr>,
617 node_port: Option<PortRange>,
618 node_registry: NodeRegistryManager,
619 peers_args: InitialPeersConfig,
620 relay: bool,
621 rewards_address: RewardsAddress,
622 rpc_address: Option<Ipv4Addr>,
623 rpc_port: Option<PortRange>,
624 src_path: Option<PathBuf>,
625 url: Option<String>,
626 no_upnp: bool,
627 user: Option<String>,
628 version: Option<String>,
629 verbosity: VerbosityLevel,
630 start_node_interval: Option<u64>,
631 write_older_cache_files: bool,
632) -> Result<()> {
633 let mut running_nodes = Vec::new();
634
635 for node in node_registry.nodes.read().await.iter() {
636 let node = node.read().await;
637 if node.status == ServiceStatus::Running {
638 running_nodes.push(node.service_name.clone());
639 }
640 }
641
642 let running_count = running_nodes.len();
643 let target_count = max_nodes_to_run as usize;
644
645 info!(
646 "Current running nodes: {}, Target: {}",
647 running_count, target_count
648 );
649
650 match running_count.cmp(&target_count) {
651 Ordering::Greater => {
652 let to_stop_count = running_count - target_count;
653 let services_to_stop = running_nodes
654 .into_iter()
655 .rev() .take(to_stop_count)
657 .collect::<Vec<_>>();
658
659 info!(
660 "Stopping {} excess nodes: {:?}",
661 to_stop_count, services_to_stop
662 );
663 stop(
664 None,
665 node_registry.clone(),
666 vec![],
667 services_to_stop,
668 verbosity,
669 )
670 .await?;
671 }
672 Ordering::Less => {
673 let to_start_count = target_count - running_count;
674 let mut inactive_nodes = Vec::new();
675 for node in node_registry.nodes.read().await.iter() {
676 let node = node.read().await;
677 if node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added {
678 inactive_nodes.push(node.service_name.clone());
679 }
680 }
681
682 info!("Inactive nodes available: {}", inactive_nodes.len());
683
684 if to_start_count <= inactive_nodes.len() {
685 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
686 info!(
687 "Starting {} existing inactive nodes: {:?}",
688 to_start_count, nodes_to_start
689 );
690 start(
691 connection_timeout_s,
692 start_node_interval,
693 node_registry.clone(),
694 vec![],
695 nodes_to_start,
696 verbosity,
697 )
698 .await?;
699 } else {
700 let to_add_count = to_start_count - inactive_nodes.len();
701 info!(
702 "Adding {} new nodes and starting all {} inactive nodes",
703 to_add_count,
704 inactive_nodes.len()
705 );
706
707 let ports_to_use = match node_port {
708 Some(PortRange::Single(port)) => vec![port],
709 Some(PortRange::Range(start, end)) => {
710 (start..=end).take(to_add_count).collect()
711 }
712 None => vec![],
713 };
714
715 for (i, port) in ports_to_use.into_iter().enumerate() {
716 let added_service = add(
717 alpha,
718 auto_restart,
719 auto_set_nat_flags,
720 Some(1),
721 data_dir_path.clone(),
722 enable_metrics_server,
723 env_variables.clone(),
724 evm_network.clone(),
725 log_dir_path.clone(),
726 log_format,
727 max_archived_log_files,
728 max_log_files,
729 metrics_port.clone(),
730 network_id,
731 node_ip,
732 Some(PortRange::Single(port)),
733 node_registry.clone(),
734 peers_args.clone(),
735 relay,
736 rewards_address,
737 rpc_address,
738 rpc_port.clone(),
739 src_path.clone(),
740 no_upnp,
741 url.clone(),
742 user.clone(),
743 version.clone(),
744 verbosity,
745 write_older_cache_files,
746 )
747 .await?;
748
749 if i == 0 {
750 start(
751 connection_timeout_s,
752 start_node_interval,
753 node_registry.clone(),
754 vec![],
755 added_service,
756 verbosity,
757 )
758 .await?;
759 }
760 }
761
762 if !inactive_nodes.is_empty() {
763 start(
764 connection_timeout_s,
765 start_node_interval,
766 node_registry.clone(),
767 vec![],
768 inactive_nodes,
769 verbosity,
770 )
771 .await?;
772 }
773 }
774 }
775 Ordering::Equal => {
776 info!(
777 "Current node count ({}) matches target ({}). No action needed.",
778 running_count, target_count
779 );
780 }
781 }
782
783 let mut final_running_count = 0;
785 for node in node_registry.nodes.read().await.iter() {
786 let node_read = node.read().await;
787 if node_read.status == ServiceStatus::Running {
788 final_running_count += 1;
789 }
790 }
791
792 if final_running_count != target_count {
793 warn!(
794 "Failed to reach target node count. Expected {target_count}, but got {final_running_count}"
795 );
796 }
797
798 Ok(())
799}
800
801async fn get_services_for_ops(
802 node_registry: &NodeRegistryManager,
803 peer_ids: Vec<String>,
804 service_names: Vec<String>,
805) -> Result<Vec<Arc<RwLock<NodeServiceData>>>> {
806 let mut services = Vec::new();
807
808 if service_names.is_empty() && peer_ids.is_empty() {
809 for node in node_registry.nodes.read().await.iter() {
810 if node.read().await.status != ServiceStatus::Removed {
811 services.push(Arc::clone(node));
812 }
813 }
814 } else {
815 for name in &service_names {
816 let mut found_service_with_name = false;
817 for node in node_registry.nodes.read().await.iter() {
818 let node_read = node.read().await;
819 if node_read.service_name == *name && node_read.status != ServiceStatus::Removed {
820 {
821 services.push(Arc::clone(node));
822 found_service_with_name = true;
823 break;
824 }
825 }
826 }
827
828 if !found_service_with_name {
829 error!("No service named '{name}'");
830 return Err(eyre!(format!("No service named '{name}'")));
831 }
832 }
833
834 for peer_id_str in &peer_ids {
835 let mut found_service_with_peer_id = false;
836 let given_peer_id = PeerId::from_str(peer_id_str)
837 .map_err(|_| eyre!(format!("Error parsing PeerId: '{peer_id_str}'")))?;
838 for node in node_registry.nodes.read().await.iter() {
839 let node_read = node.read().await;
840 if let Some(peer_id) = node_read.peer_id
841 && peer_id == given_peer_id
842 && node_read.status != ServiceStatus::Removed
843 {
844 services.push(Arc::clone(node));
845 found_service_with_peer_id = true;
846 break;
847 }
848 }
849 if !found_service_with_peer_id {
850 error!("Could not find node with peer id: '{given_peer_id:?}'");
851 return Err(eyre!(format!(
852 "Could not find node with peer ID '{given_peer_id}'",
853 )));
854 }
855 }
856 }
857
858 Ok(services)
859}
860
861fn summarise_any_failed_ops(
862 failed_services: Vec<(String, String)>,
863 verb: &str,
864 verbosity: VerbosityLevel,
865) -> Result<()> {
866 if !failed_services.is_empty() {
867 if verbosity != VerbosityLevel::Minimal {
868 println!("Failed to {verb} {} service(s):", failed_services.len());
869 for failed in failed_services.iter() {
870 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
871 }
872 }
873
874 error!("Failed to {verb} one or more services");
875 return Err(eyre!("Failed to {verb} one or more services"));
876 }
877 Ok(())
878}