ant_node_manager/
helpers.rs1use ant_releases::{get_running_platform, AntReleaseRepoActions, ArchiveType, ReleaseType};
10use ant_service_management::NodeServiceData;
11use color_eyre::{
12 eyre::{bail, eyre},
13 Result,
14};
15use indicatif::{ProgressBar, ProgressStyle};
16use semver::Version;
17use std::{
18 io::Read,
19 path::{Path, PathBuf},
20 process::{Command, Stdio},
21 sync::Arc,
22};
23
24use crate::{add_services::config::PortRange, config, VerbosityLevel};
25
26const MAX_DOWNLOAD_RETRIES: u8 = 3;
27
28pub fn get_faucet_data_dir() -> PathBuf {
31 let mut data_dirs = dirs_next::data_dir().expect("A homedir to exist.");
32 data_dirs.push("autonomi");
33 data_dirs.push("test_faucet");
34 std::fs::create_dir_all(data_dirs.as_path())
35 .expect("Faucet test path to be successfully created.");
36 data_dirs
37}
38
39#[cfg(windows)]
40pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Result<()> {
41 if which::which("winsw.exe").is_ok() {
42 debug!("WinSW already installed, which returned Ok");
43 return Ok(());
44 }
45
46 if !dest_path.exists() {
47 if verbosity != VerbosityLevel::Minimal {
48 println!("Downloading winsw.exe...");
49 }
50 debug!("Downloading WinSW to {dest_path:?}");
51
52 let release_repo = <dyn AntReleaseRepoActions>::default_config();
53
54 let mut pb = None;
55 let callback = if verbosity != VerbosityLevel::Minimal {
56 let progress_bar = Arc::new(ProgressBar::new(0));
57 progress_bar.set_style(ProgressStyle::default_bar()
58 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
59 .progress_chars("#>-"));
60 pb = Some(progress_bar.clone());
61 let pb_clone = progress_bar.clone();
62 let callback: Box<dyn Fn(u64, u64) + Send + Sync> =
63 Box::new(move |downloaded, total| {
64 pb_clone.set_length(total);
65 pb_clone.set_position(downloaded);
66 });
67 callback
68 } else {
69 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |_, _| {});
70 callback
71 };
72
73 let mut download_attempts = 1;
74 loop {
75 if download_attempts > MAX_DOWNLOAD_RETRIES {
76 error!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
77 bail!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
78 }
79 match release_repo.download_winsw(dest_path, &callback).await {
80 Ok(_) => break,
81 Err(e) => {
82 if verbosity != VerbosityLevel::Minimal {
83 println!("Error downloading WinSW: {e:?}");
84 println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
85 }
86 error!("Error downloading WinSW. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {e:?}");
87 download_attempts += 1;
88 if let Some(pb) = &pb {
89 pb.finish_and_clear();
90 }
91 }
92 }
93 }
94
95 if let Some(pb) = pb {
96 pb.finish_and_clear();
97 }
98 } else {
99 debug!("WinSW already installed, dest_path exists: {dest_path:?}");
100 }
101
102 info!("WinSW installed at {dest_path:?}. Setting WINSW_PATH environment variable.");
103
104 std::env::set_var("WINSW_PATH", dest_path.to_string_lossy().to_string());
105
106 Ok(())
107}
108
109#[cfg(not(windows))]
110pub async fn configure_winsw(_dest_path: &Path, _verbosity: VerbosityLevel) -> Result<()> {
111 Ok(())
112}
113
114pub async fn download_and_extract_release(
120 release_type: ReleaseType,
121 url: Option<String>,
122 version: Option<String>,
123 release_repo: &dyn AntReleaseRepoActions,
124 verbosity: VerbosityLevel,
125 download_dir_path: Option<PathBuf>,
126) -> Result<(PathBuf, String)> {
127 debug!(
128 "Downloading and extracting release for {release_type}, url: {url:?}, version: {version:?}"
129 );
130 let mut pb = None;
131 let callback = if verbosity != VerbosityLevel::Minimal {
132 let progress_bar = Arc::new(ProgressBar::new(0));
133 progress_bar.set_style(ProgressStyle::default_bar()
134 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
135 .progress_chars("#>-"));
136 pb = Some(progress_bar.clone());
137 let pb_clone = progress_bar.clone();
138 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |downloaded, total| {
139 pb_clone.set_length(total);
140 pb_clone.set_position(downloaded);
141 });
142 callback
143 } else {
144 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |_, _| {});
145 callback
146 };
147
148 let download_dir_path = if let Some(path) = download_dir_path {
149 std::fs::create_dir_all(&path)?;
150 path
151 } else if url.is_some() {
152 create_temp_dir()?
153 } else {
154 let path = config::get_node_manager_path()?.join("downloads");
157 std::fs::create_dir_all(&path)?;
158 path
159 };
160 debug!("Download directory: {download_dir_path:?}");
161
162 let mut download_attempts = 1;
163 let binary_download_path = loop {
164 if download_attempts > MAX_DOWNLOAD_RETRIES {
165 error!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
166 bail!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
167 }
168
169 if let Some(url) = &url {
170 info!("Downloading release from {url}");
171 if verbosity != VerbosityLevel::Minimal {
172 println!("Retrieving {release_type} from {url}");
173 }
174 match release_repo
175 .download_release(url, &download_dir_path, &callback)
176 .await
177 {
178 Ok(archive_path) => {
179 let binary_download_path = release_repo
180 .extract_release_archive(&archive_path, &download_dir_path)
181 .inspect_err(|err| error!("Error while extracting archive {err:?}"))?;
182 break binary_download_path;
183 }
184 Err(err) => {
185 error!("Error downloading release: {err:?}");
186 if verbosity != VerbosityLevel::Minimal {
187 println!("Error downloading release: {err:?}");
188 println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
189 }
190 download_attempts += 1;
191 if let Some(pb) = &pb {
192 pb.finish_and_clear();
193 }
194 }
195 }
196 } else {
197 let version = if let Some(version) = version.clone() {
198 let version = Version::parse(&version)?;
199 info!("Downloading release from S3 for version {version}");
200 version
201 } else {
202 if verbosity != VerbosityLevel::Minimal {
203 println!("Retrieving latest version for {release_type}...");
204 }
205 let version = release_repo
206 .get_latest_version(&release_type)
207 .await
208 .inspect_err(|err| error!("Error obtaining latest version {err:?}"))?;
209 info!("Downloading latest version from S3: {version}");
210 version
211 };
212
213 let archive_name = format!(
214 "{}-{}-{}.{}",
215 release_type.to_string().to_lowercase(),
216 version,
217 &get_running_platform()?,
218 &ArchiveType::TarGz
219 );
220 let archive_path = download_dir_path.join(&archive_name);
221 if archive_path.exists() {
222 match release_repo.extract_release_archive(&archive_path, &download_dir_path) {
224 Ok(binary_download_path) => {
225 info!("Using cached {release_type} version {version}...");
226 if verbosity != VerbosityLevel::Minimal {
227 println!("Using cached {release_type} version {version}...");
228 }
229 break binary_download_path;
230 }
231 Err(_) => {
232 info!("Cached {release_type} version {version} is corrupted. Downloading again...");
233 if verbosity != VerbosityLevel::Minimal {
234 println!("Cached {release_type} version {version} is corrupted. Downloading again...");
235 }
236 }
237 }
238 }
239
240 if verbosity != VerbosityLevel::Minimal {
241 println!("Downloading {release_type} version {version}...");
242 }
243 match release_repo
244 .download_release_from_s3(
245 &release_type,
246 &version,
247 &get_running_platform()?,
248 &ArchiveType::TarGz,
249 &download_dir_path,
250 &callback,
251 )
252 .await
253 {
254 Ok(archive_path) => {
255 let binary_download_path =
256 release_repo.extract_release_archive(&archive_path, &download_dir_path)?;
257 break binary_download_path;
258 }
259 Err(err) => {
260 error!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}");
261 if verbosity != VerbosityLevel::Minimal {
262 println!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}");
263 }
264 download_attempts += 1;
265 if let Some(pb) = &pb {
266 pb.finish_and_clear();
267 }
268 }
269 }
270 };
271 };
272 if let Some(pb) = pb {
273 pb.finish_and_clear();
274 }
275 info!("Download completed: {binary_download_path:?}");
276
277 if verbosity != VerbosityLevel::Minimal {
278 println!("Download completed: {}", &binary_download_path.display());
279 }
280
281 let bin_version = get_bin_version(&binary_download_path)?;
285
286 Ok((binary_download_path, bin_version))
287}
288
289pub fn get_bin_version(bin_path: &PathBuf) -> Result<String> {
290 debug!("Obtaining version of binary {bin_path:?}");
291 let mut cmd = Command::new(bin_path)
292 .arg("--version")
293 .stdout(Stdio::piped())
294 .spawn()
295 .inspect_err(|err| error!("The program {bin_path:?} failed to start: {err:?}"))?;
296
297 let mut output = String::new();
298 cmd.stdout
299 .as_mut()
300 .ok_or_else(|| {
301 error!("Failed to capture stdout");
302 eyre!("Failed to capture stdout")
303 })?
304 .read_to_string(&mut output)
305 .inspect_err(|err| error!("Output contained non utf8 chars: {err:?}"))?;
306
307 let first_line = output.lines().next().ok_or_else(|| {
309 error!("No output received from binary");
310 eyre!("No output received from binary")
311 })?;
312
313 let version = if let Some(v_pos) = first_line.find('v') {
314 first_line[v_pos + 1..]
316 .split_whitespace()
317 .next()
318 .map(String::from)
319 } else {
320 first_line.split_whitespace().last().map(String::from)
322 }
323 .ok_or_else(|| {
324 error!("Failed to parse version from output");
325 eyre!("Failed to parse version from output")
326 })?;
327
328 debug!("Obtained version of binary: {version}");
329
330 Ok(version)
331}
332
333#[cfg(target_os = "windows")]
334pub fn get_username() -> Result<String> {
335 Ok(std::env::var("USERNAME")?)
336}
337
338#[cfg(not(target_os = "windows"))]
339pub fn get_username() -> Result<String> {
340 Ok(std::env::var("USER")?)
341}
342
343pub fn create_temp_dir() -> Result<PathBuf> {
346 let temp_dir = std::env::temp_dir();
347 let unique_dir_name = uuid::Uuid::new_v4().to_string();
348 let new_temp_dir = temp_dir.join(unique_dir_name);
349 std::fs::create_dir_all(&new_temp_dir)
350 .inspect_err(|err| error!("Failed to crete temp dir: {err:?}"))?;
351 Ok(new_temp_dir)
352}
353
354pub fn get_start_port_if_applicable(range: Option<PortRange>) -> Option<u16> {
356 if let Some(port) = range {
357 match port {
358 PortRange::Single(val) => return Some(val),
359 PortRange::Range(start, _) => return Some(start),
360 }
361 }
362 None
363}
364
365pub fn increment_port_option(port: Option<u16>) -> Option<u16> {
367 if let Some(port) = port {
368 let incremented_port = port + 1;
369 return Some(incremented_port);
370 }
371 None
372}
373
374pub fn check_port_availability(port_option: &PortRange, nodes: &[NodeServiceData]) -> Result<()> {
376 let mut all_ports = Vec::new();
377 for node in nodes {
378 if let Some(port) = node.metrics_port {
379 all_ports.push(port);
380 }
381 if let Some(port) = node.node_port {
382 all_ports.push(port);
383 }
384 all_ports.push(node.rpc_socket_addr.port());
385 }
386
387 match port_option {
388 PortRange::Single(port) => {
389 if all_ports.iter().any(|p| *p == *port) {
390 error!("Port {port} is being used by another service");
391 return Err(eyre!("Port {port} is being used by another service"));
392 }
393 }
394 PortRange::Range(start, end) => {
395 for i in *start..=*end {
396 if all_ports.iter().any(|p| *p == i) {
397 error!("Port {i} is being used by another service");
398 return Err(eyre!("Port {i} is being used by another service"));
399 }
400 }
401 }
402 }
403 Ok(())
404}