cloudpub_client/
shell.rs

1use crate::config::ClientConfig;
2use anyhow::{bail, Context, Result};
3use std::cmp::min;
4use std::fs::File;
5use std::io::Write;
6use std::path::PathBuf;
7
8use cloudpub_common::protocol::message::Message;
9use cloudpub_common::protocol::{Break, ErrorInfo, ErrorKind, ProgressInfo};
10use cloudpub_common::transport::rustls::load_roots;
11use dirs::cache_dir;
12use futures::stream::StreamExt;
13use parking_lot::RwLock;
14use reqwest::{Certificate, ClientBuilder};
15use std::collections::HashMap;
16use std::io;
17use std::path::Path;
18use std::process::Stdio;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use tokio::io::{AsyncBufReadExt, BufReader};
22use tokio::process::Command;
23use tokio::sync::mpsc;
24use tracing::{error, info, warn};
25use walkdir::WalkDir;
26use zip::read::ZipArchive;
27
28pub const DOWNLOAD_SUBDIR: &str = "download";
29
30pub struct SubProcess {
31    shutdown_tx: mpsc::Sender<Message>,
32    command: PathBuf,
33    args: Vec<String>,
34    pub port: u16,
35    canceled: Arc<AtomicBool>,
36    pub result: Arc<RwLock<Result<()>>>,
37}
38
39impl SubProcess {
40    pub fn new(
41        command: PathBuf,
42        args: Vec<String>,
43        chdir: Option<PathBuf>,
44        envs: HashMap<String, String>,
45        result_tx: mpsc::Sender<Message>,
46        port: u16,
47    ) -> Self {
48        let canceled = Arc::new(AtomicBool::new(false));
49        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
50        let command2 = command.clone();
51        let args2 = args.clone();
52        let canceled2 = canceled.clone();
53        let result = Arc::new(RwLock::new(Ok(())));
54        let result_clone = result.clone();
55        tokio::spawn(async move {
56            if let Err(err) = execute(command2, args2, chdir, envs, None, &mut shutdown_rx).await {
57                if !canceled2.load(Ordering::Relaxed) {
58                    error!("Failed to execute command: {:?}", err);
59                    result_tx
60                        .send(Message::Error(ErrorInfo {
61                            kind: ErrorKind::ExecuteFailed.into(),
62                            message: crate::t!("error-execute-failed", "err" => err.to_string()),
63                            guid: String::new(),
64                        }))
65                        .await
66                        .ok();
67                }
68                *result_clone.write() = Err(err);
69            }
70        });
71        Self {
72            shutdown_tx,
73            port,
74            command,
75            args,
76            canceled,
77            result,
78        }
79    }
80
81    pub fn stop(&mut self) {
82        self.canceled.store(true, Ordering::Relaxed);
83        self.shutdown_tx
84            .try_send(Message::Break(Break {
85                ..Default::default()
86            }))
87            .ok();
88    }
89}
90
91impl Drop for SubProcess {
92    fn drop(&mut self) {
93        info!("Drop subprocess: {:?} {:?}", self.command, self.args);
94        self.stop();
95    }
96}
97
98pub async fn send_progress(
99    message: &str,
100    template: &str,
101    total: u64,
102    current: u64,
103    progress_tx: &mpsc::Sender<Message>,
104) {
105    let progress = ProgressInfo {
106        message: message.to_string(),
107        template: template.to_string(),
108        total: total as u32,
109        current: current as u32,
110        ..Default::default()
111    };
112    progress_tx.send(Message::Progress(progress)).await.ok();
113}
114
115pub async fn execute(
116    command: PathBuf,
117    args: Vec<String>,
118    chdir: Option<PathBuf>,
119    envs: HashMap<String, String>,
120    progress: Option<(String, mpsc::Sender<Message>, u64)>,
121    shutdown_rx: &mut mpsc::Receiver<Message>,
122) -> Result<()> {
123    let argv = format!("{} {}", command.to_str().unwrap(), args.join(" "));
124    info!("Executing command: {}", argv);
125
126    info!("Environment: {:?}", envs);
127
128    let template = crate::t!("progress-files-eta");
129    let chdir = chdir.as_deref().unwrap_or(Path::new("."));
130
131    if let Some((message, tx, total)) = progress.as_ref() {
132        send_progress(message, &template, *total, 0, tx).await;
133        send_progress(message, &template, *total, 1, tx).await;
134    }
135
136    #[cfg(windows)]
137    let mut child = Command::new(command.clone())
138        .args(args.clone())
139        .kill_on_drop(true)
140        .current_dir(&chdir)
141        .stdout(Stdio::piped())
142        .stderr(Stdio::piped())
143        .creation_flags(0x08000000)
144        .envs(envs)
145        .spawn()
146        .context(format!(
147            "Failed to execute command: {:?} {:?}",
148            command, args
149        ))?;
150
151    #[cfg(not(windows))]
152    let mut child = Command::new(command.clone())
153        .args(args.clone())
154        .kill_on_drop(true)
155        .current_dir(chdir)
156        .stdout(Stdio::piped())
157        .stderr(Stdio::piped())
158        .envs(envs)
159        .spawn()
160        .context(format!(
161            "Failed to execute command: {:?} {:?}",
162            command, args
163        ))?;
164
165    let stdout = child.stdout.take().context("Failed to get stdout")?;
166    let stderr = child.stderr.take().context("Failed to get stderr")?;
167
168    let stdout_reader = BufReader::new(stdout).lines();
169    let stderr_reader = BufReader::new(stderr).lines();
170
171    let progress1 = progress.clone();
172    let template_clone = template.clone();
173    let last_stderr = Arc::new(RwLock::new(String::new()));
174    let last_stderr_clone = last_stderr.clone();
175    tokio::spawn(async move {
176        tokio::pin!(stdout_reader);
177        tokio::pin!(stderr_reader);
178        let mut current = 0;
179        loop {
180            let progress = progress1.clone();
181            tokio::select! {
182                line = stdout_reader.next_line() => match line {
183                    Ok(Some(line)) => {
184                        info!("STDOUT: {}", line);
185                        current += 1;
186                        if let Some((message, tx, total)) = progress.as_ref() {
187                            send_progress(message, &template_clone, *total, current, tx).await;
188                        }
189                    }
190                    Err(e) => {
191                        bail!("Error reading stdout: {}", e);
192                    },
193                    Ok(None) => {
194                        info!("STDOUT EOF");
195                        break;
196                    }
197                },
198                line = stderr_reader.next_line() => match line {
199                    Ok(Some(line)) => {
200                        warn!("STDERR: {}", line);
201                        {
202                            let mut last = last_stderr.write();
203                            if !last.is_empty() {
204                                *last += " ";
205                            }
206                            if (*last).len() > 2000 {
207                                *last = (*last).split_off(1000);
208                            }
209                            *last += &line;
210                        }
211                        current += 1;
212                        if let Some((message, tx, total)) = progress.as_ref() {
213                            send_progress(message, &template_clone, *total, current, tx).await;
214                        }
215                    },
216                    Err(e) => {
217                        bail!("Error reading stderr: {}", e);
218                    },
219                    Ok(None) => {
220                        info!("STDERR EOF");
221                        break;
222                    }
223                },
224            }
225        }
226        Ok(())
227    });
228
229    tokio::select! {
230        status = child.wait() => {
231            let status = status.context("Failed to wait on child")?;
232            if !status.success() {
233                if let Some((message, tx, total)) = progress.as_ref() {
234                    send_progress(message, &template, *total, *total, tx).await;
235                }
236                bail!("{}: exit code {} ({})", command.file_name().unwrap().to_string_lossy(), status.code().unwrap_or(-1), *last_stderr_clone.read());
237            }
238        }
239
240        cmd = shutdown_rx.recv() => match cmd {
241            Some(Message::Stop(_)) | Some(Message::Break(_)) => {
242                info!("Received break command, killing child process: {}", argv);
243                child.kill().await.ok();
244            }
245            None => {
246                info!("Command channel closed, killing child process: {}", argv);
247                child.kill().await.ok();
248            }
249            _ => {}
250        }
251    }
252
253    if let Some((message, tx, total)) = progress.as_ref() {
254        send_progress(message, &template, *total, *total, tx).await;
255    }
256    info!("Command executed successfully");
257
258    Ok(())
259}
260
261pub async fn unzip(
262    message: &str,
263    zip_file_path: &Path,
264    extract_dir: &Path,
265    skip: usize,
266    result_tx: &mpsc::Sender<Message>,
267) -> Result<()> {
268    info!("Unzipping {:?} to {:?}", zip_file_path, extract_dir);
269    let file = File::open(zip_file_path)?;
270    let mut archive = ZipArchive::new(file)?;
271
272    std::fs::create_dir_all(extract_dir)
273        .context(format!("Failed to create dir '{:?}'", extract_dir))?;
274
275    let template = crate::t!("progress-files-eta");
276
277    let mut progress = ProgressInfo {
278        message: message.to_string(),
279        template,
280        total: archive.len() as u32,
281        current: 0,
282        ..Default::default()
283    };
284
285    result_tx
286        .send(Message::Progress(progress.clone()))
287        .await
288        .ok();
289
290    for i in 0..archive.len() {
291        let mut file = archive
292            .by_index(i)
293            .context("Failed to get file from archive")?;
294        let file_name = Path::new(file.name())
295            .components()
296            .skip(skip)
297            .collect::<PathBuf>();
298        let target_path = Path::new(extract_dir).join(file_name);
299        if target_path == extract_dir {
300            continue;
301        }
302        info!("Extracting {:?}", target_path);
303        if file.is_dir() {
304            std::fs::create_dir_all(target_path.clone())
305                .context(format!("unzip failed to create dir '{:?}'", target_path))?;
306        } else {
307            let mut output_file = File::create(target_path.clone())
308                .context(format!("unzip failed to create file '{:?}'", target_path))?;
309            io::copy(&mut file, &mut output_file).context("unzip failed to copy file")?;
310        }
311
312        progress.current = (i + 1) as u32;
313        if progress.current.is_multiple_of(100) {
314            result_tx
315                .send(Message::Progress(progress.clone()))
316                .await
317                .ok();
318        }
319    }
320
321    progress.current = progress.total;
322    result_tx
323        .send(Message::Progress(progress.clone()))
324        .await
325        .ok();
326    Ok(())
327}
328
329pub async fn download(
330    message: &str,
331    config: Arc<RwLock<ClientConfig>>,
332    url: &str,
333    path: &Path,
334    command_rx: &mut mpsc::Receiver<Message>,
335    result_tx: &mpsc::Sender<Message>,
336) -> Result<()> {
337    info!("Downloading {} to {:?}", url, path);
338
339    let mut client = ClientBuilder::default();
340
341    let danger_accept_invalid_certs = config
342        .read()
343        .transport
344        .tls
345        .as_ref()
346        .and_then(|tls| tls.danger_ignore_certificate_verification)
347        .unwrap_or(false);
348
349    if let Some(tls) = &config.read().transport.tls {
350        let roots = load_roots(tls).context("Failed to load client config")?;
351        for cert_der in roots {
352            let cert = Certificate::from_der(&cert_der)?;
353            client = client.add_root_certificate(cert);
354        }
355        if tls.danger_ignore_certificate_verification.unwrap_or(false) {
356            client = client.danger_accept_invalid_certs(danger_accept_invalid_certs);
357        }
358    }
359
360    let client = match client.build() {
361        Ok(client) => client,
362        Err(e) => {
363            error!(
364                "Failed to create reqwest client with system certificates:  {:?}",
365                e
366            );
367            warn!("Using default reqwest client");
368            reqwest::Client::builder()
369                .danger_accept_invalid_certs(danger_accept_invalid_certs)
370                .build()
371                .context("Failed to create defaut reqwest client")?
372        }
373    };
374
375    // Reqwest setup
376    let res = client
377        .get(url)
378        .send()
379        .await
380        .context(format!("Failed to GET from '{}'", &url))?;
381
382    // Check if response status is 200 OK
383    if !res.status().is_success() {
384        bail!("HTTP request failed with status: {}", res.status());
385    }
386
387    // Indicatif setup
388    let total_size = res
389        .content_length()
390        .context(format!("Failed to get content length from '{}'", &url))?;
391
392    if let Ok(file) = File::open(path) {
393        if file
394            .metadata()
395            .context(format!("Failed to get metadata from '{:?}'", path))?
396            .len()
397            == total_size
398        {
399            return Ok(());
400        }
401    }
402
403    let template = crate::t!("progress-bytes");
404
405    let mut progress = ProgressInfo {
406        message: message.to_string(),
407        template,
408        total: total_size as u32,
409        current: 0,
410        ..Default::default()
411    };
412
413    result_tx
414        .send(Message::Progress(progress.clone()))
415        .await
416        .ok();
417
418    // download chunks
419    let mut file = File::create(path).context(format!("Failed to create file '{:?}'", path))?;
420    let mut stream = res.bytes_stream();
421
422    loop {
423        tokio::select! {
424            cmd = command_rx.recv() => {
425                match cmd {
426                    Some(Message::Stop(_)) | Some(Message::Break(_)) => {
427                        info!("Download cancelled");
428                        progress.total = total_size as u32;
429                        result_tx.send(Message::Progress(progress.clone())).await.ok();
430                        bail!("Download cancelled");
431                    }
432                    None => {
433                        progress.total = total_size as u32;
434                        result_tx.send(Message::Progress(progress.clone())).await.ok();
435                        bail!("Command channel closed");
436                    }
437                    _ => {}
438                }
439            }
440
441            item = stream.next() => {
442                if let Some(item) =  item {
443                let chunk = item.context("Failed to get chunk")?;
444                    file.write_all(&chunk).context("Error while writing to file")?;
445                    let kb_current = progress.current / 1024;
446                    progress.current = min(progress.current + (chunk.len() as u32), total_size as u32);
447                    let kb_new = progress.current / 1024;
448                    // Throttle download progress
449                    if kb_new > kb_current {
450                        result_tx.send(Message::Progress(progress.clone())).await.ok();
451                    }
452                } else {
453                    break;
454                }
455            }
456        }
457    }
458
459    progress.current = total_size as u32;
460    result_tx
461        .send(Message::Progress(progress.clone()))
462        .await
463        .ok();
464    Ok(())
465}
466
467pub fn compare_filenames(path1: &Path, path2: &Path) -> bool {
468    if let (Some(file_name1), Some(file_name2)) = (path1.file_name(), path2.file_name()) {
469        let filename1 = file_name1.to_string_lossy();
470        let filename2 = file_name2.to_string_lossy();
471        #[cfg(windows)]
472        return filename1.eq_ignore_ascii_case(&filename2);
473        #[cfg(not(windows))]
474        filename1.eq(&filename2)
475    } else {
476        false
477    }
478}
479
480pub fn find(dir: &Path, file: &Path) -> Result<Option<PathBuf>> {
481    info!("Searching for {:?} in {:?}", file, dir);
482    for entry in WalkDir::new(dir).into_iter().filter_map(|e| e.ok()) {
483        if compare_filenames(entry.path(), file) {
484            return Ok(Some(entry.path().to_path_buf()));
485        }
486    }
487    Ok(None)
488}
489
490pub fn get_cache_dir(subdir: &str) -> Result<PathBuf> {
491    let mut cache_dir = cache_dir().context("Can't get cache dir")?;
492    cache_dir.push("cloudpub");
493    if !subdir.is_empty() {
494        cache_dir.push(subdir);
495    }
496    std::fs::create_dir_all(cache_dir.clone()).context("Can't create cache dir")?;
497    Ok(cache_dir)
498}