cloudpub_client/
shell.rs

1use crate::config::ClientConfig;
2use anyhow::{bail, Context, Result};
3use std::cmp::min;
4use std::fs::File;
5use std::io::Write;
6use std::path::PathBuf;
7
8use cloudpub_common::protocol::message::Message;
9use cloudpub_common::protocol::{Break, ErrorInfo, ErrorKind, ProgressInfo};
10use cloudpub_common::transport::rustls::load_roots;
11use dirs::cache_dir;
12use futures::stream::StreamExt;
13use parking_lot::RwLock;
14use reqwest::{Certificate, ClientBuilder};
15use std::collections::HashMap;
16use std::io;
17use std::path::Path;
18use std::process::Stdio;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use tokio::io::{AsyncBufReadExt, BufReader};
22use tokio::process::Command;
23use tokio::sync::mpsc;
24use tracing::{error, info, warn};
25use walkdir::WalkDir;
26use zip::read::ZipArchive;
27
/// Subdirectory name for downloaded files.
// NOTE(review): presumably passed as `subdir` to `get_cache_dir`; no use is
// visible in this file, so confirm against callers.
pub const DOWNLOAD_SUBDIR: &str = "download";
29
/// Handle to a child process that is launched on a background tokio task.
///
/// Dropping the handle calls [`SubProcess::stop`].
pub struct SubProcess {
    /// Sender used by `stop` to deliver a `Break` message to the running task.
    shutdown_tx: mpsc::Sender<Message>,
    /// Executable path; within this file it is kept only for the log line in `Drop`.
    command: PathBuf,
    /// Command-line arguments; within this file kept only for the log line in `Drop`.
    args: Vec<String>,
    /// Port value supplied by the creator; stored as-is and not read in this file.
    pub port: u16,
    /// Set by `stop`; the background task checks it to decide whether a
    /// failure should be reported as an error message.
    canceled: Arc<AtomicBool>,
    /// Starts as `Ok(())`; overwritten with the error when `execute` fails.
    pub result: Arc<RwLock<Result<()>>>,
}
38
39impl SubProcess {
40    pub fn new(
41        command: PathBuf,
42        args: Vec<String>,
43        chdir: Option<PathBuf>,
44        envs: HashMap<String, String>,
45        result_tx: mpsc::Sender<Message>,
46        port: u16,
47    ) -> Self {
48        let canceled = Arc::new(AtomicBool::new(false));
49        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
50        let command2 = command.clone();
51        let args2 = args.clone();
52        let canceled2 = canceled.clone();
53        let result = Arc::new(RwLock::new(Ok(())));
54        let result_clone = result.clone();
55        tokio::spawn(async move {
56            if let Err(err) = execute(command2, args2, chdir, envs, None, &mut shutdown_rx).await {
57                if !canceled2.load(Ordering::Relaxed) {
58                    error!("Failed to execute command: {:?}", err);
59                    result_tx
60                        .send(Message::Error(ErrorInfo {
61                            kind: ErrorKind::ExecuteFailed.into(),
62                            message: format!("Ошибка запуска {}", err),
63                            guid: String::new(),
64                        }))
65                        .await
66                        .ok();
67                }
68                *result_clone.write() = Err(err);
69            }
70        });
71        Self {
72            shutdown_tx,
73            port,
74            command,
75            args,
76            canceled,
77            result,
78        }
79    }
80
81    pub fn stop(&mut self) {
82        self.canceled.store(true, Ordering::Relaxed);
83        self.shutdown_tx
84            .try_send(Message::Break(Break {
85                ..Default::default()
86            }))
87            .ok();
88    }
89}
90
91impl Drop for SubProcess {
92    fn drop(&mut self) {
93        info!("Drop subprocess: {:?} {:?}", self.command, self.args);
94        self.stop();
95    }
96}
97
98pub async fn send_progress(
99    message: &str,
100    template: &str,
101    total: u64,
102    current: u64,
103    progress_tx: &mpsc::Sender<Message>,
104) {
105    let progress = ProgressInfo {
106        message: message.to_string(),
107        template: template.to_string(),
108        total: total as u32,
109        current: current as u32,
110        ..Default::default()
111    };
112    progress_tx.send(Message::Progress(progress)).await.ok();
113}
114
115pub async fn execute(
116    command: PathBuf,
117    args: Vec<String>,
118    chdir: Option<PathBuf>,
119    envs: HashMap<String, String>,
120    progress: Option<(String, mpsc::Sender<Message>, u64)>,
121    shutdown_rx: &mut mpsc::Receiver<Message>,
122) -> Result<()> {
123    let argv = format!("{} {}", command.to_str().unwrap(), args.join(" "));
124    info!("Executing command: {}", argv);
125
126    info!("Environment: {:?}", envs);
127
128    let template = crate::t!("progress-files-eta");
129    let chdir = chdir.as_deref().unwrap_or(Path::new("."));
130
131    if let Some((message, tx, total)) = progress.as_ref() {
132        send_progress(message, &template, *total, 0, tx).await;
133        send_progress(message, &template, *total, 1, tx).await;
134    }
135
136    #[cfg(windows)]
137    let mut child = Command::new(command.clone())
138        .args(args.clone())
139        .kill_on_drop(true)
140        .current_dir(&chdir)
141        .stdout(Stdio::piped())
142        .stderr(Stdio::piped())
143        .creation_flags(0x08000000)
144        .envs(envs)
145        .spawn()
146        .context(format!(
147            "Failed to execute command: {:?} {:?}",
148            command, args
149        ))?;
150
151    #[cfg(not(windows))]
152    let mut child = Command::new(command.clone())
153        .args(args.clone())
154        .kill_on_drop(true)
155        .current_dir(chdir)
156        .stdout(Stdio::piped())
157        .stderr(Stdio::piped())
158        .envs(envs)
159        .spawn()
160        .context(format!(
161            "Failed to execute command: {:?} {:?}",
162            command, args
163        ))?;
164
165    let stdout = child.stdout.take().context("Failed to get stdout")?;
166    let stderr = child.stderr.take().context("Failed to get stderr")?;
167
168    let stdout_reader = BufReader::new(stdout).lines();
169    let stderr_reader = BufReader::new(stderr).lines();
170
171    let progress1 = progress.clone();
172    let template_clone = template.clone();
173    tokio::spawn(async move {
174        tokio::pin!(stdout_reader);
175        tokio::pin!(stderr_reader);
176        let mut current = 0;
177        loop {
178            let progress = progress1.clone();
179            tokio::select! {
180                line = stdout_reader.next_line() => match line {
181                    Ok(Some(line)) => {
182                        info!("STDOUT: {}", line);
183                        current += 1;
184                        if let Some((message, tx, total)) = progress.as_ref() {
185                            send_progress(message, &template_clone, *total, current, tx).await;
186                        }
187                    }
188                    Err(e) => {
189                        bail!("Error reading stdout: {}", e);
190                    },
191                    Ok(None) => {
192                        info!("STDOUT EOF");
193                        break;
194                    }
195                },
196                line = stderr_reader.next_line() => match line {
197                    Ok(Some(line)) => {
198                        warn!("STDERR: {}", line);
199                        current += 1;
200                        if let Some((message, tx, total)) = progress.as_ref() {
201                            send_progress(message, &template_clone, *total, current, tx).await;
202                        }
203                    },
204                    Err(e) => {
205                        bail!("Error reading stderr: {}", e);
206                    },
207                    Ok(None) => {
208                        info!("STDERR EOF");
209                        break;
210                    }
211                },
212            }
213        }
214        Ok(())
215    });
216
217    tokio::select! {
218        status = child.wait() => {
219            let status = status.context("Failed to wait on child")?;
220            if !status.success() {
221                if let Some((message, tx, total)) = progress.as_ref() {
222                    send_progress(message, &template, *total, *total, tx).await;
223                }
224                bail!("{}: exit code {}", command.file_name().unwrap().to_string_lossy(), status.code().unwrap_or(-1));
225            }
226        }
227
228        cmd = shutdown_rx.recv() => match cmd {
229            Some(Message::Stop(_)) | Some(Message::Break(_)) => {
230                info!("Received break command, killing child process: {}", argv);
231                child.kill().await.ok();
232            }
233            None => {
234                info!("Command channel closed, killing child process: {}", argv);
235                child.kill().await.ok();
236            }
237            _ => {}
238        }
239    }
240
241    if let Some((message, tx, total)) = progress.as_ref() {
242        send_progress(message, &template, *total, *total, tx).await;
243    }
244    info!("Command executed successfully");
245
246    Ok(())
247}
248
249pub async fn unzip(
250    message: &str,
251    zip_file_path: &Path,
252    extract_dir: &Path,
253    skip: usize,
254    result_tx: &mpsc::Sender<Message>,
255) -> Result<()> {
256    info!("Unzipping {:?} to {:?}", zip_file_path, extract_dir);
257    let file = File::open(zip_file_path)?;
258    let mut archive = ZipArchive::new(file)?;
259
260    std::fs::create_dir_all(extract_dir)
261        .context(format!("Failed to create dir '{:?}'", extract_dir))?;
262
263    let template = crate::t!("progress-files-eta");
264
265    let mut progress = ProgressInfo {
266        message: message.to_string(),
267        template,
268        total: archive.len() as u32,
269        current: 0,
270        ..Default::default()
271    };
272
273    result_tx
274        .send(Message::Progress(progress.clone()))
275        .await
276        .ok();
277
278    for i in 0..archive.len() {
279        let mut file = archive
280            .by_index(i)
281            .context("Failed to get file from archive")?;
282        let file_name = Path::new(file.name())
283            .components()
284            .skip(skip)
285            .collect::<PathBuf>();
286        let target_path = Path::new(extract_dir).join(file_name);
287        if target_path == extract_dir {
288            continue;
289        }
290        info!("Extracting {:?}", target_path);
291        if file.is_dir() {
292            std::fs::create_dir_all(target_path.clone())
293                .context(format!("unzip failed to create dir '{:?}'", target_path))?;
294        } else {
295            let mut output_file = File::create(target_path.clone())
296                .context(format!("unzip failed to create file '{:?}'", target_path))?;
297            io::copy(&mut file, &mut output_file).context("unzip failed to copy file")?;
298        }
299
300        progress.current = (i + 1) as u32;
301        if progress.current % 100 == 0 {
302            result_tx
303                .send(Message::Progress(progress.clone()))
304                .await
305                .ok();
306        }
307    }
308
309    progress.current = progress.total;
310    result_tx
311        .send(Message::Progress(progress.clone()))
312        .await
313        .ok();
314    Ok(())
315}
316
317pub async fn download(
318    message: &str,
319    config: Arc<RwLock<ClientConfig>>,
320    url: &str,
321    path: &Path,
322    command_rx: &mut mpsc::Receiver<Message>,
323    result_tx: &mpsc::Sender<Message>,
324) -> Result<()> {
325    info!("Downloading {} to {:?}", url, path);
326
327    let mut client = ClientBuilder::default();
328
329    let danger_accept_invalid_certs = config
330        .read()
331        .transport
332        .tls
333        .as_ref()
334        .and_then(|tls| tls.danger_ignore_certificate_verification)
335        .unwrap_or(false);
336
337    if let Some(tls) = &config.read().transport.tls {
338        let roots = load_roots(tls).context("Failed to load client config")?;
339        for cert_der in roots {
340            let cert = Certificate::from_der(&cert_der)?;
341            client = client.add_root_certificate(cert);
342        }
343        if tls.danger_ignore_certificate_verification.unwrap_or(false) {
344            client = client.danger_accept_invalid_certs(danger_accept_invalid_certs);
345        }
346    }
347
348    let client = match client.build() {
349        Ok(client) => client,
350        Err(e) => {
351            error!(
352                "Failed to create reqwest client with system certificates:  {:?}",
353                e
354            );
355            warn!("Using default reqwest client");
356            reqwest::Client::builder()
357                .danger_accept_invalid_certs(danger_accept_invalid_certs)
358                .build()
359                .context("Failed to create defaut reqwest client")?
360        }
361    };
362
363    // Reqwest setup
364    let res = client
365        .get(url)
366        .send()
367        .await
368        .context(format!("Failed to GET from '{}'", &url))?;
369
370    // Check if response status is 200 OK
371    if !res.status().is_success() {
372        bail!("HTTP request failed with status: {}", res.status());
373    }
374
375    // Indicatif setup
376    let total_size = res
377        .content_length()
378        .context(format!("Failed to get content length from '{}'", &url))?;
379
380    if let Ok(file) = File::open(path) {
381        if file
382            .metadata()
383            .context(format!("Failed to get metadata from '{:?}'", path))?
384            .len()
385            == total_size
386        {
387            return Ok(());
388        }
389    }
390
391    let template = crate::t!("progress-bytes");
392
393    let mut progress = ProgressInfo {
394        message: message.to_string(),
395        template,
396        total: total_size as u32,
397        current: 0,
398        ..Default::default()
399    };
400
401    result_tx
402        .send(Message::Progress(progress.clone()))
403        .await
404        .ok();
405
406    // download chunks
407    let mut file = File::create(path).context(format!("Failed to create file '{:?}'", path))?;
408    let mut stream = res.bytes_stream();
409
410    loop {
411        tokio::select! {
412            cmd = command_rx.recv() => {
413                match cmd {
414                    Some(Message::Stop(_)) | Some(Message::Break(_)) => {
415                        info!("Download cancelled");
416                        progress.total = total_size as u32;
417                        result_tx.send(Message::Progress(progress.clone())).await.ok();
418                        bail!("Download cancelled");
419                    }
420                    None => {
421                        progress.total = total_size as u32;
422                        result_tx.send(Message::Progress(progress.clone())).await.ok();
423                        bail!("Command channel closed");
424                    }
425                    _ => {}
426                }
427            }
428
429            item = stream.next() => {
430                if let Some(item) =  item {
431                let chunk = item.context("Failed to get chunk")?;
432                    file.write_all(&chunk).context("Error while writing to file")?;
433                    let kb_current = progress.current / 1024;
434                    progress.current = min(progress.current + (chunk.len() as u32), total_size as u32);
435                    let kb_new = progress.current / 1024;
436                    // Throttle download progress
437                    if kb_new > kb_current {
438                        result_tx.send(Message::Progress(progress.clone())).await.ok();
439                    }
440                } else {
441                    break;
442                }
443            }
444        }
445    }
446
447    progress.current = total_size as u32;
448    result_tx
449        .send(Message::Progress(progress.clone()))
450        .await
451        .ok();
452    Ok(())
453}
454
/// Returns `true` when the final path components of `path1` and `path2` match.
///
/// The comparison ignores ASCII case on Windows and is exact elsewhere.
/// If either path has no final file-name component the result is `false`.
pub fn compare_filenames(path1: &Path, path2: &Path) -> bool {
    match (path1.file_name(), path2.file_name()) {
        (Some(left), Some(right)) => {
            let left = left.to_string_lossy();
            let right = right.to_string_lossy();
            if cfg!(windows) {
                left.eq_ignore_ascii_case(&right)
            } else {
                left == right
            }
        }
        _ => false,
    }
}
467
468pub fn find(dir: &Path, file: &Path) -> Result<Option<PathBuf>> {
469    info!("Searching for {:?} in {:?}", file, dir);
470    for entry in WalkDir::new(dir).into_iter().filter_map(|e| e.ok()) {
471        if compare_filenames(entry.path(), file) {
472            return Ok(Some(entry.path().to_path_buf()));
473        }
474    }
475    Ok(None)
476}
477
478pub fn get_cache_dir(subdir: &str) -> Result<PathBuf> {
479    let mut cache_dir = cache_dir().context("Can't get cache dir")?;
480    cache_dir.push("cloudpub");
481    if !subdir.is_empty() {
482        cache_dir.push(subdir);
483    }
484    std::fs::create_dir_all(cache_dir.clone()).context("Can't create cache dir")?;
485    Ok(cache_dir)
486}