ant_node_manager/
helpers.rs1#![allow(clippy::expect_used)]
11
12use ant_releases::{AntReleaseRepoActions, ArchiveType, ReleaseType, get_running_platform};
13use ant_service_management::NodeServiceData;
14use color_eyre::{
15 Result,
16 eyre::{bail, eyre},
17};
18use indicatif::{ProgressBar, ProgressStyle};
19use semver::Version;
20use std::{
21 io::Read,
22 path::{Path, PathBuf},
23 process::{Command, Stdio},
24 sync::Arc,
25};
26use tokio::sync::RwLock;
27
28use crate::{VerbosityLevel, add_services::config::PortRange, config};
29
/// Upper bound on the number of download attempts made before a download helper in this
/// module gives up and returns an error.
const MAX_DOWNLOAD_RETRIES: u8 = 3;
31
32pub fn get_faucet_data_dir() -> PathBuf {
35 let mut data_dirs = dirs_next::data_dir().expect("A homedir to exist.");
36 data_dirs.push("autonomi");
37 data_dirs.push("test_faucet");
38 std::fs::create_dir_all(data_dirs.as_path())
39 .expect("Faucet test path to be successfully created.");
40 data_dirs
41}
42
/// Makes sure WinSW is available and exports its location through `WINSW_PATH`.
///
/// * If `winsw.exe` is already discoverable via `which`, nothing is done.
/// * Otherwise, when `dest_path` does not exist, WinSW is downloaded there, with up to
///   `MAX_DOWNLOAD_RETRIES` attempts. Progress and retry messages are printed unless
///   `verbosity` is `VerbosityLevel::Minimal`.
/// * Finally the `WINSW_PATH` environment variable is set to `dest_path`.
///
/// # Errors
///
/// Returns an error if the progress bar template is rejected or if every download attempt
/// fails.
#[cfg(windows)]
pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Result<()> {
    if which::which("winsw.exe").is_ok() {
        debug!("WinSW already installed, which returned Ok");
        return Ok(());
    }

    let show_output = verbosity != VerbosityLevel::Minimal;

    if dest_path.exists() {
        debug!("WinSW already installed, dest_path exists: {dest_path:?}");
    } else {
        if show_output {
            println!("Downloading winsw.exe...");
        }
        debug!("Downloading WinSW to {dest_path:?}");

        let release_repo = <dyn AntReleaseRepoActions>::default_config();

        // The progress bar only exists when output is wanted; the callback is a no-op otherwise.
        let pb = if show_output {
            let progress_bar = ProgressBar::new(0);
            progress_bar.set_style(
                ProgressStyle::default_bar()
                    .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
                    .progress_chars("#>-"),
            );
            Some(Arc::new(progress_bar))
        } else {
            None
        };
        let callback: Box<dyn Fn(u64, u64) + Send + Sync> = match &pb {
            Some(progress_bar) => {
                let progress_bar = Arc::clone(progress_bar);
                Box::new(move |downloaded: u64, total: u64| {
                    progress_bar.set_length(total);
                    progress_bar.set_position(downloaded);
                })
            }
            None => Box::new(|_: u64, _: u64| {}),
        };

        let mut downloaded = false;
        for download_attempts in 1..=MAX_DOWNLOAD_RETRIES {
            match release_repo.download_winsw(dest_path, &callback).await {
                Ok(_) => {
                    downloaded = true;
                    break;
                }
                Err(e) => {
                    if show_output {
                        println!("Error downloading WinSW: {e:?}");
                        println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
                    }
                    error!(
                        "Error downloading WinSW. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {e:?}"
                    );
                    if let Some(pb) = &pb {
                        pb.finish_and_clear();
                    }
                }
            }
        }

        if !downloaded {
            error!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
            bail!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
        }

        if let Some(pb) = pb {
            pb.finish_and_clear();
        }
    }

    info!("WinSW installed at {dest_path:?}. Setting WINSW_PATH environment variable.");

    // NOTE(review): mutating the process environment is only sound while no other thread
    // reads or writes it concurrently; that invariant is not established here - confirm at
    // the call sites.
    #[allow(unsafe_code)]
    unsafe {
        std::env::set_var("WINSW_PATH", dest_path.to_string_lossy().to_string());
    }

    Ok(())
}
117
118#[cfg(not(windows))]
119#[allow(clippy::unused_async)]
120pub async fn configure_winsw(_dest_path: &Path, _verbosity: VerbosityLevel) -> Result<()> {
121 Ok(())
122}
123
124pub async fn download_and_extract_release(
130 release_type: ReleaseType,
131 url: Option<String>,
132 version: Option<String>,
133 release_repo: &dyn AntReleaseRepoActions,
134 verbosity: VerbosityLevel,
135 download_dir_path: Option<PathBuf>,
136) -> Result<(PathBuf, String)> {
137 debug!(
138 "Downloading and extracting release for {release_type}, url: {url:?}, version: {version:?}"
139 );
140 let mut pb = None;
141 let callback = if verbosity != VerbosityLevel::Minimal {
142 let progress_bar = Arc::new(ProgressBar::new(0));
143 progress_bar.set_style(ProgressStyle::default_bar()
144 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
145 .progress_chars("#>-"));
146 pb = Some(Arc::clone(&progress_bar));
147 let pb_clone = Arc::clone(&progress_bar);
148 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |downloaded, total| {
149 pb_clone.set_length(total);
150 pb_clone.set_position(downloaded);
151 });
152 callback
153 } else {
154 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |_, _| {});
155 callback
156 };
157
158 let download_dir_path = if let Some(path) = download_dir_path {
159 std::fs::create_dir_all(&path)?;
160 path
161 } else if url.is_some() {
162 create_temp_dir()?
163 } else {
164 let path = config::get_node_manager_path()?.join("downloads");
167 std::fs::create_dir_all(&path)?;
168 path
169 };
170 debug!("Download directory: {download_dir_path:?}");
171
172 let mut download_attempts = 1;
173 let binary_download_path = loop {
174 if download_attempts > MAX_DOWNLOAD_RETRIES {
175 error!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
176 bail!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
177 }
178
179 if let Some(url) = &url {
180 info!("Downloading release from {url}");
181 if verbosity != VerbosityLevel::Minimal {
182 println!("Retrieving {release_type} from {url}");
183 }
184 match release_repo
185 .download_release(url, &download_dir_path, &callback)
186 .await
187 {
188 Ok(archive_path) => {
189 let binary_download_path = release_repo
190 .extract_release_archive(&archive_path, &download_dir_path)
191 .inspect_err(|err| error!("Error while extracting archive {err:?}"))?;
192 break binary_download_path;
193 }
194 Err(err) => {
195 error!("Error downloading release: {err:?}");
196 if verbosity != VerbosityLevel::Minimal {
197 println!("Error downloading release: {err:?}");
198 println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
199 }
200 download_attempts += 1;
201 if let Some(pb) = &pb {
202 pb.finish_and_clear();
203 }
204 }
205 }
206 } else {
207 let version = if let Some(version) = version.clone() {
208 let version = Version::parse(&version)?;
209 info!("Downloading release from S3 for version {version}");
210 version
211 } else {
212 if verbosity != VerbosityLevel::Minimal {
213 println!("Retrieving latest version for {release_type}...");
214 }
215 let version = release_repo
216 .get_latest_version(&release_type)
217 .await
218 .inspect_err(|err| error!("Error obtaining latest version {err:?}"))?;
219 info!("Downloading latest version from S3: {version}");
220 version
221 };
222
223 let archive_name = format!(
224 "{}-{}-{}.{}",
225 release_type.to_string().to_lowercase(),
226 version,
227 &get_running_platform()?,
228 &ArchiveType::TarGz
229 );
230 let archive_path = download_dir_path.join(&archive_name);
231 if archive_path.exists() {
232 match release_repo.extract_release_archive(&archive_path, &download_dir_path) {
234 Ok(binary_download_path) => {
235 info!("Using cached {release_type} version {version}...");
236 if verbosity != VerbosityLevel::Minimal {
237 println!("Using cached {release_type} version {version}...");
238 }
239 break binary_download_path;
240 }
241 Err(_) => {
242 info!(
243 "Cached {release_type} version {version} is corrupted. Downloading again..."
244 );
245 if verbosity != VerbosityLevel::Minimal {
246 println!(
247 "Cached {release_type} version {version} is corrupted. Downloading again..."
248 );
249 }
250 }
251 }
252 }
253
254 if verbosity != VerbosityLevel::Minimal {
255 println!("Downloading {release_type} version {version}...");
256 }
257 match release_repo
258 .download_release_from_s3(
259 &release_type,
260 &version,
261 &get_running_platform()?,
262 &ArchiveType::TarGz,
263 &download_dir_path,
264 &callback,
265 )
266 .await
267 {
268 Ok(archive_path) => {
269 let binary_download_path =
270 release_repo.extract_release_archive(&archive_path, &download_dir_path)?;
271 break binary_download_path;
272 }
273 Err(err) => {
274 error!(
275 "Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}"
276 );
277 if verbosity != VerbosityLevel::Minimal {
278 println!(
279 "Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}"
280 );
281 }
282 download_attempts += 1;
283 if let Some(pb) = &pb {
284 pb.finish_and_clear();
285 }
286 }
287 }
288 };
289 };
290 if let Some(pb) = pb {
291 pb.finish_and_clear();
292 }
293 info!("Download completed: {binary_download_path:?}");
294
295 if verbosity != VerbosityLevel::Minimal {
296 println!("Download completed: {}", &binary_download_path.display());
297 }
298
299 let bin_version = get_bin_version(&binary_download_path)?;
303
304 Ok((binary_download_path, bin_version))
305}
306
307pub fn get_bin_version(bin_path: &PathBuf) -> Result<String> {
308 debug!("Obtaining version of binary {bin_path:?}");
309 let mut cmd = Command::new(bin_path)
310 .arg("--version")
311 .stdout(Stdio::piped())
312 .spawn()
313 .inspect_err(|err| error!("The program {bin_path:?} failed to start: {err:?}"))?;
314
315 let mut output = String::new();
316 cmd.stdout
317 .as_mut()
318 .ok_or_else(|| {
319 error!("Failed to capture stdout");
320 eyre!("Failed to capture stdout")
321 })?
322 .read_to_string(&mut output)
323 .inspect_err(|err| error!("Output contained non utf8 chars: {err:?}"))?;
324
325 let first_line = output.lines().next().ok_or_else(|| {
327 error!("No output received from binary");
328 eyre!("No output received from binary")
329 })?;
330
331 let version = if let Some(v_pos) = first_line.find('v') {
332 first_line[v_pos + 1..]
334 .split_whitespace()
335 .next()
336 .map(String::from)
337 } else {
338 first_line.split_whitespace().last().map(String::from)
340 }
341 .ok_or_else(|| {
342 error!("Failed to parse version from output");
343 eyre!("Failed to parse version from output")
344 })?;
345
346 debug!("Obtained version of binary: {version}");
347
348 Ok(version)
349}
350
351#[cfg(target_os = "windows")]
352pub fn get_username() -> Result<String> {
353 Ok(std::env::var("USERNAME")?)
354}
355
356#[cfg(not(target_os = "windows"))]
357pub fn get_username() -> Result<String> {
358 Ok(std::env::var("USER")?)
359}
360
361pub fn create_temp_dir() -> Result<PathBuf> {
364 let temp_dir = std::env::temp_dir();
365 let unique_dir_name = uuid::Uuid::new_v4().to_string();
366 let new_temp_dir = temp_dir.join(unique_dir_name);
367 std::fs::create_dir_all(&new_temp_dir)
368 .inspect_err(|err| error!("Failed to crete temp dir: {err:?}"))?;
369 Ok(new_temp_dir)
370}
371
372pub fn get_start_port_if_applicable(range: Option<PortRange>) -> Option<u16> {
374 if let Some(port) = range {
375 match port {
376 PortRange::Single(val) => return Some(val),
377 PortRange::Range(start, _) => return Some(start),
378 }
379 }
380 None
381}
382
/// Returns the port following `port`, or `None` when there is no such port.
///
/// `None` is passed through unchanged. `Some(u16::MAX)` also yields `None`: there is no
/// higher port number, and a plain `+ 1` would panic in debug builds or wrap to port 0 in
/// release builds.
pub fn increment_port_option(port: Option<u16>) -> Option<u16> {
    port.and_then(|current| current.checked_add(1))
}
391
392pub async fn check_port_availability(
394 port_option: &PortRange,
395 nodes: &Arc<RwLock<Vec<Arc<RwLock<NodeServiceData>>>>>,
396) -> Result<()> {
397 let mut all_ports = Vec::new();
398 for node in nodes.read().await.iter() {
399 let node = node.read().await;
400 if let Some(port) = node.metrics_port {
401 all_ports.push(port);
402 }
403 if let Some(port) = node.node_port {
404 all_ports.push(port);
405 }
406 all_ports.push(node.rpc_socket_addr.port());
407 }
408
409 match port_option {
410 PortRange::Single(port) => {
411 if all_ports.contains(port) {
412 error!("Port {port} is being used by another service");
413 return Err(eyre!("Port {port} is being used by another service"));
414 }
415 }
416 PortRange::Range(start, end) => {
417 for i in *start..=*end {
418 if all_ports.contains(&i) {
419 error!("Port {i} is being used by another service");
420 return Err(eyre!("Port {i} is being used by another service"));
421 }
422 }
423 }
424 }
425 Ok(())
426}