1use crate::action::{Action, StatusActions};
10use crate::connection_mode::ConnectionMode;
11use ant_bootstrap::InitialPeersConfig;
12use ant_evm::{EvmNetwork, RewardsAddress};
13use ant_node_manager::{VerbosityLevel, add_services::config::PortRange};
14use ant_releases::{self, AntReleaseRepoActions, ReleaseType};
15use ant_service_management::NodeRegistryManager;
16use color_eyre::Result;
17use color_eyre::eyre::eyre;
18use service_manager::RestartPolicy;
19use std::{path::PathBuf, str::FromStr};
20use tokio::runtime::Builder;
21use tokio::sync::mpsc::{self, UnboundedSender};
22use tokio::task::LocalSet;
23
/// Upper bound of the default port range used when no custom ports are configured.
pub const PORT_MAX: u32 = 65535;
/// Lower bound of the default port range used when no custom ports are configured.
pub const PORT_MIN: u32 = 1024;

/// Number of consecutive failed attempts tolerated while adding nodes before giving up.
const NODE_ADD_MAX_RETRIES: u32 = 5;

/// Fixed interval handed to the node upgrade command.
// NOTE(review): the unit is not visible in this file (presumably milliseconds) — confirm
// against `ant_node_manager::cmd::node::upgrade`.
pub const FIXED_INTERVAL: u64 = 60_000;
/// Connection timeout handed to the node start and maintain commands.
// NOTE(review): presumably expressed in seconds — confirm against `ant_node_manager`.
pub const CONNECTION_TIMEOUT_START: u64 = 120;

/// Placeholder service name used in status actions that concern all nodes at once.
pub const NODES_ALL: &str = "NODES_ALL";
33
/// Unit of work sent to the [`NodeManagement`] background worker.
///
/// Each variant is dispatched to the matching private async function of this module; the
/// outcome is reported back through the `action_sender` carried by the task (directly or
/// inside its `args`).
#[derive(Debug)]
pub enum NodeManagementTask {
    /// Bring the number of nodes to `args.count`, adding nodes or scaling down as needed.
    MaintainNodes {
        args: MaintainNodesArgs,
    },
    /// Reset all node services.
    ResetNodes {
        /// Forwarded as `trigger_start_node` in the completion action.
        start_nodes_after_reset: bool,
        action_sender: UnboundedSender<Action>,
    },
    /// Stop the listed services.
    StopNodes {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
    /// Stop and then upgrade the services named in `args.service_names`.
    UpgradeNodes {
        args: UpgradeNodesArgs,
    },
    /// Add a node on the first free port of the configured range.
    AddNode {
        args: MaintainNodesArgs,
    },
    /// Stop and then remove the listed services.
    RemoveNodes {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
    /// Start the listed services.
    StartNode {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
}
62
/// Cloneable handle used to submit [`NodeManagementTask`]s to the background worker.
#[derive(Clone)]
pub struct NodeManagement {
    /// Sending half of the unbounded channel the worker receives tasks from.
    task_sender: mpsc::UnboundedSender<NodeManagementTask>,
}
67
68impl NodeManagement {
69 pub fn new(node_registry: NodeRegistryManager) -> Result<Self> {
70 let (send, mut recv) = mpsc::unbounded_channel();
71
72 let rt = Builder::new_current_thread().enable_all().build()?;
73
74 std::thread::spawn(move || {
75 let local = LocalSet::new();
76
77 local.spawn_local(async move {
78 while let Some(new_task) = recv.recv().await {
79 match new_task {
80 NodeManagementTask::MaintainNodes { args } => {
81 maintain_n_running_nodes(args, node_registry.clone()).await;
82 }
83 NodeManagementTask::ResetNodes {
84 start_nodes_after_reset,
85 action_sender,
86 } => {
87 reset_nodes(
88 action_sender,
89 node_registry.clone(),
90 start_nodes_after_reset,
91 )
92 .await;
93 }
94 NodeManagementTask::StopNodes {
95 services,
96 action_sender,
97 } => {
98 stop_nodes(services, action_sender, node_registry.clone()).await;
99 }
100 NodeManagementTask::UpgradeNodes { args } => {
101 upgrade_nodes(args, node_registry.clone()).await
102 }
103 NodeManagementTask::RemoveNodes {
104 services,
105 action_sender,
106 } => remove_nodes(services, action_sender, node_registry.clone()).await,
107 NodeManagementTask::StartNode {
108 services,
109 action_sender,
110 } => start_nodes(services, action_sender, node_registry.clone()).await,
111 NodeManagementTask::AddNode { args } => {
112 add_node(args, node_registry.clone()).await
113 }
114 }
115 }
116 });
119
120 rt.block_on(local);
123 });
124
125 Ok(Self { task_sender: send })
126 }
127
128 pub fn send_task(&self, task: NodeManagementTask) -> Result<()> {
135 self.task_sender
136 .send(task)
137 .inspect_err(|err| error!("The node management local set is down {err:?}"))
138 .map_err(|_| eyre!("Failed to send task to the node management local set"))?;
139 Ok(())
140 }
141}
142
143async fn stop_nodes(
145 services: Vec<String>,
146 action_sender: UnboundedSender<Action>,
147 node_registry: NodeRegistryManager,
148) {
149 if let Err(err) = ant_node_manager::cmd::node::stop(
150 None,
151 node_registry.clone(),
152 vec![],
153 services.clone(),
154 VerbosityLevel::Minimal,
155 )
156 .await
157 {
158 error!("Error while stopping services {err:?}");
159 send_action(
160 action_sender,
161 Action::StatusActions(StatusActions::ErrorStoppingNodes {
162 services,
163 raw_error: err.to_string(),
164 }),
165 );
166 } else {
167 info!("Successfully stopped services");
168 for service in services {
169 send_action(
170 action_sender.clone(),
171 Action::StatusActions(StatusActions::StopNodesCompleted {
172 service_name: service,
173 all_nodes_data: node_registry.get_node_service_data().await,
174 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
175 }),
176 );
177 }
178 }
179}
180
/// Arguments shared by the "maintain N nodes" and "add node" tasks.
#[derive(Debug)]
pub struct MaintainNodesArgs {
    /// Channel on which status and error actions are reported.
    pub action_sender: UnboundedSender<Action>,
    /// Optional path to the `antnode` binary, forwarded to the node manager.
    pub antnode_path: Option<PathBuf>,
    /// Selects which of the NAT / custom ports / relay / UPnP options are enabled.
    pub connection_mode: ConnectionMode,
    /// Target number of nodes.
    pub count: u16,
    /// Optional data directory, forwarded to the node manager.
    pub data_dir_path: Option<PathBuf>,
    /// Optional network identifier.
    pub network_id: Option<u8>,
    /// Owner name; an empty string is treated as "no owner".
    pub owner: String,
    /// Initial peers configuration, forwarded to the node manager.
    pub init_peers_config: InitialPeersConfig,
    /// Port range to use; only honoured when `connection_mode` is `CustomPorts`.
    pub port_range: Option<PortRange>,
    /// Rewards address as text; parsed into a `RewardsAddress` before use.
    pub rewards_address: String,
    /// When `true`, NAT detection is run before any node is added or maintained.
    pub run_nat_detection: bool,
}
195
196async fn maintain_n_running_nodes(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
198 debug!("Maintaining {} nodes", args.count);
199 if args.run_nat_detection {
200 run_nat_detection(&args.action_sender).await;
201 }
202
203 let config = prepare_node_config(&args);
204 debug_log_config(&config, &args);
205
206 let mut used_ports = get_used_ports(&node_registry).await;
207 let (mut current_port, max_port) = get_port_range(&config.custom_ports);
208
209 let nodes_to_add = args.count as i32 - node_registry.nodes.read().await.len() as i32;
210
211 if nodes_to_add <= 0 {
212 debug!("Scaling down nodes to {}", nodes_to_add);
213 scale_down_nodes(&config, args.count, node_registry.clone()).await;
214 } else {
215 debug!("Scaling up nodes to {}", nodes_to_add);
216 add_nodes(
217 &args.action_sender,
218 &config,
219 nodes_to_add,
220 &mut used_ports,
221 &mut current_port,
222 max_port,
223 node_registry.clone(),
224 )
225 .await;
226 }
227
228 debug!("Finished maintaining {} nodes", args.count);
229 send_action(
230 args.action_sender,
231 Action::StatusActions(StatusActions::StartNodesCompleted {
232 service_name: NODES_ALL.to_string(),
233 all_nodes_data: node_registry.get_node_service_data().await,
234 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
235 }),
236 );
237}
238
239async fn reset_nodes(
241 action_sender: UnboundedSender<Action>,
242 node_registry: NodeRegistryManager,
243 start_nodes_after_reset: bool,
244) {
245 if let Err(err) =
246 ant_node_manager::cmd::node::reset(true, node_registry.clone(), VerbosityLevel::Minimal)
247 .await
248 {
249 error!("Error while resetting services {err:?}");
250 send_action(
251 action_sender,
252 Action::StatusActions(StatusActions::ErrorResettingNodes {
253 raw_error: err.to_string(),
254 }),
255 );
256 } else {
257 info!("Successfully reset services");
258 send_action(
259 action_sender,
260 Action::StatusActions(StatusActions::ResetNodesCompleted {
261 trigger_start_node: start_nodes_after_reset,
262 all_nodes_data: node_registry.get_node_service_data().await,
263 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
264 }),
265 );
266 }
267}
268
/// Arguments of the "upgrade nodes" task.
///
/// Apart from `action_sender`, the fields mirror the parameters of
/// `ant_node_manager::cmd::node::upgrade`.
#[derive(Debug)]
pub struct UpgradeNodesArgs {
    /// Channel on which status and error actions are reported.
    pub action_sender: UnboundedSender<Action>,
    // NOTE(review): `upgrade_nodes` in this file does not read this field; it passes `0`.
    pub connection_timeout_s: u64,
    pub do_not_start: bool,
    pub custom_bin_path: Option<PathBuf>,
    pub force: bool,
    // NOTE(review): `upgrade_nodes` in this file does not read this field; it always passes
    // `Some(FIXED_INTERVAL)`.
    pub fixed_interval: Option<u64>,
    pub peer_ids: Vec<String>,
    pub provided_env_variables: Option<Vec<(String, String)>>,
    /// Names of the services to stop and then upgrade.
    pub service_names: Vec<String>,
    pub url: Option<String>,
    pub version: Option<String>,
}
283
284async fn upgrade_nodes(args: UpgradeNodesArgs, node_registry: NodeRegistryManager) {
285 if let Err(err) = ant_node_manager::cmd::node::stop(
287 None,
288 node_registry.clone(),
289 vec![],
290 args.service_names.clone(),
291 VerbosityLevel::Minimal,
292 )
293 .await
294 {
295 error!("Error while stopping services {err:?}");
296 send_action(
297 args.action_sender.clone(),
298 Action::StatusActions(StatusActions::ErrorUpdatingNodes {
299 raw_error: err.to_string(),
300 }),
301 );
302 }
303
304 if let Err(err) = ant_node_manager::cmd::node::upgrade(
305 0, args.do_not_start,
307 args.custom_bin_path,
308 args.force,
309 Some(FIXED_INTERVAL),
310 node_registry.clone(),
311 args.peer_ids,
312 args.provided_env_variables,
313 args.service_names,
314 args.url,
315 args.version,
316 VerbosityLevel::Minimal,
317 )
318 .await
319 {
320 error!("Error while updating services {err:?}");
321 send_action(
322 args.action_sender,
323 Action::StatusActions(StatusActions::ErrorUpdatingNodes {
324 raw_error: err.to_string(),
325 }),
326 );
327 } else {
328 info!("Successfully updated services");
329 send_action(
330 args.action_sender,
331 Action::StatusActions(StatusActions::UpdateNodesCompleted {
332 all_nodes_data: node_registry.get_node_service_data().await,
333 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
334 }),
335 );
336 }
337}
338
339async fn remove_nodes(
340 services: Vec<String>,
341 action_sender: UnboundedSender<Action>,
342 node_registry: NodeRegistryManager,
343) {
344 if let Err(err) = ant_node_manager::cmd::node::stop(
346 None,
347 node_registry.clone(),
348 vec![],
349 services.clone(),
350 VerbosityLevel::Minimal,
351 )
352 .await
353 {
354 error!("Error while stopping services {err:?}");
355 send_action(
356 action_sender.clone(),
357 Action::StatusActions(StatusActions::ErrorRemovingNodes {
358 services: services.clone(),
359 raw_error: err.to_string(),
360 }),
361 );
362 }
363
364 if let Err(err) = ant_node_manager::cmd::node::remove(
365 false,
366 vec![],
367 node_registry.clone(),
368 services.clone(),
369 VerbosityLevel::Minimal,
370 )
371 .await
372 {
373 error!("Error while removing services {err:?}");
374 send_action(
375 action_sender,
376 Action::StatusActions(StatusActions::ErrorRemovingNodes {
377 services,
378 raw_error: err.to_string(),
379 }),
380 );
381 } else {
382 info!("Successfully removed services {:?}", services);
383 for service in services {
384 send_action(
385 action_sender.clone(),
386 Action::StatusActions(StatusActions::RemoveNodesCompleted {
387 service_name: service,
388 all_nodes_data: node_registry.get_node_service_data().await,
389 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
390 }),
391 );
392 }
393 }
394}
395
396async fn add_node(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
397 debug!("Adding node");
398
399 if args.run_nat_detection {
400 run_nat_detection(&args.action_sender).await;
401 }
402
403 let config = prepare_node_config(&args);
404
405 let used_ports = get_used_ports(&node_registry).await;
406 let (mut current_port, max_port) = get_port_range(&config.custom_ports);
407
408 while used_ports.contains(¤t_port) && current_port <= max_port {
409 current_port += 1;
410 }
411
412 if current_port > max_port {
413 error!("Reached maximum port number. Unable to find an available port.");
414 send_action(
415 args.action_sender.clone(),
416 Action::StatusActions(StatusActions::ErrorAddingNodes {
417 raw_error: format!(
418 "When adding a new node we reached maximum port number ({max_port}).\nUnable to find an available port."
419 ),
420 }),
421 );
422 }
423
424 let port_range = Some(PortRange::Single(current_port));
425 match ant_node_manager::cmd::node::add(
426 false, false, config.auto_set_nat_flags,
429 Some(config.count),
430 config.data_dir_path,
431 true, None, None, None, None, None, None, None, None, None, port_range, node_registry.clone(),
443 config.init_peers_config.clone(),
444 config.relay, get_restart_policy(),
446 RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
447 None, None, config.antnode_path.clone(), !config.upnp,
451 None, None, None, VerbosityLevel::Minimal,
455 false, )
457 .await
458 {
459 Err(err) => {
460 error!("Error while adding services {err:?}");
461 send_action(
462 args.action_sender,
463 Action::StatusActions(StatusActions::ErrorAddingNodes {
464 raw_error: err.to_string(),
465 }),
466 );
467 }
468 Ok(services) => {
469 info!("Successfully added services: {:?}", services);
470 for service in services {
471 send_action(
472 args.action_sender.clone(),
473 Action::StatusActions(StatusActions::AddNodesCompleted {
474 service_name: service,
475 all_nodes_data: node_registry.get_node_service_data().await,
476 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
477 }),
478 );
479 }
480 }
481 }
482}
483
484async fn start_nodes(
485 services: Vec<String>,
486 action_sender: UnboundedSender<Action>,
487 node_registry: NodeRegistryManager,
488) {
489 debug!("Starting node {:?}", services);
490 if let Err(err) = ant_node_manager::cmd::node::start(
491 CONNECTION_TIMEOUT_START,
492 None,
493 node_registry.clone(),
494 vec![],
495 services.clone(),
496 VerbosityLevel::Minimal,
497 )
498 .await
499 {
500 error!("Error while starting services {err:?}");
501 send_action(
502 action_sender,
503 Action::StatusActions(StatusActions::ErrorStartingNodes {
504 services,
505 raw_error: err.to_string(),
506 }),
507 );
508 } else {
509 info!("Successfully started services {:?}", services);
510 for service in services {
511 send_action(
512 action_sender.clone(),
513 Action::StatusActions(StatusActions::StartNodesCompleted {
514 service_name: service,
515 all_nodes_data: node_registry.get_node_service_data().await,
516 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
517 }),
518 );
519 }
520 }
521}
522
/// Restart policy handed to the node manager on Unix targets: `OnSuccess` without an
/// explicit delay.
#[cfg(unix)]
fn get_restart_policy() -> RestartPolicy {
    RestartPolicy::OnSuccess { delay_secs: None }
}

/// Restart policy handed to the node manager on Windows targets: `Never`.
#[cfg(windows)]
fn get_restart_policy() -> RestartPolicy {
    RestartPolicy::Never
}
535
536fn send_action(action_sender: UnboundedSender<Action>, action: Action) {
537 if let Err(err) = action_sender.send(action) {
538 error!("Error while sending action: {err:?}");
539 }
540}
541
/// Node manager settings derived from [`MaintainNodesArgs`] by `prepare_node_config`.
struct NodeConfig {
    /// Optional path to the `antnode` binary.
    antnode_path: Option<PathBuf>,
    /// `true` when the connection mode is `Automatic`.
    auto_set_nat_flags: bool,
    /// Target number of nodes.
    count: u16,
    /// Port range to use; only set when the connection mode is `CustomPorts`.
    custom_ports: Option<PortRange>,
    /// Optional data directory.
    data_dir_path: Option<PathBuf>,
    /// `true` when the connection mode is `HomeNetwork`.
    relay: bool,
    /// Optional network identifier.
    network_id: Option<u8>,
    /// Owner name, `None` when the supplied owner string was empty.
    owner: Option<String>,
    /// Initial peers configuration.
    init_peers_config: InitialPeersConfig,
    /// Rewards address as text; parsed into a `RewardsAddress` right before use.
    rewards_address: String,
    /// `true` when the connection mode is `UPnP`.
    upnp: bool,
}
555
556async fn run_nat_detection(action_sender: &UnboundedSender<Action>) {
558 info!("Running nat detection....");
559
560 if let Err(err) = action_sender.send(Action::StatusActions(StatusActions::NatDetectionStarted))
562 {
563 error!("Error while sending action: {err:?}");
564 }
565
566 let release_repo = <dyn AntReleaseRepoActions>::default_config();
567 let version = match release_repo
568 .get_latest_version(&ReleaseType::NatDetection)
569 .await
570 {
571 Ok(v) => {
572 info!("Using NAT detection version {}", v.to_string());
573 v.to_string()
574 }
575 Err(err) => {
576 info!("No NAT detection release found, using fallback version 0.1.0");
577 info!("Error: {err}");
578 "0.1.0".to_string()
579 }
580 };
581
582 if let Err(err) = ant_node_manager::cmd::nat_detection::run_nat_detection(
583 None,
584 true,
585 None,
586 None,
587 Some(version),
588 VerbosityLevel::Minimal,
589 )
590 .await
591 {
592 error!("Error while running nat detection {err:?}. Registering the error.");
593 if let Err(err) = action_sender.send(Action::StatusActions(
594 StatusActions::ErrorWhileRunningNatDetection,
595 )) {
596 error!("Error while sending action: {err:?}");
597 }
598 } else {
599 info!("Successfully ran nat detection.");
600 if let Err(err) = action_sender.send(Action::StatusActions(
601 StatusActions::SuccessfullyDetectedNatStatus,
602 )) {
603 error!("Error while sending action: {err:?}");
604 }
605 }
606}
607
608fn prepare_node_config(args: &MaintainNodesArgs) -> NodeConfig {
609 NodeConfig {
610 antnode_path: args.antnode_path.clone(),
611 auto_set_nat_flags: args.connection_mode == ConnectionMode::Automatic,
612 data_dir_path: args.data_dir_path.clone(),
613 count: args.count,
614 custom_ports: if args.connection_mode == ConnectionMode::CustomPorts {
615 args.port_range.clone()
616 } else {
617 None
618 },
619 owner: if args.owner.is_empty() {
620 None
621 } else {
622 Some(args.owner.clone())
623 },
624 relay: args.connection_mode == ConnectionMode::HomeNetwork,
625 network_id: args.network_id,
626 init_peers_config: args.init_peers_config.clone(),
627 rewards_address: args.rewards_address.clone(),
628 upnp: args.connection_mode == ConnectionMode::UPnP,
629 }
630}
631
632fn debug_log_config(config: &NodeConfig, args: &MaintainNodesArgs) {
634 debug!("************ STARTING NODE MAINTENANCE ************");
635 debug!(
636 "Maintaining {} running nodes with the following args:",
637 config.count
638 );
639 debug!(
640 " owner: {:?}, init_peers_config: {:?}, antnode_path: {:?}, network_id: {:?}",
641 config.owner, config.init_peers_config, config.antnode_path, args.network_id
642 );
643 debug!(
644 " data_dir_path: {:?}, connection_mode: {:?}",
645 config.data_dir_path, args.connection_mode
646 );
647 debug!(
648 " auto_set_nat_flags: {:?}, custom_ports: {:?}, upnp: {}, relay: {}",
649 config.auto_set_nat_flags, config.custom_ports, config.upnp, config.relay
650 );
651}
652
653async fn get_used_ports(node_registry: &NodeRegistryManager) -> Vec<u16> {
655 let mut used_ports = Vec::new();
656 for node in node_registry.nodes.read().await.iter() {
657 let node = node.read().await;
658 if let Some(port) = node.node_port {
659 used_ports.push(port);
660 }
661 }
662 debug!("Currently used ports: {:?}", used_ports);
663 used_ports
664}
665
666fn get_port_range(custom_ports: &Option<PortRange>) -> (u16, u16) {
668 match custom_ports {
669 Some(PortRange::Single(port)) => (*port, *port),
670 Some(PortRange::Range(start, end)) => (*start, *end),
671 None => (PORT_MIN as u16, PORT_MAX as u16),
672 }
673}
674
675async fn scale_down_nodes(config: &NodeConfig, count: u16, node_registry: NodeRegistryManager) {
677 match ant_node_manager::cmd::node::maintain_n_running_nodes(
678 false,
679 false,
680 config.auto_set_nat_flags,
681 CONNECTION_TIMEOUT_START,
682 count,
683 config.data_dir_path.clone(),
684 true,
685 None,
686 Some(EvmNetwork::default()),
687 None,
688 None,
689 None,
690 None,
691 None,
692 config.network_id,
693 None,
694 None, node_registry,
696 config.init_peers_config.clone(),
697 config.relay,
698 RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
699 get_restart_policy(),
700 None,
701 None,
702 config.antnode_path.clone(),
703 None,
704 !config.upnp,
705 None,
706 None,
707 VerbosityLevel::Minimal,
708 None,
709 false,
710 )
711 .await
712 {
713 Ok(_) => {
714 info!("Scaling down to {} nodes", count);
715 }
716 Err(err) => {
717 error!("Error while scaling down to {} nodes: {err:?}", count);
718 }
719 }
720}
721
722async fn add_nodes(
724 action_sender: &UnboundedSender<Action>,
725 config: &NodeConfig,
726 mut nodes_to_add: i32,
727 used_ports: &mut Vec<u16>,
728 current_port: &mut u16,
729 max_port: u16,
730 node_registry: NodeRegistryManager,
731) {
732 let mut retry_count = 0;
733
734 while nodes_to_add > 0 && retry_count < NODE_ADD_MAX_RETRIES {
735 while used_ports.contains(current_port) && *current_port <= max_port {
737 *current_port += 1;
738 }
739
740 if *current_port > max_port {
741 error!("Reached maximum port number. Unable to find an available port.");
742 send_action(
743 action_sender.clone(),
744 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
745 raw_error: format!(
746 "Reached maximum port number ({max_port}).\nUnable to find an available port."
747 ),
748 }),
749 );
750 break;
751 }
752
753 let port_range = Some(PortRange::Single(*current_port));
754 match ant_node_manager::cmd::node::maintain_n_running_nodes(
755 false,
756 false,
757 config.auto_set_nat_flags,
758 CONNECTION_TIMEOUT_START,
759 config.count,
760 config.data_dir_path.clone(),
761 true,
762 None,
763 Some(EvmNetwork::default()),
764 None,
765 None,
766 None,
767 None,
768 None,
769 config.network_id,
770 None,
771 port_range,
772 node_registry.clone(),
773 config.init_peers_config.clone(),
774 config.relay,
775 RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
776 get_restart_policy(),
777 None,
778 None,
779 config.antnode_path.clone(),
780 None,
781 !config.upnp,
782 None,
783 None,
784 VerbosityLevel::Minimal,
785 None,
786 false,
787 )
788 .await
789 {
790 Ok(_) => {
791 info!("Successfully added a node on port {}", current_port);
792 used_ports.push(*current_port);
793 nodes_to_add -= 1;
794 *current_port += 1;
795 retry_count = 0; }
797 Err(err) => {
798 if err.to_string().contains("is being used by another service") {
800 warn!(
801 "Port {} is being used, retrying with a different port. Attempt {}/{}",
802 current_port,
803 retry_count + 1,
804 NODE_ADD_MAX_RETRIES
805 );
806 } else if err
807 .to_string()
808 .contains("Failed to add one or more services")
809 && retry_count >= NODE_ADD_MAX_RETRIES
810 {
811 send_action(
812 action_sender.clone(),
813 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
814 raw_error: "When trying to add a node, we failed.\n\
815 Maybe you ran out of disk space?\n\
816 Maybe you need to change the port range?"
817 .to_string(),
818 }),
819 );
820 } else if err
821 .to_string()
822 .contains("contains a virus or potentially unwanted software")
823 && retry_count >= NODE_ADD_MAX_RETRIES
824 {
825 send_action(
826 action_sender.clone(),
827 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
828 raw_error: "When trying to add a node, we failed.\n\
829 You may be running an old version of antnode service?\n\
830 Did you whitelisted antnode and the launchpad?"
831 .to_string(),
832 }),
833 );
834 } else {
835 error!("Range of ports to be used {:?}", *current_port..max_port);
836 error!("Error while adding node on port {}: {err:?}", current_port);
837 }
838 *current_port += 1;
840 retry_count += 1;
841 }
842 }
843 }
844 if retry_count >= NODE_ADD_MAX_RETRIES {
845 send_action(
846 action_sender.clone(),
847 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
848 raw_error: format!(
849 "When trying to start a node, we reached the maximum amount of retries ({NODE_ADD_MAX_RETRIES}).\n\
850 Could this be a firewall blocking nodes starting or ports on your router already in use?"
851 ),
852 }),
853 );
854 }
855}