1use crate::action::{Action, StatusActions};
10use crate::connection_mode::ConnectionMode;
11use ant_bootstrap::InitialPeersConfig;
12use ant_evm::{EvmNetwork, RewardsAddress};
13use ant_node_manager::{VerbosityLevel, add_services::config::PortRange};
14use ant_releases::{self, AntReleaseRepoActions, ReleaseType};
15use ant_service_management::NodeRegistryManager;
16use color_eyre::Result;
17use color_eyre::eyre::eyre;
18use std::{path::PathBuf, str::FromStr};
19use tokio::runtime::Builder;
20use tokio::sync::mpsc::{self, UnboundedSender};
21use tokio::task::LocalSet;
22
/// Upper bound of the port range scanned when no custom port range is configured.
pub const PORT_MAX: u32 = 65_535;
/// Lower bound of the port range scanned when no custom port range is configured.
pub const PORT_MIN: u32 = 1_024;

/// Number of consecutive failed attempts tolerated while adding nodes before giving up.
/// The counter is reset after every successful addition.
const NODE_ADD_MAX_RETRIES: u32 = 5;

/// Fixed interval handed to the node-manager upgrade command
/// (NOTE(review): presumably milliseconds — confirm against `ant_node_manager`).
pub const FIXED_INTERVAL: u64 = 60_000;
/// Connection timeout handed to the node-manager start/maintain commands
/// (NOTE(review): presumably seconds — confirm against `ant_node_manager`).
pub const CONNECTION_TIMEOUT_START: u64 = 120;

/// Service name used in status actions that concern every node rather than one service.
pub const NODES_ALL: &str = "NODES_ALL";
33#[derive(Debug)]
34pub enum NodeManagementTask {
35 MaintainNodes {
36 args: MaintainNodesArgs,
37 },
38 ResetNodes {
39 start_nodes_after_reset: bool,
40 action_sender: UnboundedSender<Action>,
41 },
42 StopNodes {
43 services: Vec<String>,
44 action_sender: UnboundedSender<Action>,
45 },
46 UpgradeNodes {
47 args: UpgradeNodesArgs,
48 },
49 AddNode {
50 args: MaintainNodesArgs,
51 },
52 RemoveNodes {
53 services: Vec<String>,
54 action_sender: UnboundedSender<Action>,
55 },
56 StartNode {
57 services: Vec<String>,
58 action_sender: UnboundedSender<Action>,
59 },
60}
61
62#[derive(Clone)]
63pub struct NodeManagement {
64 task_sender: mpsc::UnboundedSender<NodeManagementTask>,
65}
66
67impl NodeManagement {
68 pub fn new(node_registry: NodeRegistryManager) -> Result<Self> {
69 let (send, mut recv) = mpsc::unbounded_channel();
70
71 let rt = Builder::new_current_thread().enable_all().build()?;
72
73 std::thread::spawn(move || {
74 let local = LocalSet::new();
75
76 local.spawn_local(async move {
77 while let Some(new_task) = recv.recv().await {
78 match new_task {
79 NodeManagementTask::MaintainNodes { args } => {
80 maintain_n_running_nodes(args, node_registry.clone()).await;
81 }
82 NodeManagementTask::ResetNodes {
83 start_nodes_after_reset,
84 action_sender,
85 } => {
86 reset_nodes(
87 action_sender,
88 node_registry.clone(),
89 start_nodes_after_reset,
90 )
91 .await;
92 }
93 NodeManagementTask::StopNodes {
94 services,
95 action_sender,
96 } => {
97 stop_nodes(services, action_sender, node_registry.clone()).await;
98 }
99 NodeManagementTask::UpgradeNodes { args } => {
100 upgrade_nodes(args, node_registry.clone()).await
101 }
102 NodeManagementTask::RemoveNodes {
103 services,
104 action_sender,
105 } => remove_nodes(services, action_sender, node_registry.clone()).await,
106 NodeManagementTask::StartNode {
107 services,
108 action_sender,
109 } => start_nodes(services, action_sender, node_registry.clone()).await,
110 NodeManagementTask::AddNode { args } => {
111 add_node(args, node_registry.clone()).await
112 }
113 }
114 }
115 });
118
119 rt.block_on(local);
122 });
123
124 Ok(Self { task_sender: send })
125 }
126
127 pub fn send_task(&self, task: NodeManagementTask) -> Result<()> {
134 self.task_sender
135 .send(task)
136 .inspect_err(|err| error!("The node management local set is down {err:?}"))
137 .map_err(|_| eyre!("Failed to send task to the node management local set"))?;
138 Ok(())
139 }
140}
141
142async fn stop_nodes(
144 services: Vec<String>,
145 action_sender: UnboundedSender<Action>,
146 node_registry: NodeRegistryManager,
147) {
148 if let Err(err) = ant_node_manager::cmd::node::stop(
149 None,
150 node_registry.clone(),
151 vec![],
152 services.clone(),
153 VerbosityLevel::Minimal,
154 )
155 .await
156 {
157 error!("Error while stopping services {err:?}");
158 send_action(
159 action_sender,
160 Action::StatusActions(StatusActions::ErrorStoppingNodes {
161 services,
162 raw_error: err.to_string(),
163 }),
164 );
165 } else {
166 info!("Successfully stopped services");
167 for service in services {
168 send_action(
169 action_sender.clone(),
170 Action::StatusActions(StatusActions::StopNodesCompleted {
171 service_name: service,
172 all_nodes_data: node_registry.get_node_service_data().await,
173 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
174 }),
175 );
176 }
177 }
178}
179
180#[derive(Debug)]
181pub struct MaintainNodesArgs {
182 pub action_sender: UnboundedSender<Action>,
183 pub antnode_path: Option<PathBuf>,
184 pub connection_mode: ConnectionMode,
185 pub count: u16,
186 pub data_dir_path: Option<PathBuf>,
187 pub network_id: Option<u8>,
188 pub owner: String,
189 pub init_peers_config: InitialPeersConfig,
190 pub port_range: Option<PortRange>,
191 pub rewards_address: String,
192 pub run_nat_detection: bool,
193}
194
195async fn maintain_n_running_nodes(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
197 debug!("Maintaining {} nodes", args.count);
198 if args.run_nat_detection {
199 run_nat_detection(&args.action_sender).await;
200 }
201
202 let config = prepare_node_config(&args);
203 debug_log_config(&config, &args);
204
205 let mut used_ports = get_used_ports(&node_registry).await;
206 let (mut current_port, max_port) = get_port_range(&config.custom_ports);
207
208 let nodes_to_add = args.count as i32 - node_registry.nodes.read().await.len() as i32;
209
210 if nodes_to_add <= 0 {
211 debug!("Scaling down nodes to {}", nodes_to_add);
212 scale_down_nodes(&config, args.count, node_registry.clone()).await;
213 } else {
214 debug!("Scaling up nodes to {}", nodes_to_add);
215 add_nodes(
216 &args.action_sender,
217 &config,
218 nodes_to_add,
219 &mut used_ports,
220 &mut current_port,
221 max_port,
222 node_registry.clone(),
223 )
224 .await;
225 }
226
227 debug!("Finished maintaining {} nodes", args.count);
228 send_action(
229 args.action_sender,
230 Action::StatusActions(StatusActions::StartNodesCompleted {
231 service_name: NODES_ALL.to_string(),
232 all_nodes_data: node_registry.get_node_service_data().await,
233 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
234 }),
235 );
236}
237
238async fn reset_nodes(
240 action_sender: UnboundedSender<Action>,
241 node_registry: NodeRegistryManager,
242 start_nodes_after_reset: bool,
243) {
244 if let Err(err) =
245 ant_node_manager::cmd::node::reset(true, node_registry.clone(), VerbosityLevel::Minimal)
246 .await
247 {
248 error!("Error while resetting services {err:?}");
249 send_action(
250 action_sender,
251 Action::StatusActions(StatusActions::ErrorResettingNodes {
252 raw_error: err.to_string(),
253 }),
254 );
255 } else {
256 info!("Successfully reset services");
257 send_action(
258 action_sender,
259 Action::StatusActions(StatusActions::ResetNodesCompleted {
260 trigger_start_node: start_nodes_after_reset,
261 all_nodes_data: node_registry.get_node_service_data().await,
262 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
263 }),
264 );
265 }
266}
267
268#[derive(Debug)]
269pub struct UpgradeNodesArgs {
270 pub action_sender: UnboundedSender<Action>,
271 pub connection_timeout_s: u64,
272 pub do_not_start: bool,
273 pub custom_bin_path: Option<PathBuf>,
274 pub force: bool,
275 pub fixed_interval: Option<u64>,
276 pub peer_ids: Vec<String>,
277 pub provided_env_variables: Option<Vec<(String, String)>>,
278 pub service_names: Vec<String>,
279 pub url: Option<String>,
280 pub version: Option<String>,
281}
282
283async fn upgrade_nodes(args: UpgradeNodesArgs, node_registry: NodeRegistryManager) {
284 if let Err(err) = ant_node_manager::cmd::node::stop(
286 None,
287 node_registry.clone(),
288 vec![],
289 args.service_names.clone(),
290 VerbosityLevel::Minimal,
291 )
292 .await
293 {
294 error!("Error while stopping services {err:?}");
295 send_action(
296 args.action_sender.clone(),
297 Action::StatusActions(StatusActions::ErrorUpdatingNodes {
298 raw_error: err.to_string(),
299 }),
300 );
301 }
302
303 if let Err(err) = ant_node_manager::cmd::node::upgrade(
304 0, args.do_not_start,
306 args.custom_bin_path,
307 args.force,
308 Some(FIXED_INTERVAL),
309 node_registry.clone(),
310 args.peer_ids,
311 args.provided_env_variables,
312 args.service_names,
313 args.url,
314 args.version,
315 VerbosityLevel::Minimal,
316 )
317 .await
318 {
319 error!("Error while updating services {err:?}");
320 send_action(
321 args.action_sender,
322 Action::StatusActions(StatusActions::ErrorUpdatingNodes {
323 raw_error: err.to_string(),
324 }),
325 );
326 } else {
327 info!("Successfully updated services");
328 send_action(
329 args.action_sender,
330 Action::StatusActions(StatusActions::UpdateNodesCompleted {
331 all_nodes_data: node_registry.get_node_service_data().await,
332 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
333 }),
334 );
335 }
336}
337
338async fn remove_nodes(
339 services: Vec<String>,
340 action_sender: UnboundedSender<Action>,
341 node_registry: NodeRegistryManager,
342) {
343 if let Err(err) = ant_node_manager::cmd::node::stop(
345 None,
346 node_registry.clone(),
347 vec![],
348 services.clone(),
349 VerbosityLevel::Minimal,
350 )
351 .await
352 {
353 error!("Error while stopping services {err:?}");
354 send_action(
355 action_sender.clone(),
356 Action::StatusActions(StatusActions::ErrorRemovingNodes {
357 services: services.clone(),
358 raw_error: err.to_string(),
359 }),
360 );
361 }
362
363 if let Err(err) = ant_node_manager::cmd::node::remove(
364 false,
365 vec![],
366 node_registry.clone(),
367 services.clone(),
368 VerbosityLevel::Minimal,
369 )
370 .await
371 {
372 error!("Error while removing services {err:?}");
373 send_action(
374 action_sender,
375 Action::StatusActions(StatusActions::ErrorRemovingNodes {
376 services,
377 raw_error: err.to_string(),
378 }),
379 );
380 } else {
381 info!("Successfully removed services {:?}", services);
382 for service in services {
383 send_action(
384 action_sender.clone(),
385 Action::StatusActions(StatusActions::RemoveNodesCompleted {
386 service_name: service,
387 all_nodes_data: node_registry.get_node_service_data().await,
388 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
389 }),
390 );
391 }
392 }
393}
394
395async fn add_node(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
396 debug!("Adding node");
397
398 if args.run_nat_detection {
399 run_nat_detection(&args.action_sender).await;
400 }
401
402 let config = prepare_node_config(&args);
403
404 let used_ports = get_used_ports(&node_registry).await;
405 let (mut current_port, max_port) = get_port_range(&config.custom_ports);
406
407 while used_ports.contains(¤t_port) && current_port <= max_port {
408 current_port += 1;
409 }
410
411 if current_port > max_port {
412 error!("Reached maximum port number. Unable to find an available port.");
413 send_action(
414 args.action_sender.clone(),
415 Action::StatusActions(StatusActions::ErrorAddingNodes {
416 raw_error: format!(
417 "When adding a new node we reached maximum port number ({max_port}).\nUnable to find an available port."
418 ),
419 }),
420 );
421 }
422
423 let port_range = Some(PortRange::Single(current_port));
424 match ant_node_manager::cmd::node::add(
425 false, false, config.auto_set_nat_flags,
428 Some(config.count),
429 config.data_dir_path,
430 true, None, None, None, None, None, None, None, None, None, port_range, node_registry.clone(),
442 config.init_peers_config.clone(),
443 config.relay, RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
445 None, None, config.antnode_path.clone(), !config.upnp,
449 None, None, None, VerbosityLevel::Minimal,
453 false, )
455 .await
456 {
457 Err(err) => {
458 error!("Error while adding services {err:?}");
459 send_action(
460 args.action_sender,
461 Action::StatusActions(StatusActions::ErrorAddingNodes {
462 raw_error: err.to_string(),
463 }),
464 );
465 }
466 Ok(services) => {
467 info!("Successfully added services: {:?}", services);
468 for service in services {
469 send_action(
470 args.action_sender.clone(),
471 Action::StatusActions(StatusActions::AddNodesCompleted {
472 service_name: service,
473 all_nodes_data: node_registry.get_node_service_data().await,
474 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
475 }),
476 );
477 }
478 }
479 }
480}
481
482async fn start_nodes(
483 services: Vec<String>,
484 action_sender: UnboundedSender<Action>,
485 node_registry: NodeRegistryManager,
486) {
487 debug!("Starting node {:?}", services);
488 if let Err(err) = ant_node_manager::cmd::node::start(
489 CONNECTION_TIMEOUT_START,
490 None,
491 node_registry.clone(),
492 vec![],
493 services.clone(),
494 VerbosityLevel::Minimal,
495 )
496 .await
497 {
498 error!("Error while starting services {err:?}");
499 send_action(
500 action_sender,
501 Action::StatusActions(StatusActions::ErrorStartingNodes {
502 services,
503 raw_error: err.to_string(),
504 }),
505 );
506 } else {
507 info!("Successfully started services {:?}", services);
508 for service in services {
509 send_action(
510 action_sender.clone(),
511 Action::StatusActions(StatusActions::StartNodesCompleted {
512 service_name: service,
513 all_nodes_data: node_registry.get_node_service_data().await,
514 is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
515 }),
516 );
517 }
518 }
519}
520
521fn send_action(action_sender: UnboundedSender<Action>, action: Action) {
524 if let Err(err) = action_sender.send(action) {
525 error!("Error while sending action: {err:?}");
526 }
527}
528
529struct NodeConfig {
530 antnode_path: Option<PathBuf>,
531 auto_set_nat_flags: bool,
532 count: u16,
533 custom_ports: Option<PortRange>,
534 data_dir_path: Option<PathBuf>,
535 relay: bool,
536 network_id: Option<u8>,
537 owner: Option<String>,
538 init_peers_config: InitialPeersConfig,
539 rewards_address: String,
540 upnp: bool,
541}
542
543async fn run_nat_detection(action_sender: &UnboundedSender<Action>) {
545 info!("Running nat detection....");
546
547 if let Err(err) = action_sender.send(Action::StatusActions(StatusActions::NatDetectionStarted))
549 {
550 error!("Error while sending action: {err:?}");
551 }
552
553 let release_repo = <dyn AntReleaseRepoActions>::default_config();
554 let version = match release_repo
555 .get_latest_version(&ReleaseType::NatDetection)
556 .await
557 {
558 Ok(v) => {
559 info!("Using NAT detection version {}", v.to_string());
560 v.to_string()
561 }
562 Err(err) => {
563 info!("No NAT detection release found, using fallback version 0.1.0");
564 info!("Error: {err}");
565 "0.1.0".to_string()
566 }
567 };
568
569 if let Err(err) = ant_node_manager::cmd::nat_detection::run_nat_detection(
570 None,
571 true,
572 None,
573 None,
574 Some(version),
575 VerbosityLevel::Minimal,
576 )
577 .await
578 {
579 error!("Error while running nat detection {err:?}. Registering the error.");
580 if let Err(err) = action_sender.send(Action::StatusActions(
581 StatusActions::ErrorWhileRunningNatDetection,
582 )) {
583 error!("Error while sending action: {err:?}");
584 }
585 } else {
586 info!("Successfully ran nat detection.");
587 if let Err(err) = action_sender.send(Action::StatusActions(
588 StatusActions::SuccessfullyDetectedNatStatus,
589 )) {
590 error!("Error while sending action: {err:?}");
591 }
592 }
593}
594
595fn prepare_node_config(args: &MaintainNodesArgs) -> NodeConfig {
596 NodeConfig {
597 antnode_path: args.antnode_path.clone(),
598 auto_set_nat_flags: args.connection_mode == ConnectionMode::Automatic,
599 data_dir_path: args.data_dir_path.clone(),
600 count: args.count,
601 custom_ports: if args.connection_mode == ConnectionMode::CustomPorts {
602 args.port_range.clone()
603 } else {
604 None
605 },
606 owner: if args.owner.is_empty() {
607 None
608 } else {
609 Some(args.owner.clone())
610 },
611 relay: args.connection_mode == ConnectionMode::HomeNetwork,
612 network_id: args.network_id,
613 init_peers_config: args.init_peers_config.clone(),
614 rewards_address: args.rewards_address.clone(),
615 upnp: args.connection_mode == ConnectionMode::UPnP,
616 }
617}
618
619fn debug_log_config(config: &NodeConfig, args: &MaintainNodesArgs) {
621 debug!("************ STARTING NODE MAINTENANCE ************");
622 debug!(
623 "Maintaining {} running nodes with the following args:",
624 config.count
625 );
626 debug!(
627 " owner: {:?}, init_peers_config: {:?}, antnode_path: {:?}, network_id: {:?}",
628 config.owner, config.init_peers_config, config.antnode_path, args.network_id
629 );
630 debug!(
631 " data_dir_path: {:?}, connection_mode: {:?}",
632 config.data_dir_path, args.connection_mode
633 );
634 debug!(
635 " auto_set_nat_flags: {:?}, custom_ports: {:?}, upnp: {}, relay: {}",
636 config.auto_set_nat_flags, config.custom_ports, config.upnp, config.relay
637 );
638}
639
640async fn get_used_ports(node_registry: &NodeRegistryManager) -> Vec<u16> {
642 let mut used_ports = Vec::new();
643 for node in node_registry.nodes.read().await.iter() {
644 let node = node.read().await;
645 if let Some(port) = node.node_port {
646 used_ports.push(port);
647 }
648 }
649 debug!("Currently used ports: {:?}", used_ports);
650 used_ports
651}
652
653fn get_port_range(custom_ports: &Option<PortRange>) -> (u16, u16) {
655 match custom_ports {
656 Some(PortRange::Single(port)) => (*port, *port),
657 Some(PortRange::Range(start, end)) => (*start, *end),
658 None => (PORT_MIN as u16, PORT_MAX as u16),
659 }
660}
661
662async fn scale_down_nodes(config: &NodeConfig, count: u16, node_registry: NodeRegistryManager) {
664 match ant_node_manager::cmd::node::maintain_n_running_nodes(
665 false,
666 false,
667 config.auto_set_nat_flags,
668 CONNECTION_TIMEOUT_START,
669 count,
670 config.data_dir_path.clone(),
671 true,
672 None,
673 Some(EvmNetwork::default()),
674 None,
675 None,
676 None,
677 None,
678 None,
679 config.network_id,
680 None,
681 None, node_registry,
683 config.init_peers_config.clone(),
684 config.relay,
685 RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
686 None,
687 None,
688 config.antnode_path.clone(),
689 None,
690 !config.upnp,
691 None,
692 None,
693 VerbosityLevel::Minimal,
694 None,
695 false,
696 )
697 .await
698 {
699 Ok(_) => {
700 info!("Scaling down to {} nodes", count);
701 }
702 Err(err) => {
703 error!("Error while scaling down to {} nodes: {err:?}", count);
704 }
705 }
706}
707
708async fn add_nodes(
710 action_sender: &UnboundedSender<Action>,
711 config: &NodeConfig,
712 mut nodes_to_add: i32,
713 used_ports: &mut Vec<u16>,
714 current_port: &mut u16,
715 max_port: u16,
716 node_registry: NodeRegistryManager,
717) {
718 let mut retry_count = 0;
719
720 while nodes_to_add > 0 && retry_count < NODE_ADD_MAX_RETRIES {
721 while used_ports.contains(current_port) && *current_port <= max_port {
723 *current_port += 1;
724 }
725
726 if *current_port > max_port {
727 error!("Reached maximum port number. Unable to find an available port.");
728 send_action(
729 action_sender.clone(),
730 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
731 raw_error: format!(
732 "Reached maximum port number ({max_port}).\nUnable to find an available port."
733 ),
734 }),
735 );
736 break;
737 }
738
739 let port_range = Some(PortRange::Single(*current_port));
740 match ant_node_manager::cmd::node::maintain_n_running_nodes(
741 false,
742 false,
743 config.auto_set_nat_flags,
744 CONNECTION_TIMEOUT_START,
745 config.count,
746 config.data_dir_path.clone(),
747 true,
748 None,
749 Some(EvmNetwork::default()),
750 None,
751 None,
752 None,
753 None,
754 None,
755 config.network_id,
756 None,
757 port_range,
758 node_registry.clone(),
759 config.init_peers_config.clone(),
760 config.relay,
761 RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
762 None,
763 None,
764 config.antnode_path.clone(),
765 None,
766 !config.upnp,
767 None,
768 None,
769 VerbosityLevel::Minimal,
770 None,
771 false,
772 )
773 .await
774 {
775 Ok(_) => {
776 info!("Successfully added a node on port {}", current_port);
777 used_ports.push(*current_port);
778 nodes_to_add -= 1;
779 *current_port += 1;
780 retry_count = 0; }
782 Err(err) => {
783 if err.to_string().contains("is being used by another service") {
785 warn!(
786 "Port {} is being used, retrying with a different port. Attempt {}/{}",
787 current_port,
788 retry_count + 1,
789 NODE_ADD_MAX_RETRIES
790 );
791 } else if err
792 .to_string()
793 .contains("Failed to add one or more services")
794 && retry_count >= NODE_ADD_MAX_RETRIES
795 {
796 send_action(
797 action_sender.clone(),
798 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
799 raw_error: "When trying to add a node, we failed.\n\
800 Maybe you ran out of disk space?\n\
801 Maybe you need to change the port range?"
802 .to_string(),
803 }),
804 );
805 } else if err
806 .to_string()
807 .contains("contains a virus or potentially unwanted software")
808 && retry_count >= NODE_ADD_MAX_RETRIES
809 {
810 send_action(
811 action_sender.clone(),
812 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
813 raw_error: "When trying to add a node, we failed.\n\
814 You may be running an old version of antnode service?\n\
815 Did you whitelisted antnode and the launchpad?"
816 .to_string(),
817 }),
818 );
819 } else {
820 error!("Range of ports to be used {:?}", *current_port..max_port);
821 error!("Error while adding node on port {}: {err:?}", current_port);
822 }
823 *current_port += 1;
825 retry_count += 1;
826 }
827 }
828 }
829 if retry_count >= NODE_ADD_MAX_RETRIES {
830 send_action(
831 action_sender.clone(),
832 Action::StatusActions(StatusActions::ErrorScalingUpNodes {
833 raw_error: format!(
834 "When trying to start a node, we reached the maximum amount of retries ({NODE_ADD_MAX_RETRIES}).\n\
835 Could this be a firewall blocking nodes starting or ports on your router already in use?"
836 ),
837 }),
838 );
839 }
840}