1#![allow(clippy::too_many_arguments)]
10
11use super::{download_and_get_upgrade_bin_path, print_upgrade_summary};
12use crate::{
13 add_services::{
14 add_node,
15 config::{AddNodeServiceOptions, PortRange},
16 },
17 config::{self, is_running_as_root},
18 helpers::{download_and_extract_release, get_bin_version},
19 print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
20};
21use color_eyre::{eyre::eyre, Help, Result};
22use colored::Colorize;
23use libp2p_identity::PeerId;
24use semver::Version;
25use sn_evm::{EvmNetwork, RewardsAddress};
26use sn_logging::LogFormat;
27use sn_peers_acquisition::PeersArgs;
28use sn_releases::{ReleaseType, SafeReleaseRepoActions};
29use sn_service_management::{
30 control::{ServiceControl, ServiceController},
31 rpc::RpcClient,
32 NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
33};
34use sn_transfers::HotWallet;
35use std::{cmp::Ordering, io::Write, net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};
36use tracing::debug;
37
38pub async fn add(
40 auto_restart: bool,
41 auto_set_nat_flags: bool,
42 count: Option<u16>,
43 data_dir_path: Option<PathBuf>,
44 enable_metrics_server: bool,
45 env_variables: Option<Vec<(String, String)>>,
46 evm_network: Option<EvmNetwork>,
47 home_network: bool,
48 local: bool,
49 log_dir_path: Option<PathBuf>,
50 log_format: Option<LogFormat>,
51 max_archived_log_files: Option<usize>,
52 max_log_files: Option<usize>,
53 metrics_port: Option<PortRange>,
54 node_ip: Option<Ipv4Addr>,
55 node_port: Option<PortRange>,
56 owner: Option<String>,
57 peers_args: PeersArgs,
58 rewards_address: RewardsAddress,
59 rpc_address: Option<Ipv4Addr>,
60 rpc_port: Option<PortRange>,
61 src_path: Option<PathBuf>,
62 upnp: bool,
63 url: Option<String>,
64 user: Option<String>,
65 version: Option<String>,
66 verbosity: VerbosityLevel,
67) -> Result<Vec<String>> {
68 let user_mode = !is_running_as_root();
69
70 if verbosity != VerbosityLevel::Minimal {
71 print_banner("Add Safenode Services");
72 println!("{} service(s) to be added", count.unwrap_or(1));
73 }
74
75 let service_manager = ServiceController {};
76 let service_user = if user_mode {
77 None
78 } else {
79 let service_user = user.unwrap_or_else(|| "safe".to_string());
80 service_manager.create_service_user(&service_user)?;
81 Some(service_user)
82 };
83
84 let service_data_dir_path =
85 config::get_service_data_dir_path(data_dir_path, service_user.clone())?;
86 let service_log_dir_path = config::get_service_log_dir_path(
87 ReleaseType::Safenode,
88 log_dir_path,
89 service_user.clone(),
90 )?;
91
92 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
93 let release_repo = <dyn SafeReleaseRepoActions>::default_config();
94
95 let (safenode_src_path, version) = if let Some(path) = src_path.clone() {
96 let version = get_bin_version(&path)?;
97 (path, version)
98 } else {
99 download_and_extract_release(
100 ReleaseType::Safenode,
101 url.clone(),
102 version,
103 &*release_repo,
104 verbosity,
105 None,
106 )
107 .await?
108 };
109
110 debug!("Parsing peers from PeersArgs");
111
112 let is_first = peers_args.first;
124 let bootstrap_peers = match peers_args.get_peers_exclude_network_contacts().await {
125 Ok(peers) => {
126 info!("Obtained peers of length {}", peers.len());
127 peers
128 }
129 Err(err) => match err {
130 sn_peers_acquisition::error::Error::PeersNotObtained => {
131 info!("No bootstrap peers obtained, setting empty vec.");
132 Vec::new()
133 }
134 _ => {
135 error!("Error obtaining peers: {err:?}");
136 return Err(err.into());
137 }
138 },
139 };
140
141 let options = AddNodeServiceOptions {
142 auto_restart,
143 auto_set_nat_flags,
144 bootstrap_peers,
145 count,
146 delete_safenode_src: src_path.is_none(),
147 enable_metrics_server,
148 evm_network: evm_network.unwrap_or(EvmNetwork::ArbitrumOne),
149 env_variables,
150 genesis: is_first,
151 home_network,
152 local,
153 log_format,
154 max_archived_log_files,
155 max_log_files,
156 metrics_port,
157 node_ip,
158 node_port,
159 owner,
160 rewards_address,
161 rpc_address,
162 rpc_port,
163 safenode_src_path,
164 safenode_dir_path: service_data_dir_path.clone(),
165 service_data_dir_path,
166 service_log_dir_path,
167 upnp,
168 user: service_user,
169 user_mode,
170 version,
171 };
172 info!("Adding node service(s)");
173 let added_services_names =
174 add_node(options, &mut node_registry, &service_manager, verbosity).await?;
175
176 node_registry.save()?;
177 debug!("Node registry saved");
178
179 Ok(added_services_names)
180}
181
182pub async fn balance(
183 peer_ids: Vec<String>,
184 service_names: Vec<String>,
185 verbosity: VerbosityLevel,
186) -> Result<()> {
187 if verbosity != VerbosityLevel::Minimal {
188 print_banner("Reward Balances");
189 }
190
191 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
192 refresh_node_registry(
193 &mut node_registry,
194 &ServiceController {},
195 verbosity != VerbosityLevel::Minimal,
196 false,
197 false,
198 )
199 .await?;
200
201 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
202 if service_indices.is_empty() {
203 info!("Service indices is empty, cannot obtain the balance");
204 println!("No balances to display");
206 return Ok(());
207 }
208 debug!("Obtaining balances for {} services", service_indices.len());
209
210 for &index in &service_indices {
211 let node = &mut node_registry.nodes[index];
212 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
213 let service = NodeService::new(node, Box::new(rpc_client));
214 let wallet = HotWallet::load_from(&service.service_data.data_dir_path)
215 .inspect_err(|err| error!("Error while loading hot wallet: {err:?}"))?;
216 println!(
217 "{}: {}",
218 service.service_data.service_name,
219 wallet.balance()
220 );
221 }
222 Ok(())
223}
224
225pub async fn remove(
226 keep_directories: bool,
227 peer_ids: Vec<String>,
228 service_names: Vec<String>,
229 verbosity: VerbosityLevel,
230) -> Result<()> {
231 if verbosity != VerbosityLevel::Minimal {
232 print_banner("Remove Safenode Services");
233 }
234 info!("Removing safe node services with keep_dirs=({keep_directories}) for: {peer_ids:?}, {service_names:?}");
235
236 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
237 refresh_node_registry(
238 &mut node_registry,
239 &ServiceController {},
240 verbosity != VerbosityLevel::Minimal,
241 false,
242 false,
243 )
244 .await?;
245
246 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
247 if service_indices.is_empty() {
248 info!("Service indices is empty, no services were eligible for removal");
249 if verbosity != VerbosityLevel::Minimal {
251 println!("No services were eligible for removal");
252 }
253 return Ok(());
254 }
255
256 let mut failed_services = Vec::new();
257 for &index in &service_indices {
258 let node = &mut node_registry.nodes[index];
259 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
260 let service = NodeService::new(node, Box::new(rpc_client));
261 let mut service_manager =
262 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
263 match service_manager.remove(keep_directories).await {
264 Ok(()) => {
265 debug!("Removed service {}", node.service_name);
266 node_registry.save()?;
267 }
268 Err(err) => {
269 error!("Failed to remove service {}: {err}", node.service_name);
270 failed_services.push((node.service_name.clone(), err.to_string()))
271 }
272 }
273 }
274
275 summarise_any_failed_ops(failed_services, "remove", verbosity)
276}
277
278pub async fn reset(force: bool, verbosity: VerbosityLevel) -> Result<()> {
279 if verbosity != VerbosityLevel::Minimal {
280 print_banner("Reset Safenode Services");
281 }
282 info!("Resetting all safenode services, with force={force}");
283
284 if !force {
285 println!("WARNING: all safenode services, data, and logs will be removed.");
286 println!("Do you wish to proceed? [y/n]");
287 std::io::stdout().flush()?;
288 let mut input = String::new();
289 std::io::stdin().read_line(&mut input)?;
290 if input.trim().to_lowercase() != "y" {
291 println!("Reset aborted");
292 return Ok(());
293 }
294 }
295
296 stop(None, vec![], vec![], verbosity).await?;
297 remove(false, vec![], vec![], verbosity).await?;
298
299 let node_registry_path = config::get_node_registry_path()?;
303 if node_registry_path.exists() {
304 info!("Removing node registry file: {node_registry_path:?}");
305 std::fs::remove_file(node_registry_path)?;
306 }
307
308 Ok(())
309}
310
311pub async fn start(
312 connection_timeout_s: u64,
313 fixed_interval: Option<u64>,
314 peer_ids: Vec<String>,
315 service_names: Vec<String>,
316 verbosity: VerbosityLevel,
317) -> Result<()> {
318 if verbosity != VerbosityLevel::Minimal {
319 print_banner("Start Safenode Services");
320 }
321 info!("Starting safenode services for: {peer_ids:?}, {service_names:?}");
322
323 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
324 refresh_node_registry(
325 &mut node_registry,
326 &ServiceController {},
327 verbosity != VerbosityLevel::Minimal,
328 false,
329 false,
330 )
331 .await?;
332
333 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
334 if service_indices.is_empty() {
335 info!("No services are eligible to be started");
336 if verbosity != VerbosityLevel::Minimal {
338 println!("No services were eligible to be started");
339 }
340 return Ok(());
341 }
342
343 let mut failed_services = Vec::new();
344 for &index in &service_indices {
345 let node = &mut node_registry.nodes[index];
346 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
347
348 let service = NodeService::new(node, Box::new(rpc_client));
349
350 let service = if fixed_interval.is_none() {
352 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
353 } else {
354 service
355 };
356
357 let mut service_manager =
358 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
359 if service_manager.service.status() != ServiceStatus::Running {
360 if let Some(interval) = fixed_interval {
365 debug!("Sleeping for {} milliseconds", interval);
366 std::thread::sleep(std::time::Duration::from_millis(interval));
367 }
368 }
369 match service_manager.start().await {
370 Ok(start_duration) => {
371 debug!(
372 "Started service {} in {start_duration:?}",
373 node.service_name
374 );
375
376 node_registry.save()?;
377 }
378 Err(err) => {
379 error!("Failed to start service {}: {err}", node.service_name);
380 failed_services.push((node.service_name.clone(), err.to_string()))
381 }
382 }
383 }
384
385 summarise_any_failed_ops(failed_services, "start", verbosity)
386}
387
388pub async fn status(details: bool, fail: bool, json: bool) -> Result<()> {
389 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
390 if !node_registry.nodes.is_empty() {
391 if !json && !details {
392 print_banner("Safenode Services");
393 }
394 status_report(
395 &mut node_registry,
396 &ServiceController {},
397 details,
398 json,
399 fail,
400 false,
401 )
402 .await?;
403 node_registry.save()?;
404 }
405 Ok(())
406}
407
408pub async fn stop(
409 interval: Option<u64>,
410 peer_ids: Vec<String>,
411 service_names: Vec<String>,
412 verbosity: VerbosityLevel,
413) -> Result<()> {
414 if verbosity != VerbosityLevel::Minimal {
415 print_banner("Stop Safenode Services");
416 }
417 info!("Stopping safenode services for: {peer_ids:?}, {service_names:?}");
418
419 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
420 refresh_node_registry(
421 &mut node_registry,
422 &ServiceController {},
423 verbosity != VerbosityLevel::Minimal,
424 false,
425 false,
426 )
427 .await?;
428
429 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
430 if service_indices.is_empty() {
431 info!("Service indices is empty, no services were eligible to be stopped");
432 if verbosity != VerbosityLevel::Minimal {
434 println!("No services were eligible to be stopped");
435 }
436 return Ok(());
437 }
438
439 let mut failed_services = Vec::new();
440 for &index in &service_indices {
441 let node = &mut node_registry.nodes[index];
442 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
443 let service = NodeService::new(node, Box::new(rpc_client));
444 let mut service_manager =
445 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
446
447 if service_manager.service.status() == ServiceStatus::Running {
448 if let Some(interval) = interval {
449 debug!("Sleeping for {} milliseconds", interval);
450 std::thread::sleep(std::time::Duration::from_millis(interval));
451 }
452 }
453 match service_manager.stop().await {
454 Ok(()) => {
455 debug!("Stopped service {}", node.service_name);
456 node_registry.save()?;
457 }
458 Err(err) => {
459 error!("Failed to stop service {}: {err}", node.service_name);
460 failed_services.push((node.service_name.clone(), err.to_string()))
461 }
462 }
463 }
464
465 summarise_any_failed_ops(failed_services, "stop", verbosity)
466}
467
468pub async fn upgrade(
469 connection_timeout_s: u64,
470 do_not_start: bool,
471 custom_bin_path: Option<PathBuf>,
472 force: bool,
473 fixed_interval: Option<u64>,
474 peer_ids: Vec<String>,
475 provided_env_variables: Option<Vec<(String, String)>>,
476 service_names: Vec<String>,
477 url: Option<String>,
478 version: Option<String>,
479 verbosity: VerbosityLevel,
480) -> Result<()> {
481 let use_force = force || custom_bin_path.is_some();
485
486 if verbosity != VerbosityLevel::Minimal {
487 print_banner("Upgrade Safenode Services");
488 }
489 info!(
490 "Upgrading safenode services with use_force={use_force} for: {peer_ids:?}, {service_names:?}"
491 );
492
493 let (upgrade_bin_path, target_version) = download_and_get_upgrade_bin_path(
494 custom_bin_path.clone(),
495 ReleaseType::Safenode,
496 url,
497 version,
498 verbosity,
499 )
500 .await?;
501
502 let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
503 refresh_node_registry(
504 &mut node_registry,
505 &ServiceController {},
506 verbosity != VerbosityLevel::Minimal,
507 false,
508 false,
509 )
510 .await?;
511
512 debug!(
513 "listen addresses for nodes[0]: {:?}",
514 node_registry.nodes[0].listen_addr
515 );
516 if !use_force {
517 let node_versions = node_registry
518 .nodes
519 .iter()
520 .map(|n| Version::parse(&n.version).map_err(|_| eyre!("Failed to parse Version")))
521 .collect::<Result<Vec<Version>>>()?;
522 let any_nodes_need_upgraded = node_versions
523 .iter()
524 .any(|current_version| current_version < &target_version);
525 if !any_nodes_need_upgraded {
526 info!("All nodes are at the latest version, no upgrade required.");
527 if verbosity != VerbosityLevel::Minimal {
528 println!("{} All nodes are at the latest version", "✓".green());
529 }
530 return Ok(());
531 }
532 }
533
534 let service_indices = get_services_for_ops(&node_registry, peer_ids, service_names)?;
535 trace!("service_indices len: {}", service_indices.len());
536 let mut upgrade_summary = Vec::new();
537
538 for &index in &service_indices {
539 let node = &mut node_registry.nodes[index];
540 let env_variables = if provided_env_variables.is_some() {
541 &provided_env_variables
542 } else {
543 &node_registry.environment_variables
544 };
545 let options = UpgradeOptions {
546 auto_restart: false,
547 bootstrap_peers: node_registry.bootstrap_peers.clone(),
548 env_variables: env_variables.clone(),
549 force: use_force,
550 start_service: !do_not_start,
551 target_bin_path: upgrade_bin_path.clone(),
552 target_version: target_version.clone(),
553 };
554 let service_name = node.service_name.clone();
555
556 let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
557 let service = NodeService::new(node, Box::new(rpc_client));
558 let service = if fixed_interval.is_none() {
560 service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
561 } else {
562 service
563 };
564
565 let mut service_manager =
566 ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
567
568 match service_manager.upgrade(options).await {
569 Ok(upgrade_result) => {
570 info!("Service: {service_name} has been upgraded, result: {upgrade_result:?}",);
571 if upgrade_result != UpgradeResult::NotRequired {
572 if let Some(interval) = fixed_interval {
575 debug!("Sleeping for {interval} milliseconds",);
576 std::thread::sleep(std::time::Duration::from_millis(interval));
577 }
578 }
579 upgrade_summary.push((
580 service_manager.service.service_data.service_name.clone(),
581 upgrade_result,
582 ));
583 node_registry.save()?;
584 }
585 Err(err) => {
586 error!("Error upgrading service {service_name}: {err}");
587 upgrade_summary.push((
588 node.service_name.clone(),
589 UpgradeResult::Error(format!("Error: {err}")),
590 ));
591 node_registry.save()?;
592 }
593 }
594 }
595
596 if verbosity != VerbosityLevel::Minimal {
597 print_upgrade_summary(upgrade_summary.clone());
598 }
599
600 if upgrade_summary.iter().any(|(_, r)| {
601 matches!(r, UpgradeResult::Error(_))
602 || matches!(r, UpgradeResult::UpgradedButNotStarted(_, _, _))
603 }) {
604 return Err(eyre!("There was a problem upgrading one or more nodes").suggestion(
605 "For any services that were upgraded but did not start, you can attempt to start them \
606 again using the 'start' command."));
607 }
608
609 Ok(())
610}
611
612pub async fn maintain_n_running_nodes(
616 auto_restart: bool,
617 auto_set_nat_flags: bool,
618 connection_timeout_s: u64,
619 max_nodes_to_run: u16,
620 data_dir_path: Option<PathBuf>,
621 enable_metrics_server: bool,
622 env_variables: Option<Vec<(String, String)>>,
623 evm_network: Option<EvmNetwork>,
624 home_network: bool,
625 local: bool,
626 log_dir_path: Option<PathBuf>,
627 log_format: Option<LogFormat>,
628 max_archived_log_files: Option<usize>,
629 max_log_files: Option<usize>,
630 metrics_port: Option<PortRange>,
631 node_ip: Option<Ipv4Addr>,
632 node_port: Option<PortRange>,
633 owner: Option<String>,
634 peers: PeersArgs,
635 rewards_address: RewardsAddress,
636 rpc_address: Option<Ipv4Addr>,
637 rpc_port: Option<PortRange>,
638 src_path: Option<PathBuf>,
639 url: Option<String>,
640 upnp: bool,
641 user: Option<String>,
642 version: Option<String>,
643 verbosity: VerbosityLevel,
644 start_node_interval: Option<u64>,
645) -> Result<()> {
646 let node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
647 let running_nodes = node_registry
648 .nodes
649 .iter()
650 .filter(|node| node.status == ServiceStatus::Running)
651 .map(|node| node.service_name.clone())
652 .collect::<Vec<_>>();
653
654 let running_count = running_nodes.len();
655 let target_count = max_nodes_to_run as usize;
656
657 info!(
658 "Current running nodes: {}, Target: {}",
659 running_count, target_count
660 );
661
662 match running_count.cmp(&target_count) {
663 Ordering::Greater => {
664 let to_stop_count = running_count - target_count;
665 let services_to_stop = running_nodes
666 .into_iter()
667 .rev() .take(to_stop_count)
669 .collect::<Vec<_>>();
670
671 info!(
672 "Stopping {} excess nodes: {:?}",
673 to_stop_count, services_to_stop
674 );
675 stop(None, vec![], services_to_stop, verbosity).await?;
676 }
677 Ordering::Less => {
678 let to_start_count = target_count - running_count;
679 let inactive_nodes = node_registry
680 .nodes
681 .iter()
682 .filter(|node| {
683 node.status == ServiceStatus::Stopped || node.status == ServiceStatus::Added
684 })
685 .map(|node| node.service_name.clone())
686 .collect::<Vec<_>>();
687
688 info!("Inactive nodes available: {}", inactive_nodes.len());
689
690 if to_start_count <= inactive_nodes.len() {
691 let nodes_to_start = inactive_nodes.into_iter().take(to_start_count).collect();
692 info!(
693 "Starting {} existing inactive nodes: {:?}",
694 to_start_count, nodes_to_start
695 );
696 start(
697 connection_timeout_s,
698 start_node_interval,
699 vec![],
700 nodes_to_start,
701 verbosity,
702 )
703 .await?;
704 } else {
705 let to_add_count = to_start_count - inactive_nodes.len();
706 info!(
707 "Adding {} new nodes and starting all {} inactive nodes",
708 to_add_count,
709 inactive_nodes.len()
710 );
711
712 let ports_to_use = match node_port {
713 Some(PortRange::Single(port)) => vec![port],
714 Some(PortRange::Range(start, end)) => {
715 (start..=end).take(to_add_count).collect()
716 }
717 None => vec![],
718 };
719
720 for (i, port) in ports_to_use.into_iter().enumerate() {
721 let added_service = add(
722 auto_restart,
723 auto_set_nat_flags,
724 Some(1),
725 data_dir_path.clone(),
726 enable_metrics_server,
727 env_variables.clone(),
728 evm_network.clone(),
729 home_network,
730 local,
731 log_dir_path.clone(),
732 log_format,
733 max_archived_log_files,
734 max_log_files,
735 metrics_port.clone(),
736 node_ip,
737 Some(PortRange::Single(port)),
738 owner.clone(),
739 peers.clone(),
740 rewards_address,
741 rpc_address,
742 rpc_port.clone(),
743 src_path.clone(),
744 upnp,
745 url.clone(),
746 user.clone(),
747 version.clone(),
748 verbosity,
749 )
750 .await?;
751
752 if i == 0 {
753 start(
754 connection_timeout_s,
755 start_node_interval,
756 vec![],
757 added_service,
758 verbosity,
759 )
760 .await?;
761 }
762 }
763
764 if !inactive_nodes.is_empty() {
765 start(
766 connection_timeout_s,
767 start_node_interval,
768 vec![],
769 inactive_nodes,
770 verbosity,
771 )
772 .await?;
773 }
774 }
775 }
776 Ordering::Equal => {
777 info!(
778 "Current node count ({}) matches target ({}). No action needed.",
779 running_count, target_count
780 );
781 }
782 }
783
784 let final_node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
786 let final_running_count = final_node_registry
787 .nodes
788 .iter()
789 .filter(|node| node.status == ServiceStatus::Running)
790 .count();
791
792 info!("Final running node count: {}", final_running_count);
793 if final_running_count != target_count {
794 warn!(
795 "Failed to reach target node count. Expected {}, but got {}",
796 target_count, final_running_count
797 );
798 }
799
800 Ok(())
801}
802
803fn get_services_for_ops(
804 node_registry: &NodeRegistry,
805 peer_ids: Vec<String>,
806 service_names: Vec<String>,
807) -> Result<Vec<usize>> {
808 let mut service_indices = Vec::new();
809
810 if service_names.is_empty() && peer_ids.is_empty() {
811 for node in node_registry.nodes.iter() {
812 if let Some(index) = node_registry.nodes.iter().position(|x| {
813 x.service_name == node.service_name && x.status != ServiceStatus::Removed
814 }) {
815 service_indices.push(index);
816 }
817 }
818 } else {
819 for name in &service_names {
820 if let Some(index) = node_registry
821 .nodes
822 .iter()
823 .position(|x| x.service_name == *name && x.status != ServiceStatus::Removed)
824 {
825 service_indices.push(index);
826 } else {
827 error!("No service named '{name}'");
828 return Err(eyre!(format!("No service named '{name}'")));
829 }
830 }
831
832 for peer_id_str in &peer_ids {
833 let peer_id = PeerId::from_str(peer_id_str)
834 .inspect_err(|err| error!("Error parsing PeerId: {err:?}"))?;
835 if let Some(index) = node_registry
836 .nodes
837 .iter()
838 .position(|x| x.peer_id == Some(peer_id) && x.status != ServiceStatus::Removed)
839 {
840 service_indices.push(index);
841 } else {
842 error!("Could not find node with peer id: '{peer_id:?}'");
843 return Err(eyre!(format!(
844 "Could not find node with peer ID '{peer_id}'",
845 )));
846 }
847 }
848 }
849
850 Ok(service_indices)
851}
852
853fn summarise_any_failed_ops(
854 failed_services: Vec<(String, String)>,
855 verb: &str,
856 verbosity: VerbosityLevel,
857) -> Result<()> {
858 if !failed_services.is_empty() {
859 if verbosity != VerbosityLevel::Minimal {
860 println!("Failed to {verb} {} service(s):", failed_services.len());
861 for failed in failed_services.iter() {
862 println!("{} {}: {}", "✕".red(), failed.0, failed.1);
863 }
864 }
865
866 error!("Failed to {verb} one or more services");
867 return Err(eyre!("Failed to {verb} one or more services"));
868 }
869 Ok(())
870}