sn_node_manager/
helpers.rs1use color_eyre::{
10 eyre::{bail, eyre},
11 Result,
12};
13use indicatif::{ProgressBar, ProgressStyle};
14use semver::Version;
15use sn_releases::{get_running_platform, ArchiveType, ReleaseType, SafeReleaseRepoActions};
16use sn_service_management::NodeServiceData;
17use std::{
18 io::Read,
19 path::{Path, PathBuf},
20 process::{Command, Stdio},
21 sync::Arc,
22};
23
24use crate::{add_services::config::PortRange, config, VerbosityLevel};
25
/// Maximum number of attempts made for a single download before giving up.
///
/// The retry loops in this file start counting at 1 and bail out once the
/// counter exceeds this value, so a download is tried at most this many times.
const MAX_DOWNLOAD_RETRIES: u8 = 3;
27
/// Makes sure WinSW is available and points the `WINSW_PATH` environment
/// variable at it.
///
/// * If `winsw.exe` can already be located through `which`, nothing else is
///   done and `WINSW_PATH` is left untouched.
/// * Otherwise, if `dest_path` does not exist yet, WinSW is downloaded to
///   `dest_path` using the default release repository. A failed download is
///   retried until `MAX_DOWNLOAD_RETRIES` attempts have been made.
/// * Finally `WINSW_PATH` is set to `dest_path`.
///
/// Progress and retry messages are printed to stdout unless `verbosity` is
/// `VerbosityLevel::Minimal`.
///
/// # Errors
///
/// Returns an error if the progress bar template is rejected or if every
/// download attempt fails.
#[cfg(windows)]
pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Result<()> {
    if which::which("winsw.exe").is_ok() {
        debug!("WinSW already installed, which returned Ok");
        return Ok(());
    }

    let verbose = verbosity != VerbosityLevel::Minimal;

    if dest_path.exists() {
        debug!("WinSW already installed, dest_path exists: {dest_path:?}");
    } else {
        if verbose {
            println!("Downloading winsw.exe...");
        }
        debug!("Downloading WinSW to {dest_path:?}");

        let release_repo = <dyn SafeReleaseRepoActions>::default_config();

        // The progress bar only exists in verbose mode; the download callback
        // is a no-op otherwise.
        let pb = if verbose {
            let progress_bar = ProgressBar::new(0);
            progress_bar.set_style(ProgressStyle::default_bar()
                .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
                .progress_chars("#>-"));
            Some(Arc::new(progress_bar))
        } else {
            None
        };
        let callback: Box<dyn Fn(u64, u64) + Send + Sync> = match &pb {
            Some(pb) => {
                let pb = Arc::clone(pb);
                Box::new(move |downloaded: u64, total: u64| {
                    pb.set_length(total);
                    pb.set_position(downloaded);
                })
            }
            None => Box::new(|_: u64, _: u64| {}),
        };

        let mut download_attempts = 1;
        loop {
            if download_attempts > MAX_DOWNLOAD_RETRIES {
                error!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
                bail!("Failed to download WinSW after {MAX_DOWNLOAD_RETRIES} tries.");
            }
            match release_repo.download_winsw(dest_path, &callback).await {
                Ok(_) => break,
                Err(e) => {
                    if verbose {
                        println!("Error downloading WinSW: {e:?}");
                        println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
                    }
                    error!("Error downloading WinSW. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {e:?}");
                    download_attempts += 1;
                    // Clear the partially drawn bar so the next attempt starts clean.
                    if let Some(pb) = &pb {
                        pb.finish_and_clear();
                    }
                }
            }
        }

        if let Some(pb) = pb {
            pb.finish_and_clear();
        }
    }

    info!("WinSW installed at {dest_path:?}. Setting WINSW_PATH environment variable.");

    // Hand the path over as an `OsStr`. Going through `to_string_lossy` would
    // replace any non-Unicode sequence in the path and leave `WINSW_PATH`
    // pointing at a location that does not exist.
    std::env::set_var("WINSW_PATH", dest_path);

    Ok(())
}
97
/// Variant of `configure_winsw` compiled for every non-Windows target.
///
/// It ignores both arguments, performs no work and always returns `Ok(())`,
/// so callers can invoke `configure_winsw` without their own `cfg` guards.
#[cfg(not(windows))]
pub async fn configure_winsw(_dest_path: &Path, _verbosity: VerbosityLevel) -> Result<()> {
    Ok(())
}
102
103pub async fn download_and_extract_release(
109 release_type: ReleaseType,
110 url: Option<String>,
111 version: Option<String>,
112 release_repo: &dyn SafeReleaseRepoActions,
113 verbosity: VerbosityLevel,
114 download_dir_path: Option<PathBuf>,
115) -> Result<(PathBuf, String)> {
116 debug!(
117 "Downloading and extracting release for {release_type}, url: {url:?}, version: {version:?}"
118 );
119 let mut pb = None;
120 let callback = if verbosity != VerbosityLevel::Minimal {
121 let progress_bar = Arc::new(ProgressBar::new(0));
122 progress_bar.set_style(ProgressStyle::default_bar()
123 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
124 .progress_chars("#>-"));
125 pb = Some(progress_bar.clone());
126 let pb_clone = progress_bar.clone();
127 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |downloaded, total| {
128 pb_clone.set_length(total);
129 pb_clone.set_position(downloaded);
130 });
131 callback
132 } else {
133 let callback: Box<dyn Fn(u64, u64) + Send + Sync> = Box::new(move |_, _| {});
134 callback
135 };
136
137 let download_dir_path = if let Some(path) = download_dir_path {
138 std::fs::create_dir_all(&path)?;
139 path
140 } else if url.is_some() {
141 create_temp_dir()?
142 } else {
143 let path = config::get_node_manager_path()?.join("downloads");
146 std::fs::create_dir_all(&path)?;
147 path
148 };
149 debug!("Download directory: {download_dir_path:?}");
150
151 let mut download_attempts = 1;
152 let binary_download_path = loop {
153 if download_attempts > MAX_DOWNLOAD_RETRIES {
154 error!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
155 bail!("Failed to download release after {MAX_DOWNLOAD_RETRIES} tries.");
156 }
157
158 if let Some(url) = &url {
159 info!("Downloading release from {url}");
160 if verbosity != VerbosityLevel::Minimal {
161 println!("Retrieving {release_type} from {url}");
162 }
163 match release_repo
164 .download_release(url, &download_dir_path, &callback)
165 .await
166 {
167 Ok(archive_path) => {
168 let binary_download_path = release_repo
169 .extract_release_archive(&archive_path, &download_dir_path)
170 .inspect_err(|err| error!("Error while extracting archive {err:?}"))?;
171 break binary_download_path;
172 }
173 Err(err) => {
174 error!("Error downloading release: {err:?}");
175 if verbosity != VerbosityLevel::Minimal {
176 println!("Error downloading release: {err:?}");
177 println!("Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}");
178 }
179 download_attempts += 1;
180 if let Some(pb) = &pb {
181 pb.finish_and_clear();
182 }
183 }
184 }
185 } else {
186 let version = if let Some(version) = version.clone() {
187 let version = Version::parse(&version)?;
188 info!("Downloading release from S3 for version {version}");
189 version
190 } else {
191 if verbosity != VerbosityLevel::Minimal {
192 println!("Retrieving latest version for {release_type}...");
193 }
194 let version = release_repo
195 .get_latest_version(&release_type)
196 .await
197 .inspect_err(|err| error!("Error obtaining latest version {err:?}"))?;
198 info!("Downloading latest version from S3: {version}");
199 version
200 };
201
202 let archive_name = format!(
203 "{}-{}-{}.{}",
204 release_type.to_string().to_lowercase(),
205 version,
206 &get_running_platform()?,
207 &ArchiveType::TarGz
208 );
209 let archive_path = download_dir_path.join(&archive_name);
210 if archive_path.exists() {
211 match release_repo.extract_release_archive(&archive_path, &download_dir_path) {
213 Ok(binary_download_path) => {
214 info!("Using cached {release_type} version {version}...");
215 if verbosity != VerbosityLevel::Minimal {
216 println!("Using cached {release_type} version {version}...");
217 }
218 break binary_download_path;
219 }
220 Err(_) => {
221 info!("Cached {release_type} version {version} is corrupted. Downloading again...");
222 if verbosity != VerbosityLevel::Minimal {
223 println!("Cached {release_type} version {version} is corrupted. Downloading again...");
224 }
225 }
226 }
227 }
228
229 if verbosity != VerbosityLevel::Minimal {
230 println!("Downloading {release_type} version {version}...");
231 }
232 match release_repo
233 .download_release_from_s3(
234 &release_type,
235 &version,
236 &get_running_platform()?,
237 &ArchiveType::TarGz,
238 &download_dir_path,
239 &callback,
240 )
241 .await
242 {
243 Ok(archive_path) => {
244 let binary_download_path =
245 release_repo.extract_release_archive(&archive_path, &download_dir_path)?;
246 break binary_download_path;
247 }
248 Err(err) => {
249 error!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}");
250 if verbosity != VerbosityLevel::Minimal {
251 println!("Error while downloading release. Trying again {download_attempts}/{MAX_DOWNLOAD_RETRIES}: {err:?}");
252 }
253 download_attempts += 1;
254 if let Some(pb) = &pb {
255 pb.finish_and_clear();
256 }
257 }
258 }
259 };
260 };
261 if let Some(pb) = pb {
262 pb.finish_and_clear();
263 }
264 info!("Download completed: {binary_download_path:?}");
265
266 if verbosity != VerbosityLevel::Minimal {
267 println!("Download completed: {}", &binary_download_path.display());
268 }
269
270 let bin_version = get_bin_version(&binary_download_path)?;
274
275 Ok((binary_download_path, bin_version))
276}
277
278pub fn get_bin_version(bin_path: &PathBuf) -> Result<String> {
279 debug!("Obtaining version of binary {bin_path:?}");
280 let mut cmd = Command::new(bin_path)
281 .arg("--version")
282 .stdout(Stdio::piped())
283 .spawn()
284 .inspect_err(|err| error!("The program {bin_path:?} failed to start: {err:?}"))?;
285
286 let mut output = String::new();
287 cmd.stdout
288 .as_mut()
289 .ok_or_else(|| {
290 error!("Failed to capture stdout");
291 eyre!("Failed to capture stdout")
292 })?
293 .read_to_string(&mut output)
294 .inspect_err(|err| error!("Output contained non utf8 chars: {err:?}"))?;
295
296 let first_line = output.lines().next().ok_or_else(|| {
298 error!("No output received from binary");
299 eyre!("No output received from binary")
300 })?;
301
302 let version = if let Some(v_pos) = first_line.find('v') {
303 first_line[v_pos + 1..]
305 .split_whitespace()
306 .next()
307 .map(String::from)
308 } else {
309 first_line.split_whitespace().last().map(String::from)
311 }
312 .ok_or_else(|| {
313 error!("Failed to parse version from output");
314 eyre!("Failed to parse version from output")
315 })?;
316
317 debug!("Obtained version of binary: {version}");
318
319 Ok(version)
320}
321
/// Reads the user name from the `USERNAME` environment variable.
///
/// # Errors
///
/// Returns an error if the variable is not set or its value is not valid
/// Unicode.
#[cfg(target_os = "windows")]
pub fn get_username() -> Result<String> {
    let username = std::env::var("USERNAME")?;
    Ok(username)
}
326
327#[cfg(not(target_os = "windows"))]
328pub fn get_username() -> Result<String> {
329 Ok(std::env::var("USER")?)
330}
331
332pub fn create_temp_dir() -> Result<PathBuf> {
335 let temp_dir = std::env::temp_dir();
336 let unique_dir_name = uuid::Uuid::new_v4().to_string();
337 let new_temp_dir = temp_dir.join(unique_dir_name);
338 std::fs::create_dir_all(&new_temp_dir)
339 .inspect_err(|err| error!("Failed to crete temp dir: {err:?}"))?;
340 Ok(new_temp_dir)
341}
342
343pub fn get_start_port_if_applicable(range: Option<PortRange>) -> Option<u16> {
345 if let Some(port) = range {
346 match port {
347 PortRange::Single(val) => return Some(val),
348 PortRange::Range(start, _) => return Some(start),
349 }
350 }
351 None
352}
353
/// Returns the port following `port`, or `None` when `port` is `None`.
///
/// `None` is also returned when `port` is `u16::MAX`, because there is no
/// higher port number. A plain `port + 1` would panic in debug builds and
/// wrap around to port 0 in release builds.
pub fn increment_port_option(port: Option<u16>) -> Option<u16> {
    port.and_then(|current| current.checked_add(1))
}
362
363pub fn check_port_availability(port_option: &PortRange, nodes: &[NodeServiceData]) -> Result<()> {
365 let mut all_ports = Vec::new();
366 for node in nodes {
367 if let Some(port) = node.metrics_port {
368 all_ports.push(port);
369 }
370 if let Some(port) = node.node_port {
371 all_ports.push(port);
372 }
373 all_ports.push(node.rpc_socket_addr.port());
374 }
375
376 match port_option {
377 PortRange::Single(port) => {
378 if all_ports.iter().any(|p| *p == *port) {
379 error!("Port {port} is being used by another service");
380 return Err(eyre!("Port {port} is being used by another service"));
381 }
382 }
383 PortRange::Range(start, end) => {
384 for i in *start..=*end {
385 if all_ports.iter().any(|p| *p == i) {
386 error!("Port {i} is being used by another service");
387 return Err(eyre!("Port {i} is being used by another service"));
388 }
389 }
390 }
391 }
392 Ok(())
393}