ant_node_manager/
helpers.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9// Allow expect usage - to be refactored
10#![allow(clippy::expect_used)]
11
12use ant_releases::{AntReleaseRepoActions, ArchiveType, ReleaseType, get_running_platform};
13use ant_service_management::NodeServiceData;
14use color_eyre::{
15    Result,
16    eyre::{bail, eyre},
17};
18use indicatif::{ProgressBar, ProgressStyle};
19use semver::Version;
20use std::{
21    io::Read,
22    path::{Path, PathBuf},
23    process::{Command, Stdio},
24    sync::Arc,
25};
26use tokio::sync::RwLock;
27
28use crate::{VerbosityLevel, add_services::config::PortRange, config};
29
30const MAX_DOWNLOAD_RETRIES: u8 = 3;
31
32// We need deterministic and fix path for the faucet wallet.
33// Otherwise the test instances will not be able to find the same faucet instance.
34pub fn get_faucet_data_dir() -> PathBuf {
35    let mut data_dirs = dirs_next::data_dir().expect("A homedir to exist.");
36    data_dirs.push("autonomi");
37    data_dirs.push("test_faucet");
38    std::fs::create_dir_all(data_dirs.as_path())
39        .expect("Faucet test path to be successfully created.");
40    data_dirs
41}
42
/// Makes sure the WinSW service wrapper is available and exports its location in `WINSW_PATH`.
///
/// Nothing is done when `winsw.exe` can already be located with `which`. Otherwise, if
/// `dest_path` does not exist yet, WinSW is downloaded to it, making up to
/// `MAX_DOWNLOAD_RETRIES` attempts. A progress bar and status messages are shown unless
/// `verbosity` is `VerbosityLevel::Minimal`. Finally, the `WINSW_PATH` environment variable is
/// set to `dest_path`.
///
/// # Errors
///
/// Returns an error if the progress bar template is rejected or if every download attempt fails.
#[cfg(windows)]
pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Result<()> {
    if which::which("winsw.exe").is_ok() {
        debug!("WinSW already installed, which returned Ok");
        return Ok(());
    }

    let verbose = verbosity != VerbosityLevel::Minimal;

    if dest_path.exists() {
        debug!("WinSW already installed, dest_path exists: {dest_path:?}");
    } else {
        if verbose {
            println!("Downloading winsw.exe...");
        }
        debug!("Downloading WinSW to {dest_path:?}");

        let release_repo = <dyn AntReleaseRepoActions>::default_config();

        // The bar only exists in verbose mode. It is shared with the progress callback so that
        // it can still be cleared from here after a failed or finished download.
        let progress_bar = if verbose {
            let bar = ProgressBar::new(0);
            bar.set_style(ProgressStyle::default_bar()
                .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
                .progress_chars("#>-"));
            Some(Arc::new(bar))
        } else {
            None
        };
        let callback: Box<dyn Fn(u64, u64) + Send + Sync> = match &progress_bar {
            Some(bar) => {
                let bar = Arc::clone(bar);
                Box::new(move |downloaded: u64, total: u64| {
                    bar.set_length(total);
                    bar.set_position(downloaded);
                })
            }
            None => Box::new(|_: u64, _: u64| {}),
        };

        let mut downloaded = false;
        for download_attempts in 1..=MAX_DOWNLOAD_RETRIES {
            if let Err(e) = release_repo.download_winsw(dest_path, &callback).await {
                if verbose {
                    println!("Error downloading WinSW: {e:?}");
                    println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
                }
                error!(
                    "Error downloading WinSW. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {e:?}"
                );
                if let Some(bar) = &progress_bar {
                    bar.finish_and_clear();
                }
                continue;
            }
            downloaded = true;
            break;
        }
        if !downloaded {
            error!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
            bail!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
        }

        if let Some(bar) = progress_bar {
            bar.finish_and_clear();
        }
    }

    info!("WinSW installed at {dest_path:?}. Setting WINSW_PATH environment variable.");

    // NOTE(review): `set_var` is only sound while no other thread accesses the process
    // environment concurrently -- confirm this holds for every caller.
    #[allow(unsafe_code)]
    unsafe {
        std::env::set_var("WINSW_PATH", dest_path.to_string_lossy().into_owned());
    }

    Ok(())
}
117
118#[cfg(not(windows))]
119#[allow(clippy::unused_async)]
120pub async fn configure_winsw(_dest_path: &Path, _verbosity: VerbosityLevel) -> Result<()> {
121    Ok(())
122}
123
124/// Downloads and extracts a release binary to a temporary location.
125///
126/// If the URL is supplied, that will be downloaded and extracted, and the binary inside the
127/// archive will be used; if the version is supplied, a specific version will be downloaded and
128/// used; otherwise the latest version will be downloaded and used.
129pub async fn download_and_extract_release(
130    release_type: ReleaseType,
131    url: Option<String>,
132    version: Option<String>,
133    release_repo: &dyn AntReleaseRepoActions,
134    verbosity: VerbosityLevel,
135    download_dir_path: Option<PathBuf>,
136) -> Result<(PathBuf, String)> {
137    debug!(
138        "Downloading and extracting release for {release_type}, url: {url:?}, version: {version:?}"
139    );
140    let mut pb = None;
141    let callback = if verbosity != VerbosityLevel::Minimal {
142        let progress_bar = Arc::new(ProgressBar::new(0));
143        progress_bar.set_style(ProgressStyle::default_bar()
144            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
145            .progress_chars("#>-"));
146        pb = Some(Arc::clone(&progress_bar));
147        let pb_clone = Arc::clone(&progress_bar);
148        let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |downloaded, total| {
149            pb_clone.set_length(total);
150            pb_clone.set_position(downloaded);
151        });
152        callback
153    } else {
154        let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |_, _| {});
155        callback
156    };
157
158    let download_dir_path = if let Some(path) = download_dir_path {
159        std::fs::create_dir_all(&path)?;
160        path
161    } else if url.is_some() {
162        create_temp_dir()?
163    } else {
164        // The node manager path can require root access, or can only be accessed by the service
165        // user, which is why we have an optional path for the whole function.
166        let path = config::get_node_manager_path()?.join("downloads");
167        std::fs::create_dir_all(&path)?;
168        path
169    };
170    debug!("Download directory: {download_dir_path:?}");
171
172    let mut download_attempts = 1;
173    let binary_download_path = loop {
174        if download_attempts > MAX_DOWNLOAD_RETRIES {
175            error!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
176            bail!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
177        }
178
179        if let Some(url) = &url {
180            info!("Downloading release from {url}");
181            if verbosity != VerbosityLevel::Minimal {
182                println!("Retrieving {release_type} from {url}");
183            }
184            match release_repo
185                .download_release(url, &download_dir_path, &callback)
186                .await
187            {
188                Ok(archive_path) => {
189                    let binary_download_path = release_repo
190                        .extract_release_archive(&archive_path, &download_dir_path)
191                        .inspect_err(|err| error!("Error while extracting archive {err:?}"))?;
192                    break binary_download_path;
193                }
194                Err(err) => {
195                    error!("Error downloading release: {err:?}");
196                    if verbosity != VerbosityLevel::Minimal {
197                        println!("Error downloading release: {err:?}");
198                        println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
199                    }
200                    download_attempts += 1;
201                    if let Some(pb) = &pb {
202                        pb.finish_and_clear();
203                    }
204                }
205            }
206        } else {
207            let version = if let Some(version) = version.clone() {
208                let version = Version::parse(&version)?;
209                info!("Downloading release from S3 for version {version}");
210                version
211            } else {
212                if verbosity != VerbosityLevel::Minimal {
213                    println!("Retrieving latest version for {release_type}...");
214                }
215                let version = release_repo
216                    .get_latest_version(&release_type)
217                    .await
218                    .inspect_err(|err| error!("Error obtaining latest version {err:?}"))?;
219                info!("Downloading latest version from S3: {version}");
220                version
221            };
222
223            let archive_name = format!(
224                "{}-{}-{}.{}",
225                release_type.to_string().to_lowercase(),
226                version,
227                &get_running_platform()?,
228                &ArchiveType::TarGz
229            );
230            let archive_path = download_dir_path.join(&archive_name);
231            if archive_path.exists() {
232                // try extracting it, else download it.
233                match release_repo.extract_release_archive(&archive_path, &download_dir_path) {
234                    Ok(binary_download_path) => {
235                        info!("Using cached {release_type} version {version}...");
236                        if verbosity != VerbosityLevel::Minimal {
237                            println!("Using cached {release_type} version {version}...");
238                        }
239                        break binary_download_path;
240                    }
241                    Err(_) => {
242                        info!(
243                            "Cached {release_type} version {version} is corrupted. Downloading again..."
244                        );
245                        if verbosity != VerbosityLevel::Minimal {
246                            println!(
247                                "Cached {release_type} version {version} is corrupted. Downloading again..."
248                            );
249                        }
250                    }
251                }
252            }
253
254            if verbosity != VerbosityLevel::Minimal {
255                println!("Downloading {release_type} version {version}...");
256            }
257            match release_repo
258                .download_release_from_s3(
259                    &release_type,
260                    &version,
261                    &get_running_platform()?,
262                    &ArchiveType::TarGz,
263                    &download_dir_path,
264                    &callback,
265                )
266                .await
267            {
268                Ok(archive_path) => {
269                    let binary_download_path =
270                        release_repo.extract_release_archive(&archive_path, &download_dir_path)?;
271                    break binary_download_path;
272                }
273                Err(err) => {
274                    error!(
275                        "Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}:  {err:?}"
276                    );
277                    if verbosity != VerbosityLevel::Minimal {
278                        println!(
279                            "Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}"
280                        );
281                    }
282                    download_attempts += 1;
283                    if let Some(pb) = &pb {
284                        pb.finish_and_clear();
285                    }
286                }
287            }
288        };
289    };
290    if let Some(pb) = pb {
291        pb.finish_and_clear();
292    }
293    info!("Download completed: {binary_download_path:?}");
294
295    if verbosity != VerbosityLevel::Minimal {
296        println!("Download completed: {}", &binary_download_path.display());
297    }
298
299    // Finally, obtain the version number from the binary by running `--version`. This is useful
300    // when the `--url` argument is used, and in any case, ultimately the binary we obtained is the
301    // source of truth.
302    let bin_version = get_bin_version(&binary_download_path)?;
303
304    Ok((binary_download_path, bin_version))
305}
306
307pub fn get_bin_version(bin_path: &PathBuf) -> Result<String> {
308    debug!("Obtaining version of binary {bin_path:?}");
309    let mut cmd = Command::new(bin_path)
310        .arg("--version")
311        .stdout(Stdio::piped())
312        .spawn()
313        .inspect_err(|err| error!("The program {bin_path:?} failed to start: {err:?}"))?;
314
315    let mut output = String::new();
316    cmd.stdout
317        .as_mut()
318        .ok_or_else(|| {
319            error!("Failed to capture stdout");
320            eyre!("Failed to capture stdout")
321        })?
322        .read_to_string(&mut output)
323        .inspect_err(|err| error!("Output contained non utf8 chars: {err:?}"))?;
324
325    // Extract the first line of the output
326    let first_line = output.lines().next().ok_or_else(|| {
327        error!("No output received from binary");
328        eyre!("No output received from binary")
329    })?;
330
331    let version = if let Some(v_pos) = first_line.find('v') {
332        // Stable binary: Extract version after 'v'
333        first_line[v_pos + 1..]
334            .split_whitespace()
335            .next()
336            .map(String::from)
337    } else {
338        // Nightly binary: Extract the date at the end of the first line
339        first_line.split_whitespace().last().map(String::from)
340    }
341    .ok_or_else(|| {
342        error!("Failed to parse version from output");
343        eyre!("Failed to parse version from output")
344    })?;
345
346    debug!("Obtained version of binary: {version}");
347
348    Ok(version)
349}
350
/// Returns the current user's name as stored in the `USERNAME` environment variable.
///
/// # Errors
///
/// Returns an error if `USERNAME` is not set or does not contain valid Unicode.
#[cfg(target_os = "windows")]
pub fn get_username() -> Result<String> {
    let username = std::env::var("USERNAME")?;
    Ok(username)
}
355
356#[cfg(not(target_os = "windows"))]
357pub fn get_username() -> Result<String> {
358    Ok(std::env::var("USER")?)
359}
360
361/// There is a `tempdir` crate that provides the same kind of functionality, but it was flagged for
362/// a security vulnerability.
363pub fn create_temp_dir() -> Result<PathBuf> {
364    let temp_dir = std::env::temp_dir();
365    let unique_dir_name = uuid::Uuid::new_v4().to_string();
366    let new_temp_dir = temp_dir.join(unique_dir_name);
367    std::fs::create_dir_all(&new_temp_dir)
368        .inspect_err(|err| error!("Failed to crete temp dir: {err:?}"))?;
369    Ok(new_temp_dir)
370}
371
372/// Get the start port from the `PortRange` if applicable.
373pub fn get_start_port_if_applicable(range: Option<PortRange>) -> Option<u16> {
374    if let Some(port) = range {
375        match port {
376            PortRange::Single(val) => return Some(val),
377            PortRange::Range(start, _) => return Some(start),
378        }
379    }
380    None
381}
382
/// Increments the port by 1, if one is given.
///
/// Returns `None` when `port` is `None`, and also when `port` is `u16::MAX`: there is no higher
/// port number, and a plain `+ 1` would panic in debug builds or wrap around to port 0 in
/// release builds.
pub fn increment_port_option(port: Option<u16>) -> Option<u16> {
    port.and_then(|port| port.checked_add(1))
}
391
392/// Make sure the port is not already in use by another node.
393pub async fn check_port_availability(
394    port_option: &PortRange,
395    nodes: &Arc<RwLock<Vec<Arc<RwLock<NodeServiceData>>>>>,
396) -> Result<()> {
397    let mut all_ports = Vec::new();
398    for node in nodes.read().await.iter() {
399        let node = node.read().await;
400        if let Some(port) = node.metrics_port {
401            all_ports.push(port);
402        }
403        if let Some(port) = node.node_port {
404            all_ports.push(port);
405        }
406        all_ports.push(node.rpc_socket_addr.port());
407    }
408
409    match port_option {
410        PortRange::Single(port) => {
411            if all_ports.contains(port) {
412                error!("Port {port} is being used by another service");
413                return Err(eyre!("Port {port} is being used by another service"));
414            }
415        }
416        PortRange::Range(start, end) => {
417            for i in *start..=*end {
418                if all_ports.contains(&i) {
419                    error!("Port {i} is being used by another service");
420                    return Err(eyre!("Port {i} is being used by another service"));
421                }
422            }
423        }
424    }
425    Ok(())
426}