1use crate::action::{Action, StatusActions};
2use crate::connection_mode::ConnectionMode;
3use ant_bootstrap::InitialPeersConfig;
4use ant_evm::{EvmNetwork, RewardsAddress};
5use ant_node_manager::{
6 add_services::config::PortRange, config::get_node_registry_path, VerbosityLevel,
7};
8use ant_releases::{self, AntReleaseRepoActions, ReleaseType};
9use ant_service_management::NodeRegistry;
10use color_eyre::eyre::{eyre, Error};
11use color_eyre::Result;
12use std::{path::PathBuf, str::FromStr};
13use tokio::runtime::Builder;
14use tokio::sync::mpsc::{self, UnboundedSender};
15use tokio::task::LocalSet;
16
/// Upper bound of the default port range (see `get_port_range`, which falls back to
/// `PORT_MIN..=PORT_MAX` when no custom ports are configured).
pub const PORT_MAX: u32 = 65535;
/// Lower bound of the default port range (see `get_port_range`).
pub const PORT_MIN: u32 = 1024;

/// Number of consecutive failed attempts `add_nodes` tolerates before giving up.
const NODE_ADD_MAX_RETRIES: u32 = 5;

/// Interval handed to `ant_node_manager::cmd::node::upgrade` by `upgrade_nodes`.
// NOTE(review): presumably milliseconds (60 s) - confirm against the node manager API.
pub const FIXED_INTERVAL: u64 = 60_000;
/// Connection timeout handed to the node manager when starting or maintaining nodes.
// NOTE(review): presumably seconds - confirm against the node manager API.
pub const CONNECTION_TIMEOUT_START: u64 = 120;

/// Service name reported in `StartNodesCompleted` once `maintain_n_running_nodes` finishes,
/// i.e. when the completion is not tied to one individual service.
pub const NODES_ALL: &str = "NODES_ALL";
26
/// A unit of work submitted through `NodeManagement::send_task` and executed by the
/// node management worker. Each variant is dispatched to the function of matching
/// purpose in this file.
#[derive(Debug)]
pub enum NodeManagementTask {
    /// Bring the number of nodes to `args.count` (handled by `maintain_n_running_nodes`).
    MaintainNodes {
        args: MaintainNodesArgs,
    },
    /// Reset all node services (handled by `reset_nodes`).
    ResetNodes {
        /// Forwarded as `trigger_start_node` in the `ResetNodesCompleted` action.
        start_nodes_after_reset: bool,
        /// Channel on which the outcome is reported.
        action_sender: UnboundedSender<Action>,
    },
    /// Stop the named services (handled by `stop_nodes`).
    StopNodes {
        services: Vec<String>,
        /// Channel on which the outcome is reported.
        action_sender: UnboundedSender<Action>,
    },
    /// Stop and then upgrade the services named in `args` (handled by `upgrade_nodes`).
    UpgradeNodes {
        args: UpgradeNodesArgs,
    },
    /// Add one node service (handled by `add_node`).
    AddNode {
        args: MaintainNodesArgs,
    },
    /// Stop and then remove the named services (handled by `remove_nodes`).
    RemoveNodes {
        services: Vec<String>,
        /// Channel on which the outcome is reported.
        action_sender: UnboundedSender<Action>,
    },
    /// Start the named services (handled by `start_nodes`).
    StartNode {
        services: Vec<String>,
        /// Channel on which the outcome is reported.
        action_sender: UnboundedSender<Action>,
    },
}
55
56#[derive(Clone)]
57pub struct NodeManagement {
58 task_sender: mpsc::UnboundedSender<NodeManagementTask>,
59}
60
61impl NodeManagement {
62 pub fn new() -> Result<Self> {
63 let (send, mut recv) = mpsc::unbounded_channel();
64
65 let rt = Builder::new_current_thread().enable_all().build()?;
66
67 std::thread::spawn(move || {
68 let local = LocalSet::new();
69
70 local.spawn_local(async move {
71 while let Some(new_task) = recv.recv().await {
72 match new_task {
73 NodeManagementTask::MaintainNodes { args } => {
74 maintain_n_running_nodes(args).await;
75 }
76 NodeManagementTask::ResetNodes {
77 start_nodes_after_reset,
78 action_sender,
79 } => {
80 reset_nodes(action_sender, start_nodes_after_reset).await;
81 }
82 NodeManagementTask::StopNodes {
83 services,
84 action_sender,
85 } => {
86 stop_nodes(services, action_sender).await;
87 }
88 NodeManagementTask::UpgradeNodes { args } => upgrade_nodes(args).await,
89 NodeManagementTask::RemoveNodes {
90 services,
91 action_sender,
92 } => remove_nodes(services, action_sender).await,
93 NodeManagementTask::StartNode {
94 services,
95 action_sender,
96 } => start_nodes(services, action_sender).await,
97 NodeManagementTask::AddNode { args } => add_node(args).await,
98 }
99 }
100 });
103
104 rt.block_on(local);
107 });
108
109 Ok(Self { task_sender: send })
110 }
111
112 pub fn send_task(&self, task: NodeManagementTask) -> Result<()> {
119 self.task_sender
120 .send(task)
121 .inspect_err(|err| error!("The node management local set is down {err:?}"))
122 .map_err(|_| eyre!("Failed to send task to the node management local set"))?;
123 Ok(())
124 }
125}
126
127async fn stop_nodes(services: Vec<String>, action_sender: UnboundedSender<Action>) {
129 if let Err(err) =
130 ant_node_manager::cmd::node::stop(None, vec![], services.clone(), VerbosityLevel::Minimal)
131 .await
132 {
133 error!("Error while stopping services {err:?}");
134 send_action(
135 action_sender,
136 Action::StatusActions(StatusActions::ErrorStoppingNodes {
137 services,
138 raw_error: err.to_string(),
139 }),
140 );
141 } else {
142 info!("Successfully stopped services");
143 for service in services {
144 send_action(
145 action_sender.clone(),
146 Action::StatusActions(StatusActions::StopNodesCompleted {
147 service_name: service,
148 }),
149 );
150 }
151 }
152}
153
/// Arguments for the `MaintainNodes` and `AddNode` tasks.
#[derive(Debug)]
pub struct MaintainNodesArgs {
    /// Channel on which status and error actions are reported.
    pub action_sender: UnboundedSender<Action>,
    /// Optional path copied into the node config and forwarded to the node manager.
    pub antnode_path: Option<PathBuf>,
    /// Determines the NAT / custom-ports / relay / UPnP flags (see `prepare_node_config`).
    pub connection_mode: ConnectionMode,
    /// Target number of nodes.
    pub count: u16,
    /// Optional data directory forwarded to the node manager.
    pub data_dir_path: Option<PathBuf>,
    /// Optional network id forwarded to the node manager when maintaining nodes.
    pub network_id: Option<u8>,
    /// Owner name; an empty string is mapped to `None` in the node config.
    pub owner: String,
    /// Initial peers configuration forwarded to the node manager.
    pub init_peers_config: InitialPeersConfig,
    /// Port range; only used when `connection_mode` is `ConnectionMode::CustomPorts`.
    pub port_range: Option<PortRange>,
    /// Rewards address in textual form; parsed with `RewardsAddress::from_str` before use.
    pub rewards_address: String,
    /// When `true`, NAT detection is run before any node is added or maintained.
    pub run_nat_detection: bool,
}
168
169async fn maintain_n_running_nodes(args: MaintainNodesArgs) {
171 debug!("Maintaining {} nodes", args.count);
172 if args.run_nat_detection {
173 run_nat_detection(&args.action_sender).await;
174 }
175
176 let config = prepare_node_config(&args);
177 debug_log_config(&config, &args);
178
179 let node_registry = match load_node_registry(&args.action_sender).await {
180 Ok(registry) => registry,
181 Err(err) => {
182 error!("Failed to load node registry: {:?}", err);
183 return;
184 }
185 };
186 let mut used_ports = get_used_ports(&node_registry);
187 let (mut current_port, max_port) = get_port_range(&config.custom_ports);
188
189 let nodes_to_add = args.count as i32 - node_registry.nodes.len() as i32;
190
191 if nodes_to_add <= 0 {
192 debug!("Scaling down nodes to {}", nodes_to_add);
193 scale_down_nodes(&config, args.count).await;
194 } else {
195 debug!("Scaling up nodes to {}", nodes_to_add);
196 add_nodes(
197 &args.action_sender,
198 &config,
199 nodes_to_add,
200 &mut used_ports,
201 &mut current_port,
202 max_port,
203 )
204 .await;
205 }
206
207 debug!("Finished maintaining {} nodes", args.count);
208 send_action(
209 args.action_sender,
210 Action::StatusActions(StatusActions::StartNodesCompleted {
211 service_name: NODES_ALL.to_string(),
212 }),
213 );
214}
215
216async fn reset_nodes(action_sender: UnboundedSender<Action>, start_nodes_after_reset: bool) {
218 if let Err(err) = ant_node_manager::cmd::node::reset(true, VerbosityLevel::Minimal).await {
219 error!("Error while resetting services {err:?}");
220 send_action(
221 action_sender,
222 Action::StatusActions(StatusActions::ErrorResettingNodes {
223 raw_error: err.to_string(),
224 }),
225 );
226 } else {
227 info!("Successfully reset services");
228 send_action(
229 action_sender,
230 Action::StatusActions(StatusActions::ResetNodesCompleted {
231 trigger_start_node: start_nodes_after_reset,
232 }),
233 );
234 }
235}
236
/// Arguments for the `UpgradeNodes` task.
#[derive(Debug)]
pub struct UpgradeNodesArgs {
    /// Channel on which status and error actions are reported.
    pub action_sender: UnboundedSender<Action>,
    /// NOTE(review): not read by `upgrade_nodes` in this file, which passes `0` instead.
    pub connection_timeout_s: u64,
    /// Forwarded to the node manager's `upgrade` call.
    pub do_not_start: bool,
    /// Forwarded to the node manager's `upgrade` call.
    pub custom_bin_path: Option<PathBuf>,
    /// Forwarded to the node manager's `upgrade` call.
    pub force: bool,
    /// NOTE(review): not read by `upgrade_nodes` in this file, which always passes
    /// `Some(FIXED_INTERVAL)` instead.
    pub fixed_interval: Option<u64>,
    /// Forwarded to the node manager's `upgrade` call.
    pub peer_ids: Vec<String>,
    /// Forwarded to the node manager's `upgrade` call.
    pub provided_env_variables: Option<Vec<(String, String)>>,
    /// Services that are stopped first and then upgraded.
    pub service_names: Vec<String>,
    /// Forwarded to the node manager's `upgrade` call.
    pub url: Option<String>,
    /// Forwarded to the node manager's `upgrade` call.
    pub version: Option<String>,
}
251
252async fn upgrade_nodes(args: UpgradeNodesArgs) {
253 if let Err(err) = ant_node_manager::cmd::node::stop(
255 None,
256 vec![],
257 args.service_names.clone(),
258 VerbosityLevel::Minimal,
259 )
260 .await
261 {
262 error!("Error while stopping services {err:?}");
263 send_action(
264 args.action_sender.clone(),
265 Action::StatusActions(StatusActions::ErrorUpdatingNodes {
266 raw_error: err.to_string(),
267 }),
268 );
269 }
270
271 if let Err(err) = ant_node_manager::cmd::node::upgrade(
272 0, args.do_not_start,
274 args.custom_bin_path,
275 args.force,
276 Some(FIXED_INTERVAL),
277 args.peer_ids,
278 args.provided_env_variables,
279 args.service_names,
280 args.url,
281 args.version,
282 VerbosityLevel::Minimal,
283 )
284 .await
285 {
286 error!("Error while updating services {err:?}");
287 send_action(
288 args.action_sender,
289 Action::StatusActions(StatusActions::ErrorUpdatingNodes {
290 raw_error: err.to_string(),
291 }),
292 );
293 } else {
294 info!("Successfully updated services");
295 send_action(
296 args.action_sender,
297 Action::StatusActions(StatusActions::UpdateNodesCompleted),
298 );
299 }
300}
301
302async fn remove_nodes(services: Vec<String>, action_sender: UnboundedSender<Action>) {
303 if let Err(err) =
305 ant_node_manager::cmd::node::stop(None, vec![], services.clone(), VerbosityLevel::Minimal)
306 .await
307 {
308 error!("Error while stopping services {err:?}");
309 send_action(
310 action_sender.clone(),
311 Action::StatusActions(StatusActions::ErrorRemovingNodes {
312 services: services.clone(),
313 raw_error: err.to_string(),
314 }),
315 );
316 }
317
318 if let Err(err) = ant_node_manager::cmd::node::remove(
319 false,
320 vec![],
321 services.clone(),
322 VerbosityLevel::Minimal,
323 )
324 .await
325 {
326 error!("Error while removing services {err:?}");
327 send_action(
328 action_sender,
329 Action::StatusActions(StatusActions::ErrorRemovingNodes {
330 services,
331 raw_error: err.to_string(),
332 }),
333 );
334 } else {
335 info!("Successfully removed services {:?}", services);
336 for service in services {
337 send_action(
338 action_sender.clone(),
339 Action::StatusActions(StatusActions::RemoveNodesCompleted {
340 service_name: service,
341 }),
342 );
343 }
344 }
345}
346
347async fn add_node(args: MaintainNodesArgs) {
348 debug!("Adding node");
349
350 if args.run_nat_detection {
351 run_nat_detection(&args.action_sender).await;
352 }
353
354 let config = prepare_node_config(&args);
355
356 let node_registry = match load_node_registry(&args.action_sender).await {
357 Ok(registry) => registry,
358 Err(err) => {
359 error!("Failed to load node registry: {:?}", err);
360 return;
361 }
362 };
363 let used_ports = get_used_ports(&node_registry);
364 let (mut current_port, max_port) = get_port_range(&config.custom_ports);
365
366 while used_ports.contains(¤t_port) && current_port <= max_port {
367 current_port += 1;
368 }
369
370 if current_port > max_port {
371 error!("Reached maximum port number. Unable to find an available port.");
372 send_action(
373 args.action_sender.clone(),
374 Action::StatusActions(StatusActions::ErrorAddingNodes {
375 raw_error: format!(
376 "When adding a new node we reached maximum port number ({}).\nUnable to find an available port.",
377 max_port
378 ),
379 }),
380 );
381 }
382
383 let port_range = Some(PortRange::Single(current_port));
384 match ant_node_manager::cmd::node::add(
385 false, false, config.auto_set_nat_flags,
388 Some(config.count),
389 config.data_dir_path,
390 true, None, None, None, None, None, None, None, None, None, port_range, config.init_peers_config.clone(),
402 config.relay, RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
404 None, None, config.antnode_path.clone(), !config.upnp,
408 None, None, None, VerbosityLevel::Minimal,
412 )
413 .await
414 {
415 Err(err) => {
416 error!("Error while adding services {err:?}");
417 send_action(
418 args.action_sender,
419 Action::StatusActions(StatusActions::ErrorAddingNodes {
420 raw_error: err.to_string(),
421 }),
422 );
423 }
424 Ok(services) => {
425 info!("Successfully added services: {:?}", services);
426 for service in services {
427 send_action(
428 args.action_sender.clone(),
429 Action::StatusActions(StatusActions::AddNodesCompleted {
430 service_name: service,
431 }),
432 );
433 }
434 }
435 }
436}
437
438async fn start_nodes(services: Vec<String>, action_sender: UnboundedSender<Action>) {
439 debug!("Starting node {:?}", services);
440 if let Err(err) = ant_node_manager::cmd::node::start(
441 CONNECTION_TIMEOUT_START,
442 None,
443 vec![],
444 services.clone(),
445 VerbosityLevel::Minimal,
446 )
447 .await
448 {
449 error!("Error while starting services {err:?}");
450 send_action(
451 action_sender,
452 Action::StatusActions(StatusActions::ErrorStartingNodes {
453 services,
454 raw_error: err.to_string(),
455 }),
456 );
457 } else {
458 info!("Successfully started services {:?}", services);
459 for service in services {
460 send_action(
461 action_sender.clone(),
462 Action::StatusActions(StatusActions::StartNodesCompleted {
463 service_name: service,
464 }),
465 );
466 }
467 }
468}
469
470fn send_action(action_sender: UnboundedSender<Action>, action: Action) {
473 if let Err(err) = action_sender.send(action) {
474 error!("Error while sending action: {err:?}");
475 }
476}
477
478async fn load_node_registry(
480 action_sender: &UnboundedSender<Action>,
481) -> Result<NodeRegistry, Error> {
482 match get_node_registry_path() {
483 Ok(path) => match NodeRegistry::load(&path) {
484 Ok(registry) => Ok(registry),
485 Err(err) => {
486 error!("Failed to load NodeRegistry: {}", err);
487 if let Err(send_err) = action_sender.send(Action::StatusActions(
488 StatusActions::ErrorLoadingNodeRegistry {
489 raw_error: err.to_string(),
490 },
491 )) {
492 error!("Error while sending action: {}", send_err);
493 }
494 Err(eyre!("Failed to load NodeRegistry"))
495 }
496 },
497 Err(err) => {
498 error!("Failed to get node registry path: {}", err);
499 if let Err(send_err) = action_sender.send(Action::StatusActions(
500 StatusActions::ErrorGettingNodeRegistryPath {
501 raw_error: err.to_string(),
502 },
503 )) {
504 error!("Error while sending action: {}", send_err);
505 }
506 Err(eyre!("Failed to get node registry path"))
507 }
508 }
509}
510
/// Node settings derived from `MaintainNodesArgs` by `prepare_node_config`.
struct NodeConfig {
    /// Copied from the args and forwarded to the node manager.
    antnode_path: Option<PathBuf>,
    /// `true` when the connection mode is `ConnectionMode::Automatic`.
    auto_set_nat_flags: bool,
    /// Target number of nodes.
    count: u16,
    /// Port range from the args, kept only when the connection mode is
    /// `ConnectionMode::CustomPorts`; `None` otherwise.
    custom_ports: Option<PortRange>,
    /// Copied from the args and forwarded to the node manager.
    data_dir_path: Option<PathBuf>,
    /// `true` when the connection mode is `ConnectionMode::HomeNetwork`.
    relay: bool,
    /// Copied from the args.
    network_id: Option<u8>,
    /// `None` when the owner string in the args is empty.
    owner: Option<String>,
    /// Copied from the args and forwarded to the node manager.
    init_peers_config: InitialPeersConfig,
    /// Rewards address in textual form; parsed with `RewardsAddress::from_str` at use sites.
    rewards_address: String,
    /// `true` when the connection mode is `ConnectionMode::UPnP`; call sites pass the
    /// negated value to the node manager.
    upnp: bool,
}
524
525async fn run_nat_detection(action_sender: &UnboundedSender<Action>) {
527 info!("Running nat detection....");
528
529 let release_repo = <dyn AntReleaseRepoActions>::default_config();
530 let version = match release_repo
531 .get_latest_version(&ReleaseType::NatDetection)
532 .await
533 {
534 Ok(v) => {
535 info!("Using NAT detection version {}", v.to_string());
536 v.to_string()
537 }
538 Err(err) => {
539 info!("No NAT detection release found, using fallback version 0.1.0");
540 info!("Error: {err}");
541 "0.1.0".to_string()
542 }
543 };
544
545 if let Err(err) = ant_node_manager::cmd::nat_detection::run_nat_detection(
546 None,
547 true,
548 None,
549 None,
550 Some(version),
551 VerbosityLevel::Minimal,
552 )
553 .await
554 {
555 error!("Error while running nat detection {err:?}. Registering the error.");
556 if let Err(err) = action_sender.send(Action::StatusActions(
557 StatusActions::ErrorWhileRunningNatDetection,
558 )) {
559 error!("Error while sending action: {err:?}");
560 }
561 } else {
562 info!("Successfully ran nat detection.");
563 }
564}
565
566fn prepare_node_config(args: &MaintainNodesArgs) -> NodeConfig {
567 NodeConfig {
568 antnode_path: args.antnode_path.clone(),
569 auto_set_nat_flags: args.connection_mode == ConnectionMode::Automatic,
570 data_dir_path: args.data_dir_path.clone(),
571 count: args.count,
572 custom_ports: if args.connection_mode == ConnectionMode::CustomPorts {
573 args.port_range.clone()
574 } else {
575 None
576 },
577 owner: if args.owner.is_empty() {
578 None
579 } else {
580 Some(args.owner.clone())
581 },
582 relay: args.connection_mode == ConnectionMode::HomeNetwork,
583 network_id: args.network_id,
584 init_peers_config: args.init_peers_config.clone(),
585 rewards_address: args.rewards_address.clone(),
586 upnp: args.connection_mode == ConnectionMode::UPnP,
587 }
588}
589
590fn debug_log_config(config: &NodeConfig, args: &MaintainNodesArgs) {
592 debug!("************ STARTING NODE MAINTENANCE ************");
593 debug!(
594 "Maintaining {} running nodes with the following args:",
595 config.count
596 );
597 debug!(
598 " owner: {:?}, init_peers_config: {:?}, antnode_path: {:?}, network_id: {:?}",
599 config.owner, config.init_peers_config, config.antnode_path, args.network_id
600 );
601 debug!(
602 " data_dir_path: {:?}, connection_mode: {:?}",
603 config.data_dir_path, args.connection_mode
604 );
605 debug!(
606 " auto_set_nat_flags: {:?}, custom_ports: {:?}, upnp: {}, relay: {}",
607 config.auto_set_nat_flags, config.custom_ports, config.upnp, config.relay
608 );
609}
610
611fn get_used_ports(node_registry: &NodeRegistry) -> Vec<u16> {
613 let used_ports: Vec<u16> = node_registry
614 .nodes
615 .iter()
616 .filter_map(|node| node.node_port)
617 .collect();
618 debug!("Currently used ports: {:?}", used_ports);
619 used_ports
620}
621
622fn get_port_range(custom_ports: &Option<PortRange>) -> (u16, u16) {
624 match custom_ports {
625 Some(PortRange::Single(port)) => (*port, *port),
626 Some(PortRange::Range(start, end)) => (*start, *end),
627 None => (PORT_MIN as u16, PORT_MAX as u16),
628 }
629}
630
631async fn scale_down_nodes(config: &NodeConfig, count: u16) {
633 match ant_node_manager::cmd::node::maintain_n_running_nodes(
634 false,
635 false,
636 config.auto_set_nat_flags,
637 CONNECTION_TIMEOUT_START,
638 count,
639 config.data_dir_path.clone(),
640 true,
641 None,
642 Some(EvmNetwork::default()),
643 None,
644 None,
645 None,
646 None,
647 None,
648 config.network_id,
649 None,
650 None, config.init_peers_config.clone(),
652 config.relay,
653 RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
654 None,
655 None,
656 config.antnode_path.clone(),
657 None,
658 !config.upnp,
659 None,
660 None,
661 VerbosityLevel::Minimal,
662 None,
663 )
664 .await
665 {
666 Ok(_) => {
667 info!("Scaling down to {} nodes", count);
668 }
669 Err(err) => {
670 error!("Error while scaling down to {} nodes: {err:?}", count);
671 }
672 }
673}
674
675async fn add_nodes(
677 action_sender: &UnboundedSender<Action>,
678 config: &NodeConfig,
679 mut nodes_to_add: i32,
680 used_ports: &mut Vec<u16>,
681 current_port: &mut u16,
682 max_port: u16,
683) {
684 let mut retry_count = 0;
685
686 while nodes_to_add > 0 && retry_count < NODE_ADD_MAX_RETRIES {
687 while used_ports.contains(current_port) && *current_port <= max_port {
689 *current_port += 1;
690 }
691
692 if *current_port > max_port {
693 error!("Reached maximum port number. Unable to find an available port.");
694 send_action(
695 action_sender.clone(),
696 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
697 raw_error: format!(
698 "Reached maximum port number ({}).\nUnable to find an available port.",
699 max_port
700 ),
701 }),
702 );
703 break;
704 }
705
706 let port_range = Some(PortRange::Single(*current_port));
707 match ant_node_manager::cmd::node::maintain_n_running_nodes(
708 false,
709 false,
710 config.auto_set_nat_flags,
711 CONNECTION_TIMEOUT_START,
712 config.count,
713 config.data_dir_path.clone(),
714 true,
715 None,
716 Some(EvmNetwork::default()),
717 None,
718 None,
719 None,
720 None,
721 None,
722 config.network_id,
723 None,
724 port_range,
725 config.init_peers_config.clone(),
726 config.relay,
727 RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
728 None,
729 None,
730 config.antnode_path.clone(),
731 None,
732 !config.upnp,
733 None,
734 None,
735 VerbosityLevel::Minimal,
736 None,
737 )
738 .await
739 {
740 Ok(_) => {
741 info!("Successfully added a node on port {}", current_port);
742 used_ports.push(*current_port);
743 nodes_to_add -= 1;
744 *current_port += 1;
745 retry_count = 0; }
747 Err(err) => {
748 if err.to_string().contains("is being used by another service") {
750 warn!(
751 "Port {} is being used, retrying with a different port. Attempt {}/{}",
752 current_port,
753 retry_count + 1,
754 NODE_ADD_MAX_RETRIES
755 );
756 } else if err
757 .to_string()
758 .contains("Failed to add one or more services")
759 && retry_count >= NODE_ADD_MAX_RETRIES
760 {
761 send_action(
762 action_sender.clone(),
763 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
764 raw_error: "When trying to add a node, we failed.\n\
765 Maybe you ran out of disk space?\n\
766 Maybe you need to change the port range?"
767 .to_string(),
768 }),
769 );
770 } else if err
771 .to_string()
772 .contains("contains a virus or potentially unwanted software")
773 && retry_count >= NODE_ADD_MAX_RETRIES
774 {
775 send_action(
776 action_sender.clone(),
777 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
778 raw_error: "When trying to add a node, we failed.\n\
779 You may be running an old version of antnode service?\n\
780 Did you whitelisted antnode and the launchpad?"
781 .to_string(),
782 }),
783 );
784 } else {
785 error!("Range of ports to be used {:?}", *current_port..max_port);
786 error!("Error while adding node on port {}: {err:?}", current_port);
787 }
788 *current_port += 1;
790 retry_count += 1;
791 }
792 }
793 }
794 if retry_count >= NODE_ADD_MAX_RETRIES {
795 send_action(
796 action_sender.clone(),
797 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
798 raw_error: format!(
799 "When trying to start a node, we reached the maximum amount of retries ({}).\n\
800 Could this be a firewall blocking nodes starting or ports on your router already in use?",
801 NODE_ADD_MAX_RETRIES
802 ),
803 }),
804 );
805 }
806}