node_launchpad/
node_mgmt.rs

1// Copyright 2025 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::action::{Action, StatusActions};
10use crate::connection_mode::ConnectionMode;
11use ant_bootstrap::InitialPeersConfig;
12use ant_evm::{EvmNetwork, RewardsAddress};
13use ant_node_manager::{VerbosityLevel, add_services::config::PortRange};
14use ant_releases::{self, AntReleaseRepoActions, ReleaseType};
15use ant_service_management::{NodeRegistryManager, ServiceStatus};
16use color_eyre::Result;
17use color_eyre::eyre::eyre;
18use service_manager::RestartPolicy;
19use std::{path::PathBuf, str::FromStr};
20use tokio::runtime::Builder;
21use tokio::sync::mpsc::{self, UnboundedSender};
22use tokio::task::LocalSet;
23
/// Upper bound of the node port range (largest valid u16 port).
pub const PORT_MAX: u32 = 65535;
/// Lower bound of the node port range; ports below 1024 are privileged on most OSes.
pub const PORT_MIN: u32 = 1024;

/// Maximum consecutive failed attempts while adding nodes before giving up.
const NODE_ADD_MAX_RETRIES: u32 = 5;

/// Fixed interval (milliseconds) passed to the node manager's upgrade command.
pub const FIXED_INTERVAL: u64 = 60_000;
/// Connection timeout (seconds) used when starting nodes.
pub const CONNECTION_TIMEOUT_START: u64 = 120;

/// Sentinel service name used when a completed action applies to all nodes.
pub const NODES_ALL: &str = "NODES_ALL";
33
/// Tasks accepted by the [`NodeManagement`] worker thread.
#[derive(Debug)]
pub enum NodeManagementTask {
    /// Scale the set of running nodes up or down to match `args.count`.
    MaintainNodes {
        args: MaintainNodesArgs,
    },
    /// Reset (wipe) all nodes, optionally triggering a start afterwards.
    ResetNodes {
        start_nodes_after_reset: bool,
        action_sender: UnboundedSender<Action>,
    },
    /// Stop the named services.
    StopNodes {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
    /// Upgrade node services via the node manager.
    UpgradeNodes {
        args: UpgradeNodesArgs,
    },
    /// Add a single new node service on the next free port.
    AddNode {
        args: MaintainNodesArgs,
    },
    /// Stop and remove the named services.
    RemoveNodes {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
    /// Start the named services.
    StartNode {
        services: Vec<String>,
        action_sender: UnboundedSender<Action>,
    },
}
62
/// Handle to the node-management worker thread; cloning shares the same
/// underlying task channel.
#[derive(Clone)]
pub struct NodeManagement {
    /// Channel feeding tasks to the worker's `LocalSet` (see [`NodeManagement::new`]).
    task_sender: mpsc::UnboundedSender<NodeManagementTask>,
}
67
impl NodeManagement {
    /// Create the node-management worker.
    ///
    /// Spawns a dedicated OS thread that runs a current-thread tokio runtime
    /// with a `LocalSet`; tasks received on the internal channel are executed
    /// there one at a time, so node operations never block the caller's
    /// thread. A `LocalSet` is used so the task futures are not required to
    /// be `Send`.
    ///
    /// # Errors
    ///
    /// Returns an error if the tokio runtime cannot be built.
    pub fn new(node_registry: NodeRegistryManager) -> Result<Self> {
        let (send, mut recv) = mpsc::unbounded_channel();

        let rt = Builder::new_current_thread().enable_all().build()?;

        std::thread::spawn(move || {
            let local = LocalSet::new();

            local.spawn_local(async move {
                while let Some(new_task) = recv.recv().await {
                    match new_task {
                        NodeManagementTask::MaintainNodes { args } => {
                            maintain_n_running_nodes(args, node_registry.clone()).await;
                        }
                        NodeManagementTask::ResetNodes {
                            start_nodes_after_reset,
                            action_sender,
                        } => {
                            reset_nodes(
                                action_sender,
                                node_registry.clone(),
                                start_nodes_after_reset,
                            )
                            .await;
                        }
                        NodeManagementTask::StopNodes {
                            services,
                            action_sender,
                        } => {
                            stop_nodes(services, action_sender, node_registry.clone()).await;
                        }
                        NodeManagementTask::UpgradeNodes { args } => {
                            upgrade_nodes(args, node_registry.clone()).await
                        }
                        NodeManagementTask::RemoveNodes {
                            services,
                            action_sender,
                        } => remove_nodes(services, action_sender, node_registry.clone()).await,
                        NodeManagementTask::StartNode {
                            services,
                            action_sender,
                        } => start_nodes(services, action_sender, node_registry.clone()).await,
                        NodeManagementTask::AddNode { args } => {
                            add_node(args, node_registry.clone()).await
                        }
                    }
                }
                // The while loop ends once every task sender (i.e. every
                // `NodeManagement` clone) has been dropped.
            });

            // This will return once all senders are dropped and all
            // spawned tasks have returned.
            rt.block_on(local);
        });

        Ok(Self { task_sender: send })
    }

    /// Send a task to the NodeManagement local set
    /// These tasks will be executed on a different thread to avoid blocking the main thread
    ///
    /// The results are returned via the standard `UnboundedSender<Action>` that is passed to each task.
    ///
    /// If this function returns an error, it means that the task could not be sent to the local set.
    pub fn send_task(&self, task: NodeManagementTask) -> Result<()> {
        self.task_sender
            .send(task)
            .inspect_err(|err| error!("The node management local set is down {err:?}"))
            .map_err(|_| eyre!("Failed to send task to the node management local set"))?;
        Ok(())
    }
}
142
143/// Stop the specified services
144async fn stop_nodes(
145    services: Vec<String>,
146    action_sender: UnboundedSender<Action>,
147    node_registry: NodeRegistryManager,
148) {
149    if let Err(err) = ant_node_manager::cmd::node::stop(
150        None,
151        node_registry.clone(),
152        vec![],
153        services.clone(),
154        VerbosityLevel::Minimal,
155    )
156    .await
157    {
158        error!("Error while stopping services {err:?}");
159        send_action(
160            action_sender,
161            Action::StatusActions(StatusActions::ErrorStoppingNodes {
162                services,
163                raw_error: err.to_string(),
164            }),
165        );
166    } else {
167        info!("Successfully stopped services");
168        for service in services {
169            send_action(
170                action_sender.clone(),
171                Action::StatusActions(StatusActions::StopNodesCompleted {
172                    service_name: service,
173                    all_nodes_data: node_registry.get_node_service_data().await,
174                    is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
175                }),
176            );
177        }
178    }
179}
180
/// Arguments for the `MaintainNodes` and `AddNode` tasks.
#[derive(Debug)]
pub struct MaintainNodesArgs {
    /// Channel used to report progress and errors back to the UI.
    pub action_sender: UnboundedSender<Action>,
    /// Custom antnode binary path, if any.
    pub antnode_path: Option<PathBuf>,
    pub connection_mode: ConnectionMode,
    /// Target number of running nodes.
    pub count: u16,
    pub data_dir_path: Option<PathBuf>,
    pub network_id: Option<u8>,
    /// An empty string means "no owner" (see `prepare_node_config`).
    pub owner: String,
    pub init_peers_config: InitialPeersConfig,
    /// Only honoured when `connection_mode` is `CustomPorts`.
    pub port_range: Option<PortRange>,
    pub rewards_address: String,
    /// When true, NAT detection is run before any node operation.
    pub run_nat_detection: bool,
}
195
/// Maintain the specified number of nodes
///
/// Compares `args.count` against the number of non-removed services in the
/// registry, then either scales down (delta <= 0) or adds the missing nodes.
/// Always finishes by sending `StartNodesCompleted` for the `NODES_ALL`
/// sentinel so the UI refreshes.
async fn maintain_n_running_nodes(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
    debug!("Maintaining {} nodes", args.count);
    // Optionally determine NAT status first so node flags can be set correctly.
    if args.run_nat_detection {
        run_nat_detection(&args.action_sender).await;
    }

    let config = prepare_node_config(&args);
    debug_log_config(&config, &args);

    let mut used_ports = get_used_ports(&node_registry).await;
    let (mut current_port, max_port) = get_port_range(&config.custom_ports);

    // Count services that still exist (anything not marked `Removed`).
    let mut non_removed_nodes = 0;
    for node in node_registry.nodes.read().await.iter() {
        let node = node.read().await;
        if node.status != ServiceStatus::Removed {
            non_removed_nodes += 1;
        }
    }
    // Positive: nodes must be added; zero/negative: scale down to the target.
    let nodes_to_add = args.count as i32 - non_removed_nodes;

    if nodes_to_add <= 0 {
        // NOTE(review): this logs the delta; the actual target passed on is args.count.
        debug!("Scaling down nodes to {}", nodes_to_add);
        scale_down_nodes(&config, args.count, node_registry.clone()).await;
    } else {
        debug!("Scaling up nodes to {}", nodes_to_add);
        add_nodes(
            &args.action_sender,
            &config,
            nodes_to_add,
            &mut used_ports,
            &mut current_port,
            max_port,
            node_registry.clone(),
        )
        .await;
    }

    debug!("Finished maintaining {} nodes", args.count);
    send_action(
        args.action_sender,
        Action::StatusActions(StatusActions::StartNodesCompleted {
            service_name: NODES_ALL.to_string(),
            all_nodes_data: node_registry.get_node_service_data().await,
            is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
        }),
    );
}
245
246/// Reset all the nodes
247async fn reset_nodes(
248    action_sender: UnboundedSender<Action>,
249    node_registry: NodeRegistryManager,
250    start_nodes_after_reset: bool,
251) {
252    if let Err(err) =
253        ant_node_manager::cmd::node::reset(true, node_registry.clone(), VerbosityLevel::Minimal)
254            .await
255    {
256        error!("Error while resetting services {err:?}");
257        send_action(
258            action_sender,
259            Action::StatusActions(StatusActions::ErrorResettingNodes {
260                raw_error: err.to_string(),
261            }),
262        );
263    } else {
264        info!("Successfully reset services");
265        send_action(
266            action_sender,
267            Action::StatusActions(StatusActions::ResetNodesCompleted {
268                trigger_start_node: start_nodes_after_reset,
269                all_nodes_data: node_registry.get_node_service_data().await,
270                is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
271            }),
272        );
273    }
274}
275
/// Arguments for the `UpgradeNodes` task.
///
/// NOTE(review): `connection_timeout_s` and `fixed_interval` are not read by
/// `upgrade_nodes` (it hard-codes `Some(FIXED_INTERVAL)`) — confirm whether
/// these fields are still needed.
#[derive(Debug)]
pub struct UpgradeNodesArgs {
    /// Channel used to report progress and errors back to the UI.
    pub action_sender: UnboundedSender<Action>,
    pub connection_timeout_s: u64,
    /// When true, upgraded services are not restarted afterwards.
    pub do_not_start: bool,
    pub custom_bin_path: Option<PathBuf>,
    /// Force the upgrade even if already at the target version.
    pub force: bool,
    pub fixed_interval: Option<u64>,
    pub peer_ids: Vec<String>,
    pub provided_env_variables: Option<Vec<(String, String)>>,
    /// Services to upgrade; empty presumably means all — TODO confirm.
    pub service_names: Vec<String>,
    pub url: Option<String>,
    pub version: Option<String>,
}
290
/// Stop the target services, then upgrade them via the node manager, sending
/// the result to the UI.
///
/// NOTE(review): a failure to stop is reported via `ErrorUpdatingNodes`, but
/// the upgrade is still attempted afterwards — confirm this is intentional.
async fn upgrade_nodes(args: UpgradeNodesArgs, node_registry: NodeRegistryManager) {
    // First we stop the Nodes
    if let Err(err) = ant_node_manager::cmd::node::stop(
        None,
        node_registry.clone(),
        vec![],
        args.service_names.clone(),
        VerbosityLevel::Minimal,
    )
    .await
    {
        error!("Error while stopping services {err:?}");
        send_action(
            args.action_sender.clone(),
            Action::StatusActions(StatusActions::ErrorUpdatingNodes {
                raw_error: err.to_string(),
            }),
        );
    }

    if let Err(err) = ant_node_manager::cmd::node::upgrade(
        0, // interval placeholder; superseded by `Some(FIXED_INTERVAL)` below
        args.do_not_start,
        args.custom_bin_path,
        args.force,
        Some(FIXED_INTERVAL),
        node_registry.clone(),
        args.peer_ids,
        args.provided_env_variables,
        args.service_names,
        args.url,
        args.version,
        VerbosityLevel::Minimal,
    )
    .await
    {
        error!("Error while updating services {err:?}");
        send_action(
            args.action_sender,
            Action::StatusActions(StatusActions::ErrorUpdatingNodes {
                raw_error: err.to_string(),
            }),
        );
    } else {
        info!("Successfully updated services");
        send_action(
            args.action_sender,
            Action::StatusActions(StatusActions::UpdateNodesCompleted {
                all_nodes_data: node_registry.get_node_service_data().await,
                is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
            }),
        );
    }
}
345
346async fn remove_nodes(
347    services: Vec<String>,
348    action_sender: UnboundedSender<Action>,
349    node_registry: NodeRegistryManager,
350) {
351    // First we stop the nodes
352    if let Err(err) = ant_node_manager::cmd::node::stop(
353        None,
354        node_registry.clone(),
355        vec![],
356        services.clone(),
357        VerbosityLevel::Minimal,
358    )
359    .await
360    {
361        error!("Error while stopping services {err:?}");
362        send_action(
363            action_sender.clone(),
364            Action::StatusActions(StatusActions::ErrorRemovingNodes {
365                services: services.clone(),
366                raw_error: err.to_string(),
367            }),
368        );
369    }
370
371    if let Err(err) = ant_node_manager::cmd::node::remove(
372        false,
373        vec![],
374        node_registry.clone(),
375        services.clone(),
376        VerbosityLevel::Minimal,
377    )
378    .await
379    {
380        error!("Error while removing services {err:?}");
381        send_action(
382            action_sender,
383            Action::StatusActions(StatusActions::ErrorRemovingNodes {
384                services,
385                raw_error: err.to_string(),
386            }),
387        );
388    } else {
389        info!("Successfully removed services {:?}", services);
390        for service in services {
391            send_action(
392                action_sender.clone(),
393                Action::StatusActions(StatusActions::RemoveNodesCompleted {
394                    service_name: service,
395                    all_nodes_data: node_registry.get_node_service_data().await,
396                    is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
397                }),
398            );
399        }
400    }
401}
402
403async fn add_node(args: MaintainNodesArgs, node_registry: NodeRegistryManager) {
404    debug!("Adding node");
405
406    if args.run_nat_detection {
407        run_nat_detection(&args.action_sender).await;
408    }
409
410    let config = prepare_node_config(&args);
411
412    let used_ports = get_used_ports(&node_registry).await;
413    let (mut current_port, max_port) = get_port_range(&config.custom_ports);
414
415    while used_ports.contains(&current_port) && current_port <= max_port {
416        current_port += 1;
417    }
418
419    if current_port > max_port {
420        error!("Reached maximum port number. Unable to find an available port.");
421        send_action(
422            args.action_sender.clone(),
423            Action::StatusActions(StatusActions::ErrorAddingNodes {
424                raw_error: format!(
425                    "When adding a new node we reached maximum port number ({max_port}).\nUnable to find an available port."
426                ),
427            }),
428        );
429    }
430
431    let port_range = Some(PortRange::Single(current_port));
432    match ant_node_manager::cmd::node::add(
433        false, // alpha,
434        false, // auto_restart,
435        config.auto_set_nat_flags,
436        Some(config.count),
437        config.data_dir_path,
438        true,       // enable_metrics_server,
439        None,       // env_variables,
440        None,       // evm_network
441        None,       // log_dir_path,
442        None,       // log_format,
443        None,       // max_archived_log_files,
444        None,       // max_log_files,
445        None,       // metrics_port,
446        None,       // network_id
447        None,       // node_ip,
448        port_range, // node_port
449        node_registry.clone(),
450        config.init_peers_config.clone(),
451        config.relay, // relay,
452        get_restart_policy(),
453        RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
454        None,                        // rpc_address,
455        None,                        // rpc_port,
456        config.antnode_path.clone(), // src_path,
457        !config.upnp,
458        None, // url,
459        None, // user,
460        None, // version,
461        VerbosityLevel::Minimal,
462        false, // write_older_cache_files
463    )
464    .await
465    {
466        Err(err) => {
467            error!("Error while adding services {err:?}");
468            send_action(
469                args.action_sender,
470                Action::StatusActions(StatusActions::ErrorAddingNodes {
471                    raw_error: err.to_string(),
472                }),
473            );
474        }
475        Ok(services) => {
476            info!("Successfully added services: {:?}", services);
477            for service in services {
478                send_action(
479                    args.action_sender.clone(),
480                    Action::StatusActions(StatusActions::AddNodesCompleted {
481                        service_name: service,
482                        all_nodes_data: node_registry.get_node_service_data().await,
483                        is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
484                    }),
485                );
486            }
487        }
488    }
489}
490
491async fn start_nodes(
492    services: Vec<String>,
493    action_sender: UnboundedSender<Action>,
494    node_registry: NodeRegistryManager,
495) {
496    debug!("Starting node {:?}", services);
497    if let Err(err) = ant_node_manager::cmd::node::start(
498        CONNECTION_TIMEOUT_START,
499        None,
500        node_registry.clone(),
501        vec![],
502        services.clone(),
503        VerbosityLevel::Minimal,
504    )
505    .await
506    {
507        error!("Error while starting services {err:?}");
508        send_action(
509            action_sender,
510            Action::StatusActions(StatusActions::ErrorStartingNodes {
511                services,
512                raw_error: err.to_string(),
513            }),
514        );
515    } else {
516        info!("Successfully started services {:?}", services);
517        for service in services {
518            send_action(
519                action_sender.clone(),
520                Action::StatusActions(StatusActions::StartNodesCompleted {
521                    service_name: service,
522                    all_nodes_data: node_registry.get_node_service_data().await,
523                    is_nat_status_determined: node_registry.nat_status.read().await.is_some(),
524                }),
525            );
526        }
527    }
528}
529
530// --- Helper functions ---
531
/// Get the appropriate restart policy for the current platform
///
/// On Unix the service restarts only after a clean exit (`OnSuccess`), with
/// no restart delay.
#[cfg(unix)]
fn get_restart_policy() -> RestartPolicy {
    RestartPolicy::OnSuccess { delay_secs: None }
}
537
/// Windows variant: never auto-restart the service.
/// NOTE(review): differs from the Unix policy (`OnSuccess`) — presumably the
/// Windows service manager handles restarts differently; confirm.
#[cfg(windows)]
fn get_restart_policy() -> RestartPolicy {
    RestartPolicy::Never
}
542
543fn send_action(action_sender: UnboundedSender<Action>, action: Action) {
544    if let Err(err) = action_sender.send(action) {
545        error!("Error while sending action: {err:?}");
546    }
547}
548
/// Effective node-launch configuration derived from `MaintainNodesArgs` by
/// `prepare_node_config`.
struct NodeConfig {
    antnode_path: Option<PathBuf>,
    // True when connection mode is `Automatic`.
    auto_set_nat_flags: bool,
    // Target number of running nodes.
    count: u16,
    // Only set when connection mode is `CustomPorts`.
    custom_ports: Option<PortRange>,
    data_dir_path: Option<PathBuf>,
    // True when connection mode is `HomeNetwork`.
    relay: bool,
    network_id: Option<u8>,
    // `None` when the owner string was empty.
    owner: Option<String>,
    init_peers_config: InitialPeersConfig,
    rewards_address: String,
    // True when connection mode is `UPnP`.
    upnp: bool,
}
562
/// Run the NAT detection process
///
/// Notifies the UI that detection is starting, resolves the latest
/// NAT-detection release version (falling back to "0.1.0" when the lookup
/// fails), runs the detection binary through the node manager, and reports
/// success or failure back through `action_sender`.
async fn run_nat_detection(action_sender: &UnboundedSender<Action>) {
    info!("Running nat detection....");

    // Notify that NAT detection is starting
    if let Err(err) = action_sender.send(Action::StatusActions(StatusActions::NatDetectionStarted))
    {
        error!("Error while sending action: {err:?}");
    }

    let release_repo = <dyn AntReleaseRepoActions>::default_config();
    // Determine which NAT-detection binary version to run.
    let version = match release_repo
        .get_latest_version(&ReleaseType::NatDetection)
        .await
    {
        Ok(v) => {
            info!("Using NAT detection version {}", v.to_string());
            v.to_string()
        }
        Err(err) => {
            // Lookup failure is non-fatal: fall back to a known version.
            info!("No NAT detection release found, using fallback version 0.1.0");
            info!("Error: {err}");
            "0.1.0".to_string()
        }
    };

    if let Err(err) = ant_node_manager::cmd::nat_detection::run_nat_detection(
        None,
        true,
        None,
        None,
        Some(version),
        VerbosityLevel::Minimal,
    )
    .await
    {
        // Detection failure is surfaced to the UI but does not abort the caller.
        error!("Error while running nat detection {err:?}. Registering the error.");
        if let Err(err) = action_sender.send(Action::StatusActions(
            StatusActions::ErrorWhileRunningNatDetection,
        )) {
            error!("Error while sending action: {err:?}");
        }
    } else {
        info!("Successfully ran nat detection.");
        if let Err(err) = action_sender.send(Action::StatusActions(
            StatusActions::SuccessfullyDetectedNatStatus,
        )) {
            error!("Error while sending action: {err:?}");
        }
    }
}
614
615fn prepare_node_config(args: &MaintainNodesArgs) -> NodeConfig {
616    NodeConfig {
617        antnode_path: args.antnode_path.clone(),
618        auto_set_nat_flags: args.connection_mode == ConnectionMode::Automatic,
619        data_dir_path: args.data_dir_path.clone(),
620        count: args.count,
621        custom_ports: if args.connection_mode == ConnectionMode::CustomPorts {
622            args.port_range.clone()
623        } else {
624            None
625        },
626        owner: if args.owner.is_empty() {
627            None
628        } else {
629            Some(args.owner.clone())
630        },
631        relay: args.connection_mode == ConnectionMode::HomeNetwork,
632        network_id: args.network_id,
633        init_peers_config: args.init_peers_config.clone(),
634        rewards_address: args.rewards_address.clone(),
635        upnp: args.connection_mode == ConnectionMode::UPnP,
636    }
637}
638
/// Debug log the node config
///
/// Emits a multi-line `debug!` dump of the effective `NodeConfig` plus the
/// raw `connection_mode`/`network_id` from the originating args.
fn debug_log_config(config: &NodeConfig, args: &MaintainNodesArgs) {
    debug!("************ STARTING NODE MAINTENANCE ************");
    debug!(
        "Maintaining {} running nodes with the following args:",
        config.count
    );
    debug!(
        " owner: {:?}, init_peers_config: {:?}, antnode_path: {:?}, network_id: {:?}",
        config.owner, config.init_peers_config, config.antnode_path, args.network_id
    );
    debug!(
        " data_dir_path: {:?}, connection_mode: {:?}",
        config.data_dir_path, args.connection_mode
    );
    debug!(
        " auto_set_nat_flags: {:?}, custom_ports: {:?}, upnp: {}, relay: {}",
        config.auto_set_nat_flags, config.custom_ports, config.upnp, config.relay
    );
}
659
660/// Get the currently used ports from the node registry
661async fn get_used_ports(node_registry: &NodeRegistryManager) -> Vec<u16> {
662    let mut used_ports = Vec::new();
663    for node in node_registry.nodes.read().await.iter() {
664        let node = node.read().await;
665        if let Some(port) = node.node_port {
666            used_ports.push(port);
667        }
668    }
669    debug!("Currently used ports: {:?}", used_ports);
670    used_ports
671}
672
673/// Get the port range (u16, u16) from the custom ports PortRange
674fn get_port_range(custom_ports: &Option<PortRange>) -> (u16, u16) {
675    match custom_ports {
676        Some(PortRange::Single(port)) => (*port, *port),
677        Some(PortRange::Range(start, end)) => (*start, *end),
678        None => (PORT_MIN as u16, PORT_MAX as u16),
679    }
680}
681
/// Scale down the nodes
///
/// Delegates to `maintain_n_running_nodes` with the (smaller) target `count`
/// and no port constraint; the node manager decides which services to stop.
/// Errors are only logged here — no UI action is sent.
/// NOTE(review): the long positional argument list mirrors the call in
/// `add_nodes`; argument meanings are not annotated — confirm against the
/// `ant_node_manager::cmd::node::maintain_n_running_nodes` signature.
async fn scale_down_nodes(config: &NodeConfig, count: u16, node_registry: NodeRegistryManager) {
    match ant_node_manager::cmd::node::maintain_n_running_nodes(
        false,
        false,
        config.auto_set_nat_flags,
        CONNECTION_TIMEOUT_START,
        count,
        config.data_dir_path.clone(),
        true,
        None,
        Some(EvmNetwork::default()),
        None,
        None,
        None,
        None,
        None,
        config.network_id,
        None,
        None, // We don't care about the port, as we are scaling down
        node_registry,
        config.init_peers_config.clone(),
        config.relay,
        RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
        get_restart_policy(),
        None,
        None,
        config.antnode_path.clone(),
        None,
        !config.upnp,
        None,
        None,
        VerbosityLevel::Minimal,
        None,
        false,
    )
    .await
    {
        Ok(_) => {
            info!("Scaling down to {} nodes", count);
        }
        Err(err) => {
            error!("Error while scaling down to {} nodes: {err:?}", count);
        }
    }
}
728
729/// Add the specified number of nodes
730async fn add_nodes(
731    action_sender: &UnboundedSender<Action>,
732    config: &NodeConfig,
733    mut nodes_to_add: i32,
734    used_ports: &mut Vec<u16>,
735    current_port: &mut u16,
736    max_port: u16,
737    node_registry: NodeRegistryManager,
738) {
739    let mut retry_count = 0;
740
741    while nodes_to_add > 0 && retry_count < NODE_ADD_MAX_RETRIES {
742        // Find the next available port
743        while used_ports.contains(current_port) && *current_port <= max_port {
744            *current_port += 1;
745        }
746
747        if *current_port > max_port {
748            error!("Reached maximum port number. Unable to find an available port.");
749            send_action(
750                action_sender.clone(),
751                Action::StatusActions(StatusActions::ErrorScalingUpNodes {
752                    raw_error: format!(
753                        "Reached maximum port number ({max_port}).\nUnable to find an available port."
754                    ),
755                }),
756            );
757            break;
758        }
759
760        let port_range = Some(PortRange::Single(*current_port));
761        match ant_node_manager::cmd::node::maintain_n_running_nodes(
762            false,
763            false,
764            config.auto_set_nat_flags,
765            CONNECTION_TIMEOUT_START,
766            config.count,
767            config.data_dir_path.clone(),
768            true,
769            None,
770            Some(EvmNetwork::default()),
771            None,
772            None,
773            None,
774            None,
775            None,
776            config.network_id,
777            None,
778            port_range,
779            node_registry.clone(),
780            config.init_peers_config.clone(),
781            config.relay,
782            RewardsAddress::from_str(config.rewards_address.as_str()).unwrap(),
783            get_restart_policy(),
784            None,
785            None,
786            config.antnode_path.clone(),
787            None,
788            !config.upnp,
789            None,
790            None,
791            VerbosityLevel::Minimal,
792            None,
793            false,
794        )
795        .await
796        {
797            Ok(_) => {
798                info!("Successfully added a node on port {}", current_port);
799                used_ports.push(*current_port);
800                nodes_to_add -= 1;
801                *current_port += 1;
802                retry_count = 0; // Reset retry count on success
803            }
804            Err(err) => {
805                //TODO: We should use concrete error types here instead of string matching (ant_node_manager)
806                if err.to_string().contains("is being used by another service") {
807                    warn!(
808                        "Port {} is being used, retrying with a different port. Attempt {}/{}",
809                        current_port,
810                        retry_count + 1,
811                        NODE_ADD_MAX_RETRIES
812                    );
813                } else if err
814                    .to_string()
815                    .contains("Failed to add one or more services")
816                    && retry_count >= NODE_ADD_MAX_RETRIES
817                {
818                    send_action(
819                        action_sender.clone(),
820                        Action::StatusActions(StatusActions::ErrorScalingUpNodes {
821                            raw_error: "When trying to add a node, we failed.\n\
822                                 Maybe you ran out of disk space?\n\
823                                 Maybe you need to change the port range?"
824                                .to_string(),
825                        }),
826                    );
827                } else if err
828                    .to_string()
829                    .contains("contains a virus or potentially unwanted software")
830                    && retry_count >= NODE_ADD_MAX_RETRIES
831                {
832                    send_action(
833                        action_sender.clone(),
834                        Action::StatusActions(StatusActions::ErrorScalingUpNodes {
835                            raw_error: "When trying to add a node, we failed.\n\
836                             You may be running an old version of antnode service?\n\
837                             Did you whitelisted antnode and the launchpad?"
838                                .to_string(),
839                        }),
840                    );
841                } else {
842                    error!("Range of ports to be used {:?}", *current_port..max_port);
843                    error!("Error while adding node on port {}: {err:?}", current_port);
844                }
845                // In case of error, we increase the port and the retry count
846                *current_port += 1;
847                retry_count += 1;
848            }
849        }
850    }
851    if retry_count >= NODE_ADD_MAX_RETRIES {
852        send_action(
853            action_sender.clone(),
854            Action::StatusActions(StatusActions::ErrorScalingUpNodes {
855                raw_error: format!(
856                    "When trying to start a node, we reached the maximum amount of retries ({NODE_ADD_MAX_RETRIES}).\n\
857                    Could this be a firewall blocking nodes starting or ports on your router already in use?"
858                ),
859            }),
860        );
861    }
862}