// node_launchpad/node_mgmt.rs

1// Copyright 2025 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::action::{Action, StatusActions};
10use crate::connection_mode::ConnectionMode;
11use ant_bootstrap::InitialPeersConfig;
12use ant_evm::{EvmNetwork, RewardsAddress};
13use ant_node_manager::{VerbosityLevel, add_services::config::PortRange};
14use ant_releases::{self, AntReleaseRepoActions, ReleaseType};
15use ant_service_management::NodeRegistryManager;
16use color_eyre::Result;
17use color_eyre::eyre::eyre;
18use service_manager::RestartPolicy;
19use std::{path::PathBuf, str::FromStr};
20use tokio::runtime::Builder;
21use tokio::sync::mpsc::{self, UnboundedSender};
22use tokio::task::LocalSet;
23
/// Upper bound of the port range scanned for node services (u16 maximum).
pub const PORT_MAX: u32 = 65535;
/// Lower bound of the port range scanned for node services (first non-privileged port).
pub const PORT_MIN: u32 = 1024;

/// Maximum number of consecutive failed attempts when adding a node before giving up.
const NODE_ADD_MAX_RETRIES: u32 = 5;

/// Interval passed as `Some(FIXED_INTERVAL)` to the upgrade command
/// (presumably milliseconds between per-service upgrades — confirm in ant_node_manager).
pub const FIXED_INTERVAL: u64 = 60_000;
/// Connection timeout, in seconds, passed to the node start command.
pub const CONNECTION_TIMEOUT_START: u64 = 120;

/// Sentinel service name used in status actions to mean "all nodes".
pub const NODES_ALL: &str = "NODES_ALL";
33
/// Commands accepted by the node-management background worker.
///
/// Each variant carries the data for one management operation; results and
/// errors are reported back through the embedded `UnboundedSender<Action>`
/// (or the sender inside the args struct).
#[derive(Debug)]
pub enum NodeManagementTask {
    /// Scale the set of nodes up or down to match `args.count`.
    MaintainNodes {
        args: MaintainNodesArgs,
    },
    /// Reset all nodes, optionally starting nodes again afterwards.
    ResetNodes {
        start_nodes_after_reset: bool,
        action_sender: UnboundedSender<Action>,
    },
    /// Stop the named services.
    StopNodes {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
    /// Upgrade node services to a new antnode version.
    UpgradeNodes {
        args: UpgradeNodesArgs,
    },
    /// Add a single node (shares the argument struct with MaintainNodes).
    AddNode {
        args: MaintainNodesArgs,
    },
    /// Stop and then remove the named services.
    RemoveNodes {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
    /// Start the named services.
    StartNode {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
}
62
/// Handle for submitting [`NodeManagementTask`]s to the background worker
/// thread created by [`NodeManagement::new`]. Clones share the same queue.
#[derive(Clone)]
pub struct NodeManagement {
    // Unbounded queue feeding the dedicated worker thread; when every clone
    // of this sender is dropped, the worker's receive loop ends.
    task_sender: mpsc::UnboundedSender<NodeManagementTask>,
}
67
impl NodeManagement {
    /// Spawn the node-management worker and return a handle to it.
    ///
    /// A dedicated OS thread is created running a current-thread tokio
    /// runtime with a `LocalSet`, so management tasks run off the caller's
    /// thread. Tasks are received over an unbounded channel and each one is
    /// awaited to completion before the next is taken, i.e. tasks execute
    /// sequentially in send order.
    ///
    /// # Errors
    /// Returns an error if the tokio runtime cannot be built.
    pub fn new(node_registry: NodeRegistryManager) -> Result<Self> {
        let (send, mut recv) = mpsc::unbounded_channel();

        let rt = Builder::new_current_thread().enable_all().build()?;

        std::thread::spawn(move || {
            let local = LocalSet::new();

            local.spawn_local(async move {
                while let Some(new_task) = recv.recv().await {
                    match new_task {
                        NodeManagementTask::MaintainNodes { args } => {
                            maintain_n_running_nodes(args, node_registry.clone()).await;
                        }
                        NodeManagementTask::ResetNodes {
                            start_nodes_after_reset,
                            action_sender,
                        } => {
                            reset_nodes(
                                action_sender,
                                node_registry.clone(),
                                start_nodes_after_reset,
                            )
                            .await;
                        }
                        NodeManagementTask::StopNodes {
                            services,
                            action_sender,
                        } => {
                            stop_nodes(services, action_sender, node_registry.clone()).await;
                        }
                        NodeManagementTask::UpgradeNodes { args } => {
                            upgrade_nodes(args, node_registry.clone()).await
                        }
                        NodeManagementTask::RemoveNodes {
                            services,
                            action_sender,
                        } => remove_nodes(services, action_sender, node_registry.clone()).await,
                        NodeManagementTask::StartNode {
                            services,
                            action_sender,
                        } => start_nodes(services, action_sender, node_registry.clone()).await,
                        NodeManagementTask::AddNode { args } => {
                            add_node(args, node_registry.clone()).await
                        }
                    }
                }
                // If the while loop returns, then all the LocalSpawner
                // objects have been dropped.
            });

            // This will return once all senders are dropped and all
            // spawned tasks have returned.
            rt.block_on(local);
        });

        Ok(Self { task_sender: send })
    }

    /// Send a task to the NodeManagement local set
    /// These tasks will be executed on a different thread to avoid blocking the main thread
    ///
    /// The results are returned via the standard `UnboundedSender<Action>` that is passed to each task.
    ///
    /// If this function returns an error, it means that the task could not be sent to the local set.
    pub fn send_task(&self, task: NodeManagementTask) -> Result<()> {
        self.task_sender
            .send(task)
            .inspect_err(|err| error!("The node management local set is down {err:?}"))
            .map_err(|_| eyre!("Failed to send task to the node management local set"))?;
        Ok(())
    }
}
142
143/// Stop the specified services
144async fn stop_nodes(
145    services: Vec<String>,
146    action_sender: UnboundedSender<Action>,
147    node_registry: NodeRegistryManager,
148) {
149    if let Err(err) = ant_node_manager::cmd::node::stop(
150        None,
151        node_registry.clone(),
152        vec![],
153        services.clone(),
154        VerbosityLevel::Minimal,
155    )
156    .await
157    {
158        error!("Error while stopping services {err:?}");
159        send_action(
160            action_sender,
161            Action::StatusActions(StatusActions::ErrorStoppingNodes {
162                services,
163                raw_error: err.to_string(),
164            }),
165        );
166    } else {
167        info!("Successfully stopped services");
168        for service in services {
169            send_action(
170                action_sender.clone(),
171                Action::StatusActions(StatusActions::StopNodesCompleted {
172                    service_name: service,
173                    all_nodes_data: node_registry.get_node_service_data().await,
174                    is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
175                }),
176            );
177        }
178    }
179}
180
/// Arguments for [`NodeManagementTask::MaintainNodes`] and
/// [`NodeManagementTask::AddNode`].
#[derive(Debug)]
pub struct MaintainNodesArgs {
    /// Channel used to report progress and errors back to the UI.
    pub action_sender: UnboundedSender<Action>,
    /// Custom antnode binary path; `None` for the default.
    pub antnode_path: Option<PathBuf>,
    /// Connection mode; drives NAT flags, relay, UPnP and custom ports
    /// (see `prepare_node_config`).
    pub connection_mode: ConnectionMode,
    /// Target number of nodes to run.
    pub count: u16,
    /// Custom data directory; `None` for the default.
    pub data_dir_path: Option<PathBuf>,
    pub network_id: Option<u8>,
    /// Owner name; an empty string is treated as "no owner".
    pub owner: String,
    pub init_peers_config: InitialPeersConfig,
    /// Only honoured when `connection_mode` is `CustomPorts`.
    pub port_range: Option<PortRange>,
    /// Rewards address as text; parsed later with `RewardsAddress::from_str`.
    pub rewards_address: String,
    /// Whether to run NAT detection before maintaining/adding nodes.
    pub run_nat_detection: bool,
}
195
196/// Maintain the specified number of nodes
197async fn maintain_n_running_nodes(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
198    debug!("Maintaining {} nodes", args.count);
199    if args.run_nat_detection {
200        run_nat_detection(&args.action_sender).await;
201    }
202
203    let config = prepare_node_config(&args);
204    debug_log_config(&config, &args);
205
206    let mut used_ports = get_used_ports(&node_registry).await;
207    let (mut current_port, max_port) = get_port_range(&config.custom_ports);
208
209    let nodes_to_add = args.count as i32 - node_registry.nodes.read().await.len() as i32;
210
211    if nodes_to_add <= 0 {
212        debug!("Scaling down nodes to {}", nodes_to_add);
213        scale_down_nodes(&config, args.count, node_registry.clone()).await;
214    } else {
215        debug!("Scaling up nodes to {}", nodes_to_add);
216        add_nodes(
217            &args.action_sender,
218            &config,
219            nodes_to_add,
220            &mut used_ports,
221            &mut current_port,
222            max_port,
223            node_registry.clone(),
224        )
225        .await;
226    }
227
228    debug!("Finished maintaining {} nodes", args.count);
229    send_action(
230        args.action_sender,
231        Action::StatusActions(StatusActions::StartNodesCompleted {
232            service_name: NODES_ALL.to_string(),
233            all_nodes_data: node_registry.get_node_service_data().await,
234            is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
235        }),
236    );
237}
238
239/// Reset all the nodes
240async fn reset_nodes(
241    action_sender: UnboundedSender<Action>,
242    node_registry: NodeRegistryManager,
243    start_nodes_after_reset: bool,
244) {
245    if let Err(err) =
246        ant_node_manager::cmd::node::reset(true, node_registry.clone(), VerbosityLevel::Minimal)
247            .await
248    {
249        error!("Error while resetting services {err:?}");
250        send_action(
251            action_sender,
252            Action::StatusActions(StatusActions::ErrorResettingNodes {
253                raw_error: err.to_string(),
254            }),
255        );
256    } else {
257        info!("Successfully reset services");
258        send_action(
259            action_sender,
260            Action::StatusActions(StatusActions::ResetNodesCompleted {
261                trigger_start_node: start_nodes_after_reset,
262                all_nodes_data: node_registry.get_node_service_data().await,
263                is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
264            }),
265        );
266    }
267}
268
/// Arguments for [`NodeManagementTask::UpgradeNodes`].
#[derive(Debug)]
pub struct UpgradeNodesArgs {
    /// Channel used to report progress and errors back to the UI.
    pub action_sender: UnboundedSender<Action>,
    /// NOTE(review): not read by `upgrade_nodes` — confirm whether this field
    /// is still needed.
    pub connection_timeout_s: u64,
    /// Passed through to the upgrade command; presumably suppresses the
    /// restart after upgrading — confirm in ant_node_manager.
    pub do_not_start: bool,
    /// Custom antnode binary path; `None` for the default.
    pub custom_bin_path: Option<PathBuf>,
    pub force: bool,
    /// NOTE(review): not read by `upgrade_nodes`; `Some(FIXED_INTERVAL)` is
    /// hard-coded at the call site instead — confirm whether this is intended.
    pub fixed_interval: Option<u64>,
    pub peer_ids: Vec<String>,
    pub provided_env_variables: Option<Vec<(String, String)>>,
    /// Names of the services to upgrade.
    pub service_names: Vec<String>,
    pub url: Option<String>,
    pub version: Option<String>,
}
283
/// Upgrade the given node services.
///
/// The services are stopped first; note that the upgrade is still attempted
/// even if stopping failed (the stop error is reported separately).
async fn upgrade_nodes(args: UpgradeNodesArgs, node_registry: NodeRegistryManager) {
    // First we stop the Nodes
    if let Err(err) = ant_node_manager::cmd::node::stop(
        None,
        node_registry.clone(),
        vec![],
        args.service_names.clone(),
        VerbosityLevel::Minimal,
    )
    .await
    {
        error!("Error while stopping services {err:?}");
        send_action(
            args.action_sender.clone(),
            Action::StatusActions(StatusActions::ErrorUpdatingNodes {
                raw_error: err.to_string(),
            }),
        );
    }

    if let Err(err) = ant_node_manager::cmd::node::upgrade(
        0, // interval: overwritten by the Some(FIXED_INTERVAL) argument below
        args.do_not_start,
        args.custom_bin_path,
        args.force,
        Some(FIXED_INTERVAL),
        node_registry.clone(),
        args.peer_ids,
        args.provided_env_variables,
        args.service_names,
        args.url,
        args.version,
        VerbosityLevel::Minimal,
    )
    .await
    {
        error!("Error while updating services {err:?}");
        send_action(
            args.action_sender,
            Action::StatusActions(StatusActions::ErrorUpdatingNodes {
                raw_error: err.to_string(),
            }),
        );
    } else {
        info!("Successfully updated services");
        send_action(
            args.action_sender,
            Action::StatusActions(StatusActions::UpdateNodesCompleted {
                all_nodes_data: node_registry.get_node_service_data().await,
                is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
            }),
        );
    }
}
338
339async fn remove_nodes(
340    services: Vec<String>,
341    action_sender: UnboundedSender<Action>,
342    node_registry: NodeRegistryManager,
343) {
344    // First we stop the nodes
345    if let Err(err) = ant_node_manager::cmd::node::stop(
346        None,
347        node_registry.clone(),
348        vec![],
349        services.clone(),
350        VerbosityLevel::Minimal,
351    )
352    .await
353    {
354        error!("Error while stopping services {err:?}");
355        send_action(
356            action_sender.clone(),
357            Action::StatusActions(StatusActions::ErrorRemovingNodes {
358                services: services.clone(),
359                raw_error: err.to_string(),
360            }),
361        );
362    }
363
364    if let Err(err) = ant_node_manager::cmd::node::remove(
365        false,
366        vec![],
367        node_registry.clone(),
368        services.clone(),
369        VerbosityLevel::Minimal,
370    )
371    .await
372    {
373        error!("Error while removing services {err:?}");
374        send_action(
375            action_sender,
376            Action::StatusActions(StatusActions::ErrorRemovingNodes {
377                services,
378                raw_error: err.to_string(),
379            }),
380        );
381    } else {
382        info!("Successfully removed services {:?}", services);
383        for service in services {
384            send_action(
385                action_sender.clone(),
386                Action::StatusActions(StatusActions::RemoveNodesCompleted {
387                    service_name: service,
388                    all_nodes_data: node_registry.get_node_service_data().await,
389                    is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
390                }),
391            );
392        }
393    }
394}
395
396async fn add_node(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
397    debug!("Adding node");
398
399    if args.run_nat_detection {
400        run_nat_detection(&args.action_sender).await;
401    }
402
403    let config = prepare_node_config(&args);
404
405    let used_ports = get_used_ports(&node_registry).await;
406    let (mut current_port, max_port) = get_port_range(&config.custom_ports);
407
408    while used_ports.contains(&current_port) && current_port <= max_port {
409        current_port += 1;
410    }
411
412    if current_port > max_port {
413        error!("Reached maximum port number. Unable to find an available port.");
414        send_action(
415            args.action_sender.clone(),
416            Action::StatusActions(StatusActions::ErrorAddingNodes {
417                raw_error: format!(
418                    "When adding a new node we reached maximum port number ({max_port}).\nUnable to find an available port."
419                ),
420            }),
421        );
422    }
423
424    let port_range = Some(PortRange::Single(current_port));
425    match ant_node_manager::cmd::node::add(
426        false, // alpha,
427        false, // auto_restart,
428        config.auto_set_nat_flags,
429        Some(config.count),
430        config.data_dir_path,
431        true,       // enable_metrics_server,
432        None,       // env_variables,
433        None,       // evm_network
434        None,       // log_dir_path,
435        None,       // log_format,
436        None,       // max_archived_log_files,
437        None,       // max_log_files,
438        None,       // metrics_port,
439        None,       // network_id
440        None,       // node_ip,
441        port_range, // node_port
442        node_registry.clone(),
443        config.init_peers_config.clone(),
444        config.relay, // relay,
445        get_restart_policy(),
446        RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
447        None,                        // rpc_address,
448        None,                        // rpc_port,
449        config.antnode_path.clone(), // src_path,
450        !config.upnp,
451        None, // url,
452        None, // user,
453        None, // version,
454        VerbosityLevel::Minimal,
455        false, // write_older_cache_files
456    )
457    .await
458    {
459        Err(err) => {
460            error!("Error while adding services {err:?}");
461            send_action(
462                args.action_sender,
463                Action::StatusActions(StatusActions::ErrorAddingNodes {
464                    raw_error: err.to_string(),
465                }),
466            );
467        }
468        Ok(services) => {
469            info!("Successfully added services: {:?}", services);
470            for service in services {
471                send_action(
472                    args.action_sender.clone(),
473                    Action::StatusActions(StatusActions::AddNodesCompleted {
474                        service_name: service,
475                        all_nodes_data: node_registry.get_node_service_data().await,
476                        is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
477                    }),
478                );
479            }
480        }
481    }
482}
483
484async fn start_nodes(
485    services: Vec<String>,
486    action_sender: UnboundedSender<Action>,
487    node_registry: NodeRegistryManager,
488) {
489    debug!("Starting node {:?}", services);
490    if let Err(err) = ant_node_manager::cmd::node::start(
491        CONNECTION_TIMEOUT_START,
492        None,
493        node_registry.clone(),
494        vec![],
495        services.clone(),
496        VerbosityLevel::Minimal,
497    )
498    .await
499    {
500        error!("Error while starting services {err:?}");
501        send_action(
502            action_sender,
503            Action::StatusActions(StatusActions::ErrorStartingNodes {
504                services,
505                raw_error: err.to_string(),
506            }),
507        );
508    } else {
509        info!("Successfully started services {:?}", services);
510        for service in services {
511            send_action(
512                action_sender.clone(),
513                Action::StatusActions(StatusActions::StartNodesCompleted {
514                    service_name: service,
515                    all_nodes_data: node_registry.get_node_service_data().await,
516                    is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
517                }),
518            );
519        }
520    }
521}
522
523// --- Helper functions ---
524
/// Get the appropriate restart policy for the current platform
///
/// On Unix, the service restarts after a successful exit (no delay).
#[cfg(unix)]
fn get_restart_policy() -> RestartPolicy {
    RestartPolicy::OnSuccess { delay_secs: None }
}

/// Windows variant: never auto-restart the service.
#[cfg(windows)]
fn get_restart_policy() -> RestartPolicy {
    RestartPolicy::Never
}
535
536fn send_action(action_sender: UnboundedSender<Action>, action: Action) {
537    if let Err(err) = action_sender.send(action) {
538        error!("Error while sending action: {err:?}");
539    }
540}
541
/// Normalised node configuration derived from [`MaintainNodesArgs`] by
/// `prepare_node_config`.
struct NodeConfig {
    /// Custom antnode binary path; `None` for the default.
    antnode_path: Option<PathBuf>,
    /// True when the connection mode is `Automatic`.
    auto_set_nat_flags: bool,
    /// Target number of nodes.
    count: u16,
    /// Custom port range; `Some` only when the connection mode is `CustomPorts`.
    custom_ports: Option<PortRange>,
    data_dir_path: Option<PathBuf>,
    /// True when the connection mode is `HomeNetwork`.
    relay: bool,
    network_id: Option<u8>,
    /// `None` when the owner string was empty.
    owner: Option<String>,
    init_peers_config: InitialPeersConfig,
    /// Rewards address as text; parsed with `RewardsAddress::from_str` at use sites.
    rewards_address: String,
    /// True when the connection mode is `UPnP`; call sites pass `!upnp` onward.
    upnp: bool,
}
555
556/// Run the NAT detection process
557async fn run_nat_detection(action_sender: &UnboundedSender<Action>) {
558    info!("Running nat detection....");
559
560    // Notify that NAT detection is starting
561    if let Err(err) = action_sender.send(Action::StatusActions(StatusActions::NatDetectionStarted))
562    {
563        error!("Error while sending action: {err:?}");
564    }
565
566    let release_repo = <dyn AntReleaseRepoActions>::default_config();
567    let version = match release_repo
568        .get_latest_version(&ReleaseType::NatDetection)
569        .await
570    {
571        Ok(v) => {
572            info!("Using NAT detection version {}", v.to_string());
573            v.to_string()
574        }
575        Err(err) => {
576            info!("No NAT detection release found, using fallback version 0.1.0");
577            info!("Error: {err}");
578            "0.1.0".to_string()
579        }
580    };
581
582    if let Err(err) = ant_node_manager::cmd::nat_detection::run_nat_detection(
583        None,
584        true,
585        None,
586        None,
587        Some(version),
588        VerbosityLevel::Minimal,
589    )
590    .await
591    {
592        error!("Error while running nat detection {err:?}. Registering the error.");
593        if let Err(err) = action_sender.send(Action::StatusActions(
594            StatusActions::ErrorWhileRunningNatDetection,
595        )) {
596            error!("Error while sending action: {err:?}");
597        }
598    } else {
599        info!("Successfully ran nat detection.");
600        if let Err(err) = action_sender.send(Action::StatusActions(
601            StatusActions::SuccessfullyDetectedNatStatus,
602        )) {
603            error!("Error while sending action: {err:?}");
604        }
605    }
606}
607
608fn prepare_node_config(args: &MaintainNodesArgs) -> NodeConfig {
609    NodeConfig {
610        antnode_path: args.antnode_path.clone(),
611        auto_set_nat_flags: args.connection_mode == ConnectionMode::Automatic,
612        data_dir_path: args.data_dir_path.clone(),
613        count: args.count,
614        custom_ports: if args.connection_mode == ConnectionMode::CustomPorts {
615            args.port_range.clone()
616        } else {
617            None
618        },
619        owner: if args.owner.is_empty() {
620            None
621        } else {
622            Some(args.owner.clone())
623        },
624        relay: args.connection_mode == ConnectionMode::HomeNetwork,
625        network_id: args.network_id,
626        init_peers_config: args.init_peers_config.clone(),
627        rewards_address: args.rewards_address.clone(),
628        upnp: args.connection_mode == ConnectionMode::UPnP,
629    }
630}
631
/// Debug log the node config
///
/// Pure logging helper: emits the derived config plus the raw connection
/// mode/network id from the original args. No side effects beyond `debug!`.
fn debug_log_config(config: &NodeConfig, args: &MaintainNodesArgs) {
    debug!("************ STARTING NODE MAINTENANCE ************");
    debug!(
        "Maintaining {} running nodes with the following args:",
        config.count
    );
    debug!(
        " owner: {:?}, init_peers_config: {:?}, antnode_path: {:?}, network_id: {:?}",
        config.owner, config.init_peers_config, config.antnode_path, args.network_id
    );
    debug!(
        " data_dir_path: {:?}, connection_mode: {:?}",
        config.data_dir_path, args.connection_mode
    );
    debug!(
        " auto_set_nat_flags: {:?}, custom_ports: {:?}, upnp: {}, relay: {}",
        config.auto_set_nat_flags, config.custom_ports, config.upnp, config.relay
    );
}
652
653/// Get the currently used ports from the node registry
654async fn get_used_ports(node_registry: &NodeRegistryManager) -> Vec<u16> {
655    let mut used_ports = Vec::new();
656    for node in node_registry.nodes.read().await.iter() {
657        let node = node.read().await;
658        if let Some(port) = node.node_port {
659            used_ports.push(port);
660        }
661    }
662    debug!("Currently used ports: {:?}", used_ports);
663    used_ports
664}
665
666/// Get the port range (u16, u16) from the custom ports PortRange
667fn get_port_range(custom_ports: &Option<PortRange>) -> (u16, u16) {
668    match custom_ports {
669        Some(PortRange::Single(port)) => (*port, *port),
670        Some(PortRange::Range(start, end)) => (*start, *end),
671        None => (PORT_MIN as u16, PORT_MAX as u16),
672    }
673}
674
/// Scale down the nodes
///
/// Delegates to `maintain_n_running_nodes` with the reduced target `count`
/// and no port constraint. Errors are only logged here; no action is sent
/// back to the UI (callers report overall completion separately).
///
/// NOTE(review): the long positional argument list mirrors the call in
/// `add_nodes` except for the port argument — confirm against the
/// ant_node_manager signature when touching either call.
async fn scale_down_nodes(config: &NodeConfig, count: u16, node_registry: NodeRegistryManager) {
    match ant_node_manager::cmd::node::maintain_n_running_nodes(
        false,
        false,
        config.auto_set_nat_flags,
        CONNECTION_TIMEOUT_START,
        count,
        config.data_dir_path.clone(),
        true,
        None,
        Some(EvmNetwork::default()),
        None,
        None,
        None,
        None,
        None,
        config.network_id,
        None,
        None, // We don't care about the port, as we are scaling down
        node_registry,
        config.init_peers_config.clone(),
        config.relay,
        RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
        get_restart_policy(),
        None,
        None,
        config.antnode_path.clone(),
        None,
        !config.upnp,
        None,
        None,
        VerbosityLevel::Minimal,
        None,
        false,
    )
    .await
    {
        Ok(_) => {
            info!("Scaling down to {} nodes", count);
        }
        Err(err) => {
            error!("Error while scaling down to {} nodes: {err:?}", count);
        }
    }
}
721
722/// Add the specified number of nodes
723async fn add_nodes(
724    action_sender: &UnboundedSender<Action>,
725    config: &NodeConfig,
726    mut nodes_to_add: i32,
727    used_ports: &mut Vec<u16>,
728    current_port: &mut u16,
729    max_port: u16,
730    node_registry: NodeRegistryManager,
731) {
732    let mut retry_count = 0;
733
734    while nodes_to_add > 0 && retry_count < NODE_ADD_MAX_RETRIES {
735        // Find the next available port
736        while used_ports.contains(current_port) && *current_port <= max_port {
737            *current_port += 1;
738        }
739
740        if *current_port > max_port {
741            error!("Reached maximum port number. Unable to find an available port.");
742            send_action(
743                action_sender.clone(),
744                Action::StatusActions(StatusActions::ErrorScalingUpNodes {
745                    raw_error: format!(
746                        "Reached maximum port number ({max_port}).\nUnable to find an available port."
747                    ),
748                }),
749            );
750            break;
751        }
752
753        let port_range = Some(PortRange::Single(*current_port));
754        match ant_node_manager::cmd::node::maintain_n_running_nodes(
755            false,
756            false,
757            config.auto_set_nat_flags,
758            CONNECTION_TIMEOUT_START,
759            config.count,
760            config.data_dir_path.clone(),
761            true,
762            None,
763            Some(EvmNetwork::default()),
764            None,
765            None,
766            None,
767            None,
768            None,
769            config.network_id,
770            None,
771            port_range,
772            node_registry.clone(),
773            config.init_peers_config.clone(),
774            config.relay,
775            RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
776            get_restart_policy(),
777            None,
778            None,
779            config.antnode_path.clone(),
780            None,
781            !config.upnp,
782            None,
783            None,
784            VerbosityLevel::Minimal,
785            None,
786            false,
787        )
788        .await
789        {
790            Ok(_) => {
791                info!("Successfully added a node on port {}", current_port);
792                used_ports.push(*current_port);
793                nodes_to_add -= 1;
794                *current_port += 1;
795                retry_count = 0; // Reset retry count on success
796            }
797            Err(err) => {
798                //TODO: We should use concrete error types here instead of string matching (ant_node_manager)
799                if err.to_string().contains("is being used by another service") {
800                    warn!(
801                        "Port {} is being used, retrying with a different port. Attempt {}/{}",
802                        current_port,
803                        retry_count + 1,
804                        NODE_ADD_MAX_RETRIES
805                    );
806                } else if err
807                    .to_string()
808                    .contains("Failed to add one or more services")
809                    && retry_count >= NODE_ADD_MAX_RETRIES
810                {
811                    send_action(
812                        action_sender.clone(),
813                        Action::StatusActions(StatusActions::ErrorScalingUpNodes {
814                            raw_error: "When trying to add a node, we failed.\n\
815                                 Maybe you ran out of disk space?\n\
816                                 Maybe you need to change the port range?"
817                                .to_string(),
818                        }),
819                    );
820                } else if err
821                    .to_string()
822                    .contains("contains a virus or potentially unwanted software")
823                    && retry_count >= NODE_ADD_MAX_RETRIES
824                {
825                    send_action(
826                        action_sender.clone(),
827                        Action::StatusActions(StatusActions::ErrorScalingUpNodes {
828                            raw_error: "When trying to add a node, we failed.\n\
829                             You may be running an old version of antnode service?\n\
830                             Did you whitelisted antnode and the launchpad?"
831                                .to_string(),
832                        }),
833                    );
834                } else {
835                    error!("Range of ports to be used {:?}", *current_port..max_port);
836                    error!("Error while adding node on port {}: {err:?}", current_port);
837                }
838                // In case of error, we increase the port and the retry count
839                *current_port += 1;
840                retry_count += 1;
841            }
842        }
843    }
844    if retry_count >= NODE_ADD_MAX_RETRIES {
845        send_action(
846            action_sender.clone(),
847            Action::StatusActions(StatusActions::ErrorScalingUpNodes {
848                raw_error: format!(
849                    "When trying to start a node, we reached the maximum amount of retries ({NODE_ADD_MAX_RETRIES}).\n\
850                    Could this be a firewall blocking nodes starting or ports on your router already in use?"
851                ),
852            }),
853        );
854    }
855}