ant_node_manager/
helpers.rs1use ant_releases::{get_running_platform, AntReleaseRepoActions, ArchiveType, ReleaseType};
10use ant_service_management::NodeServiceData;
11use color_eyre::{
12 eyre::{bail, eyre},
13 Result,
14};
15use indicatif::{ProgressBar, ProgressStyle};
16use semver::Version;
17use std::{
18 io::Read,
19 path::{Path, PathBuf},
20 process::{Command, Stdio},
21 sync::Arc,
22};
23use tokio::sync::RwLock;
24
25use crate::{add_services::config::PortRange, config, VerbosityLevel};
26
27const MAX_DOWNLOAD_RETRIES: u8 = 3;
28
29pub fn get_faucet_data_dir() -> PathBuf {
32 let mut data_dirs = dirs_next::data_dir().expect("A homedir to exist.");
33 data_dirs.push("autonomi");
34 data_dirs.push("test_faucet");
35 std::fs::create_dir_all(data_dirs.as_path())
36 .expect("Faucet test path to be successfully created.");
37 data_dirs
38}
39
40#[cfg(windows)]
41pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Result<()> {
42 if which::which("winsw.exe").is_ok() {
43 debug!("WinSW already installed, which returned Ok");
44 return Ok(());
45 }
46
47 if !dest_path.exists() {
48 if verbosity != VerbosityLevel::Minimal {
49 println!("Downloading winsw.exe...");
50 }
51 debug!("Downloading WinSW to {dest_path:?}");
52
53 let release_repo = <dyn AntReleaseRepoActions>::default_config();
54
55 let mut pb = None;
56 let callback = if verbosity != VerbosityLevel::Minimal {
57 let progress_bar = Arc::new(ProgressBar::new(0));
58 progress_bar.set_style(ProgressStyle::default_bar()
59 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
60 .progress_chars("#>-"));
61 pb = Some(progress_bar.clone());
62 let pb_clone = progress_bar.clone();
63 let callback: Box<dyn Fn(u64, u64) + Send + Sync> =
64 Box::new(move |downloaded, total| {
65 pb_clone.set_length(total);
66 pb_clone.set_position(downloaded);
67 });
68 callback
69 } else {
70 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |_, _| {});
71 callback
72 };
73
74 let mut download_attempts = 1;
75 loop {
76 if download_attempts > MAX_DOWNLOAD_RETRIES {
77 error!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
78 bail!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
79 }
80 match release_repo.download_winsw(dest_path, &callback).await {
81 Ok(_) => break,
82 Err(e) => {
83 if verbosity != VerbosityLevel::Minimal {
84 println!("Error downloading WinSW: {e:?}");
85 println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
86 }
87 error!("Error downloading WinSW. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {e:?}");
88 download_attempts += 1;
89 if let Some(pb) = &pb {
90 pb.finish_and_clear();
91 }
92 }
93 }
94 }
95
96 if let Some(pb) = pb {
97 pb.finish_and_clear();
98 }
99 } else {
100 debug!("WinSW already installed, dest_path exists: {dest_path:?}");
101 }
102
103 info!("WinSW installed at {dest_path:?}. Setting WINSW_PATH environment variable.");
104
105 std::env::set_var("WINSW_PATH", dest_path.to_string_lossy().to_string());
106
107 Ok(())
108}
109
110#[cfg(not(windows))]
111pub async fn configure_winsw(_dest_path: &Path, _verbosity: VerbosityLevel) -> Result<()> {
112 Ok(())
113}
114
115pub async fn download_and_extract_release(
121 release_type: ReleaseType,
122 url: Option<String>,
123 version: Option<String>,
124 release_repo: &dyn AntReleaseRepoActions,
125 verbosity: VerbosityLevel,
126 download_dir_path: Option<PathBuf>,
127) -> Result<(PathBuf, String)> {
128 debug!(
129 "Downloading and extracting release for {release_type}, url: {url:?}, version: {version:?}"
130 );
131 let mut pb = None;
132 let callback = if verbosity != VerbosityLevel::Minimal {
133 let progress_bar = Arc::new(ProgressBar::new(0));
134 progress_bar.set_style(ProgressStyle::default_bar()
135 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
136 .progress_chars("#>-"));
137 pb = Some(progress_bar.clone());
138 let pb_clone = progress_bar.clone();
139 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |downloaded, total| {
140 pb_clone.set_length(total);
141 pb_clone.set_position(downloaded);
142 });
143 callback
144 } else {
145 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |_, _| {});
146 callback
147 };
148
149 let download_dir_path = if let Some(path) = download_dir_path {
150 std::fs::create_dir_all(&path)?;
151 path
152 } else if url.is_some() {
153 create_temp_dir()?
154 } else {
155 let path = config::get_node_manager_path()?.join("downloads");
158 std::fs::create_dir_all(&path)?;
159 path
160 };
161 debug!("Download directory: {download_dir_path:?}");
162
163 let mut download_attempts = 1;
164 let binary_download_path = loop {
165 if download_attempts > MAX_DOWNLOAD_RETRIES {
166 error!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
167 bail!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
168 }
169
170 if let Some(url) = &url {
171 info!("Downloading release from {url}");
172 if verbosity != VerbosityLevel::Minimal {
173 println!("Retrieving {release_type} from {url}");
174 }
175 match release_repo
176 .download_release(url, &download_dir_path, &callback)
177 .await
178 {
179 Ok(archive_path) => {
180 let binary_download_path = release_repo
181 .extract_release_archive(&archive_path, &download_dir_path)
182 .inspect_err(|err| error!("Error while extracting archive {err:?}"))?;
183 break binary_download_path;
184 }
185 Err(err) => {
186 error!("Error downloading release: {err:?}");
187 if verbosity != VerbosityLevel::Minimal {
188 println!("Error downloading release: {err:?}");
189 println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
190 }
191 download_attempts += 1;
192 if let Some(pb) = &pb {
193 pb.finish_and_clear();
194 }
195 }
196 }
197 } else {
198 let version = if let Some(version) = version.clone() {
199 let version = Version::parse(&version)?;
200 info!("Downloading release from S3 for version {version}");
201 version
202 } else {
203 if verbosity != VerbosityLevel::Minimal {
204 println!("Retrieving latest version for {release_type}...");
205 }
206 let version = release_repo
207 .get_latest_version(&release_type)
208 .await
209 .inspect_err(|err| error!("Error obtaining latest version {err:?}"))?;
210 info!("Downloading latest version from S3: {version}");
211 version
212 };
213
214 let archive_name = format!(
215 "{}-{}-{}.{}",
216 release_type.to_string().to_lowercase(),
217 version,
218 &get_running_platform()?,
219 &ArchiveType::TarGz
220 );
221 let archive_path = download_dir_path.join(&archive_name);
222 if archive_path.exists() {
223 match release_repo.extract_release_archive(&archive_path, &download_dir_path) {
225 Ok(binary_download_path) => {
226 info!("Using cached {release_type} version {version}...");
227 if verbosity != VerbosityLevel::Minimal {
228 println!("Using cached {release_type} version {version}...");
229 }
230 break binary_download_path;
231 }
232 Err(_) => {
233 info!("Cached {release_type} version {version} is corrupted. Downloading again...");
234 if verbosity != VerbosityLevel::Minimal {
235 println!("Cached {release_type} version {version} is corrupted. Downloading again...");
236 }
237 }
238 }
239 }
240
241 if verbosity != VerbosityLevel::Minimal {
242 println!("Downloading {release_type} version {version}...");
243 }
244 match release_repo
245 .download_release_from_s3(
246 &release_type,
247 &version,
248 &get_running_platform()?,
249 &ArchiveType::TarGz,
250 &download_dir_path,
251 &callback,
252 )
253 .await
254 {
255 Ok(archive_path) => {
256 let binary_download_path =
257 release_repo.extract_release_archive(&archive_path, &download_dir_path)?;
258 break binary_download_path;
259 }
260 Err(err) => {
261 error!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}");
262 if verbosity != VerbosityLevel::Minimal {
263 println!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}");
264 }
265 download_attempts += 1;
266 if let Some(pb) = &pb {
267 pb.finish_and_clear();
268 }
269 }
270 }
271 };
272 };
273 if let Some(pb) = pb {
274 pb.finish_and_clear();
275 }
276 info!("Download completed: {binary_download_path:?}");
277
278 if verbosity != VerbosityLevel::Minimal {
279 println!("Download completed: {}", &binary_download_path.display());
280 }
281
282 let bin_version = get_bin_version(&binary_download_path)?;
286
287 Ok((binary_download_path, bin_version))
288}
289
290pub fn get_bin_version(bin_path: &PathBuf) -> Result<String> {
291 debug!("Obtaining version of binary {bin_path:?}");
292 let mut cmd = Command::new(bin_path)
293 .arg("--version")
294 .stdout(Stdio::piped())
295 .spawn()
296 .inspect_err(|err| error!("The program {bin_path:?} failed to start: {err:?}"))?;
297
298 let mut output = String::new();
299 cmd.stdout
300 .as_mut()
301 .ok_or_else(|| {
302 error!("Failed to capture stdout");
303 eyre!("Failed to capture stdout")
304 })?
305 .read_to_string(&mut output)
306 .inspect_err(|err| error!("Output contained non utf8 chars: {err:?}"))?;
307
308 let first_line = output.lines().next().ok_or_else(|| {
310 error!("No output received from binary");
311 eyre!("No output received from binary")
312 })?;
313
314 let version = if let Some(v_pos) = first_line.find('v') {
315 first_line[v_pos + 1..]
317 .split_whitespace()
318 .next()
319 .map(String::from)
320 } else {
321 first_line.split_whitespace().last().map(String::from)
323 }
324 .ok_or_else(|| {
325 error!("Failed to parse version from output");
326 eyre!("Failed to parse version from output")
327 })?;
328
329 debug!("Obtained version of binary: {version}");
330
331 Ok(version)
332}
333
334#[cfg(target_os = "windows")]
335pub fn get_username() -> Result<String> {
336 Ok(std::env::var("USERNAME")?)
337}
338
339#[cfg(not(target_os = "windows"))]
340pub fn get_username() -> Result<String> {
341 Ok(std::env::var("USER")?)
342}
343
344pub fn create_temp_dir() -> Result<PathBuf> {
347 let temp_dir = std::env::temp_dir();
348 let unique_dir_name = uuid::Uuid::new_v4().to_string();
349 let new_temp_dir = temp_dir.join(unique_dir_name);
350 std::fs::create_dir_all(&new_temp_dir)
351 .inspect_err(|err| error!("Failed to crete temp dir: {err:?}"))?;
352 Ok(new_temp_dir)
353}
354
355pub fn get_start_port_if_applicable(range: Option<PortRange>) -> Option<u16> {
357 if let Some(port) = range {
358 match port {
359 PortRange::Single(val) => return Some(val),
360 PortRange::Range(start, _) => return Some(start),
361 }
362 }
363 None
364}
365
366pub fn increment_port_option(port: Option<u16>) -> Option<u16> {
368 if let Some(port) = port {
369 let incremented_port = port + 1;
370 return Some(incremented_port);
371 }
372 None
373}
374
375pub async fn check_port_availability(
377 port_option: &PortRange,
378 nodes: &Arc<RwLock<Vec<Arc<RwLock<NodeServiceData>>>>>,
379) -> Result<()> {
380 let mut all_ports = Vec::new();
381 for node in nodes.read().await.iter() {
382 let node = node.read().await;
383 if let Some(port) = node.metrics_port {
384 all_ports.push(port);
385 }
386 if let Some(port) = node.node_port {
387 all_ports.push(port);
388 }
389 all_ports.push(node.rpc_socket_addr.port());
390 }
391
392 match port_option {
393 PortRange::Single(port) => {
394 if all_ports.contains(port) {
395 error!("Port {port} is being used by another service");
396 return Err(eyre!("Port {port} is being used by another service"));
397 }
398 }
399 PortRange::Range(start, end) => {
400 for i in *start..=*end {
401 if all_ports.contains(&i) {
402 error!("Port {i} is being used by another service");
403 return Err(eyre!("Port {i} is being used by another service"));
404 }
405 }
406 }
407 }
408 Ok(())
409}