1use crate::config::ClientConfig;
2use anyhow::{bail, Context, Result};
3use std::cmp::min;
4use std::fs::File;
5use std::io::Write;
6use std::path::PathBuf;
7
8use cloudpub_common::protocol::message::Message;
9use cloudpub_common::protocol::{Break, ErrorInfo, ErrorKind, ProgressInfo};
10use cloudpub_common::transport::rustls::load_roots;
11use dirs::cache_dir;
12use futures::stream::StreamExt;
13use parking_lot::RwLock;
14use reqwest::{Certificate, ClientBuilder};
15use std::collections::HashMap;
16use std::io;
17use std::path::Path;
18use std::process::Stdio;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use tokio::io::{AsyncBufReadExt, BufReader};
22use tokio::process::Command;
23use tokio::sync::mpsc;
24use tracing::{error, info, warn};
25use walkdir::WalkDir;
26use zip::read::ZipArchive;
27
28pub const DOWNLOAD_SUBDIR: &str = "download";
29
30pub struct SubProcess {
31 shutdown_tx: mpsc::Sender<Message>,
32 command: PathBuf,
33 args: Vec<String>,
34 pub port: u16,
35 canceled: Arc<AtomicBool>,
36 pub result: Arc<RwLock<Result<()>>>,
37}
38
39impl SubProcess {
40 pub fn new(
41 command: PathBuf,
42 args: Vec<String>,
43 chdir: Option<PathBuf>,
44 envs: HashMap<String, String>,
45 result_tx: mpsc::Sender<Message>,
46 port: u16,
47 ) -> Self {
48 let canceled = Arc::new(AtomicBool::new(false));
49 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
50 let command2 = command.clone();
51 let args2 = args.clone();
52 let canceled2 = canceled.clone();
53 let result = Arc::new(RwLock::new(Ok(())));
54 let result_clone = result.clone();
55 tokio::spawn(async move {
56 if let Err(err) = execute(command2, args2, chdir, envs, None, &mut shutdown_rx).await {
57 if !canceled2.load(Ordering::Relaxed) {
58 error!("Failed to execute command: {:?}", err);
59 result_tx
60 .send(Message::Error(ErrorInfo {
61 kind: ErrorKind::ExecuteFailed.into(),
62 message: crate::t!("error-execute-failed", "err" => err.to_string()),
63 guid: String::new(),
64 }))
65 .await
66 .ok();
67 }
68 *result_clone.write() = Err(err);
69 }
70 });
71 Self {
72 shutdown_tx,
73 port,
74 command,
75 args,
76 canceled,
77 result,
78 }
79 }
80
81 pub fn stop(&mut self) {
82 self.canceled.store(true, Ordering::Relaxed);
83 self.shutdown_tx
84 .try_send(Message::Break(Break {
85 ..Default::default()
86 }))
87 .ok();
88 }
89}
90
91impl Drop for SubProcess {
92 fn drop(&mut self) {
93 info!("Drop subprocess: {:?} {:?}", self.command, self.args);
94 self.stop();
95 }
96}
97
98pub async fn send_progress(
99 message: &str,
100 template: &str,
101 total: u64,
102 current: u64,
103 progress_tx: &mpsc::Sender<Message>,
104) {
105 let progress = ProgressInfo {
106 message: message.to_string(),
107 template: template.to_string(),
108 total: total as u32,
109 current: current as u32,
110 ..Default::default()
111 };
112 progress_tx.send(Message::Progress(progress)).await.ok();
113}
114
115pub async fn execute(
116 command: PathBuf,
117 args: Vec<String>,
118 chdir: Option<PathBuf>,
119 envs: HashMap<String, String>,
120 progress: Option<(String, mpsc::Sender<Message>, u64)>,
121 shutdown_rx: &mut mpsc::Receiver<Message>,
122) -> Result<()> {
123 let argv = format!("{} {}", command.to_str().unwrap(), args.join(" "));
124 info!("Executing command: {}", argv);
125
126 info!("Environment: {:?}", envs);
127
128 let template = crate::t!("progress-files-eta");
129 let chdir = chdir.as_deref().unwrap_or(Path::new("."));
130
131 if let Some((message, tx, total)) = progress.as_ref() {
132 send_progress(message, &template, *total, 0, tx).await;
133 send_progress(message, &template, *total, 1, tx).await;
134 }
135
136 #[cfg(windows)]
137 let mut child = Command::new(command.clone())
138 .args(args.clone())
139 .kill_on_drop(true)
140 .current_dir(&chdir)
141 .stdout(Stdio::piped())
142 .stderr(Stdio::piped())
143 .creation_flags(0x08000000)
144 .envs(envs)
145 .spawn()
146 .context(format!(
147 "Failed to execute command: {:?} {:?}",
148 command, args
149 ))?;
150
151 #[cfg(not(windows))]
152 let mut child = Command::new(command.clone())
153 .args(args.clone())
154 .kill_on_drop(true)
155 .current_dir(chdir)
156 .stdout(Stdio::piped())
157 .stderr(Stdio::piped())
158 .envs(envs)
159 .spawn()
160 .context(format!(
161 "Failed to execute command: {:?} {:?}",
162 command, args
163 ))?;
164
165 let stdout = child.stdout.take().context("Failed to get stdout")?;
166 let stderr = child.stderr.take().context("Failed to get stderr")?;
167
168 let stdout_reader = BufReader::new(stdout).lines();
169 let stderr_reader = BufReader::new(stderr).lines();
170
171 let progress1 = progress.clone();
172 let template_clone = template.clone();
173 let last_stderr = Arc::new(RwLock::new(String::new()));
174 let last_stderr_clone = last_stderr.clone();
175 tokio::spawn(async move {
176 tokio::pin!(stdout_reader);
177 tokio::pin!(stderr_reader);
178 let mut current = 0;
179 loop {
180 let progress = progress1.clone();
181 tokio::select! {
182 line = stdout_reader.next_line() => match line {
183 Ok(Some(line)) => {
184 info!("STDOUT: {}", line);
185 current += 1;
186 if let Some((message, tx, total)) = progress.as_ref() {
187 send_progress(message, &template_clone, *total, current, tx).await;
188 }
189 }
190 Err(e) => {
191 bail!("Error reading stdout: {}", e);
192 },
193 Ok(None) => {
194 info!("STDOUT EOF");
195 break;
196 }
197 },
198 line = stderr_reader.next_line() => match line {
199 Ok(Some(line)) => {
200 warn!("STDERR: {}", line);
201 {
202 let mut last = last_stderr.write();
203 if !last.is_empty() {
204 *last += " ";
205 }
206 if (*last).len() > 2000 {
207 *last = (*last).split_off(1000);
208 }
209 *last += &line;
210 }
211 current += 1;
212 if let Some((message, tx, total)) = progress.as_ref() {
213 send_progress(message, &template_clone, *total, current, tx).await;
214 }
215 },
216 Err(e) => {
217 bail!("Error reading stderr: {}", e);
218 },
219 Ok(None) => {
220 info!("STDERR EOF");
221 break;
222 }
223 },
224 }
225 }
226 Ok(())
227 });
228
229 tokio::select! {
230 status = child.wait() => {
231 let status = status.context("Failed to wait on child")?;
232 if !status.success() {
233 if let Some((message, tx, total)) = progress.as_ref() {
234 send_progress(message, &template, *total, *total, tx).await;
235 }
236 bail!("{}: exit code {} ({})", command.file_name().unwrap().to_string_lossy(), status.code().unwrap_or(-1), *last_stderr_clone.read());
237 }
238 }
239
240 cmd = shutdown_rx.recv() => match cmd {
241 Some(Message::Stop(_)) | Some(Message::Break(_)) => {
242 info!("Received break command, killing child process: {}", argv);
243 child.kill().await.ok();
244 }
245 None => {
246 info!("Command channel closed, killing child process: {}", argv);
247 child.kill().await.ok();
248 }
249 _ => {}
250 }
251 }
252
253 if let Some((message, tx, total)) = progress.as_ref() {
254 send_progress(message, &template, *total, *total, tx).await;
255 }
256 info!("Command executed successfully");
257
258 Ok(())
259}
260
261pub async fn unzip(
262 message: &str,
263 zip_file_path: &Path,
264 extract_dir: &Path,
265 skip: usize,
266 result_tx: &mpsc::Sender<Message>,
267) -> Result<()> {
268 info!("Unzipping {:?} to {:?}", zip_file_path, extract_dir);
269 let file = File::open(zip_file_path)?;
270 let mut archive = ZipArchive::new(file)?;
271
272 std::fs::create_dir_all(extract_dir)
273 .context(format!("Failed to create dir '{:?}'", extract_dir))?;
274
275 let template = crate::t!("progress-files-eta");
276
277 let mut progress = ProgressInfo {
278 message: message.to_string(),
279 template,
280 total: archive.len() as u32,
281 current: 0,
282 ..Default::default()
283 };
284
285 result_tx
286 .send(Message::Progress(progress.clone()))
287 .await
288 .ok();
289
290 for i in 0..archive.len() {
291 let mut file = archive
292 .by_index(i)
293 .context("Failed to get file from archive")?;
294 let file_name = Path::new(file.name())
295 .components()
296 .skip(skip)
297 .collect::<PathBuf>();
298 let target_path = Path::new(extract_dir).join(file_name);
299 if target_path == extract_dir {
300 continue;
301 }
302 info!("Extracting {:?}", target_path);
303 if file.is_dir() {
304 std::fs::create_dir_all(target_path.clone())
305 .context(format!("unzip failed to create dir '{:?}'", target_path))?;
306 } else {
307 let mut output_file = File::create(target_path.clone())
308 .context(format!("unzip failed to create file '{:?}'", target_path))?;
309 io::copy(&mut file, &mut output_file).context("unzip failed to copy file")?;
310 }
311
312 progress.current = (i + 1) as u32;
313 if progress.current.is_multiple_of(100) {
314 result_tx
315 .send(Message::Progress(progress.clone()))
316 .await
317 .ok();
318 }
319 }
320
321 progress.current = progress.total;
322 result_tx
323 .send(Message::Progress(progress.clone()))
324 .await
325 .ok();
326 Ok(())
327}
328
329pub async fn download(
330 message: &str,
331 config: Arc<RwLock<ClientConfig>>,
332 url: &str,
333 path: &Path,
334 command_rx: &mut mpsc::Receiver<Message>,
335 result_tx: &mpsc::Sender<Message>,
336) -> Result<()> {
337 info!("Downloading {} to {:?}", url, path);
338
339 let mut client = ClientBuilder::default();
340
341 let danger_accept_invalid_certs = config
342 .read()
343 .transport
344 .tls
345 .as_ref()
346 .and_then(|tls| tls.danger_ignore_certificate_verification)
347 .unwrap_or(false);
348
349 if let Some(tls) = &config.read().transport.tls {
350 let roots = load_roots(tls).context("Failed to load client config")?;
351 for cert_der in roots {
352 let cert = Certificate::from_der(&cert_der)?;
353 client = client.add_root_certificate(cert);
354 }
355 if tls.danger_ignore_certificate_verification.unwrap_or(false) {
356 client = client.danger_accept_invalid_certs(danger_accept_invalid_certs);
357 }
358 }
359
360 let client = match client.build() {
361 Ok(client) => client,
362 Err(e) => {
363 error!(
364 "Failed to create reqwest client with system certificates: {:?}",
365 e
366 );
367 warn!("Using default reqwest client");
368 reqwest::Client::builder()
369 .danger_accept_invalid_certs(danger_accept_invalid_certs)
370 .build()
371 .context("Failed to create defaut reqwest client")?
372 }
373 };
374
375 let res = client
377 .get(url)
378 .send()
379 .await
380 .context(format!("Failed to GET from '{}'", &url))?;
381
382 if !res.status().is_success() {
384 bail!("HTTP request failed with status: {}", res.status());
385 }
386
387 let total_size = res
389 .content_length()
390 .context(format!("Failed to get content length from '{}'", &url))?;
391
392 if let Ok(file) = File::open(path) {
393 if file
394 .metadata()
395 .context(format!("Failed to get metadata from '{:?}'", path))?
396 .len()
397 == total_size
398 {
399 return Ok(());
400 }
401 }
402
403 let template = crate::t!("progress-bytes");
404
405 let mut progress = ProgressInfo {
406 message: message.to_string(),
407 template,
408 total: total_size as u32,
409 current: 0,
410 ..Default::default()
411 };
412
413 result_tx
414 .send(Message::Progress(progress.clone()))
415 .await
416 .ok();
417
418 let mut file = File::create(path).context(format!("Failed to create file '{:?}'", path))?;
420 let mut stream = res.bytes_stream();
421
422 loop {
423 tokio::select! {
424 cmd = command_rx.recv() => {
425 match cmd {
426 Some(Message::Stop(_)) | Some(Message::Break(_)) => {
427 info!("Download cancelled");
428 progress.total = total_size as u32;
429 result_tx.send(Message::Progress(progress.clone())).await.ok();
430 bail!("Download cancelled");
431 }
432 None => {
433 progress.total = total_size as u32;
434 result_tx.send(Message::Progress(progress.clone())).await.ok();
435 bail!("Command channel closed");
436 }
437 _ => {}
438 }
439 }
440
441 item = stream.next() => {
442 if let Some(item) = item {
443 let chunk = item.context("Failed to get chunk")?;
444 file.write_all(&chunk).context("Error while writing to file")?;
445 let kb_current = progress.current / 1024;
446 progress.current = min(progress.current + (chunk.len() as u32), total_size as u32);
447 let kb_new = progress.current / 1024;
448 if kb_new > kb_current {
450 result_tx.send(Message::Progress(progress.clone())).await.ok();
451 }
452 } else {
453 break;
454 }
455 }
456 }
457 }
458
459 progress.current = total_size as u32;
460 result_tx
461 .send(Message::Progress(progress.clone()))
462 .await
463 .ok();
464 Ok(())
465}
466
467pub fn compare_filenames(path1: &Path, path2: &Path) -> bool {
468 if let (Some(file_name1), Some(file_name2)) = (path1.file_name(), path2.file_name()) {
469 let filename1 = file_name1.to_string_lossy();
470 let filename2 = file_name2.to_string_lossy();
471 #[cfg(windows)]
472 return filename1.eq_ignore_ascii_case(&filename2);
473 #[cfg(not(windows))]
474 filename1.eq(&filename2)
475 } else {
476 false
477 }
478}
479
480pub fn find(dir: &Path, file: &Path) -> Result<Option<PathBuf>> {
481 info!("Searching for {:?} in {:?}", file, dir);
482 for entry in WalkDir::new(dir).into_iter().filter_map(|e| e.ok()) {
483 if compare_filenames(entry.path(), file) {
484 return Ok(Some(entry.path().to_path_buf()));
485 }
486 }
487 Ok(None)
488}
489
490pub fn get_cache_dir(subdir: &str) -> Result<PathBuf> {
491 let mut cache_dir = cache_dir().context("Can't get cache dir")?;
492 cache_dir.push("cloudpub");
493 if !subdir.is_empty() {
494 cache_dir.push(subdir);
495 }
496 std::fs::create_dir_all(cache_dir.clone()).context("Can't create cache dir")?;
497 Ok(cache_dir)
498}