1use crate::config::ClientConfig;
2use anyhow::{bail, Context, Result};
3use std::cmp::min;
4use std::fs::File;
5use std::io::Write;
6use std::path::PathBuf;
7
8use cloudpub_common::protocol::message::Message;
9use cloudpub_common::protocol::{Break, ErrorInfo, ErrorKind, ProgressInfo};
10use cloudpub_common::transport::rustls::load_roots;
11use dirs::cache_dir;
12use futures::stream::StreamExt;
13use parking_lot::RwLock;
14use reqwest::{Certificate, ClientBuilder};
15use std::collections::HashMap;
16use std::io;
17use std::path::Path;
18use std::process::Stdio;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21use tokio::io::{AsyncBufReadExt, BufReader};
22use tokio::process::Command;
23use tokio::sync::mpsc;
24use tracing::{error, info, warn};
25use walkdir::WalkDir;
26use zip::read::ZipArchive;
27
/// Name of the "download" subdirectory.
///
/// NOTE(review): presumably passed to `get_cache_dir` by callers elsewhere;
/// this file only defines the constant — confirm against call sites.
pub const DOWNLOAD_SUBDIR: &str = "download";
29
/// Handle to an external command that `SubProcess::new` runs on a spawned
/// Tokio task.
///
/// Dropping the handle calls `stop`, which asks the task to kill the child.
pub struct SubProcess {
    /// Sender used by `stop` to deliver a `Break` message to the task.
    shutdown_tx: mpsc::Sender<Message>,
    /// Program path; kept for the log line emitted on drop.
    command: PathBuf,
    /// Program arguments; kept for the log line emitted on drop.
    args: Vec<String>,
    /// Port value supplied to `new`; this file only stores it.
    /// NOTE(review): presumably the port the subprocess serves on — confirm with callers.
    pub port: u16,
    /// Set by `stop`; when set, the task does not report an execution error
    /// on the result channel.
    canceled: Arc<AtomicBool>,
    /// Starts as `Ok(())`; overwritten with the error when `execute` fails.
    pub result: Arc<RwLock<Result<()>>>,
}
38
39impl SubProcess {
40 pub fn new(
41 command: PathBuf,
42 args: Vec<String>,
43 chdir: Option<PathBuf>,
44 envs: HashMap<String, String>,
45 result_tx: mpsc::Sender<Message>,
46 port: u16,
47 ) -> Self {
48 let canceled = Arc::new(AtomicBool::new(false));
49 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
50 let command2 = command.clone();
51 let args2 = args.clone();
52 let canceled2 = canceled.clone();
53 let result = Arc::new(RwLock::new(Ok(())));
54 let result_clone = result.clone();
55 tokio::spawn(async move {
56 if let Err(err) = execute(command2, args2, chdir, envs, None, &mut shutdown_rx).await {
57 if !canceled2.load(Ordering::Relaxed) {
58 error!("Failed to execute command: {:?}", err);
59 result_tx
60 .send(Message::Error(ErrorInfo {
61 kind: ErrorKind::ExecuteFailed.into(),
62 message: format!("Ошибка запуска {}", err),
63 guid: String::new(),
64 }))
65 .await
66 .ok();
67 }
68 *result_clone.write() = Err(err);
69 }
70 });
71 Self {
72 shutdown_tx,
73 port,
74 command,
75 args,
76 canceled,
77 result,
78 }
79 }
80
81 pub fn stop(&mut self) {
82 self.canceled.store(true, Ordering::Relaxed);
83 self.shutdown_tx
84 .try_send(Message::Break(Break {
85 ..Default::default()
86 }))
87 .ok();
88 }
89}
90
91impl Drop for SubProcess {
92 fn drop(&mut self) {
93 info!("Drop subprocess: {:?} {:?}", self.command, self.args);
94 self.stop();
95 }
96}
97
98pub async fn send_progress(
99 message: &str,
100 template: &str,
101 total: u64,
102 current: u64,
103 progress_tx: &mpsc::Sender<Message>,
104) {
105 let progress = ProgressInfo {
106 message: message.to_string(),
107 template: template.to_string(),
108 total: total as u32,
109 current: current as u32,
110 ..Default::default()
111 };
112 progress_tx.send(Message::Progress(progress)).await.ok();
113}
114
115pub async fn execute(
116 command: PathBuf,
117 args: Vec<String>,
118 chdir: Option<PathBuf>,
119 envs: HashMap<String, String>,
120 progress: Option<(String, mpsc::Sender<Message>, u64)>,
121 shutdown_rx: &mut mpsc::Receiver<Message>,
122) -> Result<()> {
123 let argv = format!("{} {}", command.to_str().unwrap(), args.join(" "));
124 info!("Executing command: {}", argv);
125
126 info!("Environment: {:?}", envs);
127
128 let template = crate::t!("progress-files-eta");
129 let chdir = chdir.as_deref().unwrap_or(Path::new("."));
130
131 if let Some((message, tx, total)) = progress.as_ref() {
132 send_progress(message, &template, *total, 0, tx).await;
133 send_progress(message, &template, *total, 1, tx).await;
134 }
135
136 #[cfg(windows)]
137 let mut child = Command::new(command.clone())
138 .args(args.clone())
139 .kill_on_drop(true)
140 .current_dir(&chdir)
141 .stdout(Stdio::piped())
142 .stderr(Stdio::piped())
143 .creation_flags(0x08000000)
144 .envs(envs)
145 .spawn()
146 .context(format!(
147 "Failed to execute command: {:?} {:?}",
148 command, args
149 ))?;
150
151 #[cfg(not(windows))]
152 let mut child = Command::new(command.clone())
153 .args(args.clone())
154 .kill_on_drop(true)
155 .current_dir(chdir)
156 .stdout(Stdio::piped())
157 .stderr(Stdio::piped())
158 .envs(envs)
159 .spawn()
160 .context(format!(
161 "Failed to execute command: {:?} {:?}",
162 command, args
163 ))?;
164
165 let stdout = child.stdout.take().context("Failed to get stdout")?;
166 let stderr = child.stderr.take().context("Failed to get stderr")?;
167
168 let stdout_reader = BufReader::new(stdout).lines();
169 let stderr_reader = BufReader::new(stderr).lines();
170
171 let progress1 = progress.clone();
172 let template_clone = template.clone();
173 tokio::spawn(async move {
174 tokio::pin!(stdout_reader);
175 tokio::pin!(stderr_reader);
176 let mut current = 0;
177 loop {
178 let progress = progress1.clone();
179 tokio::select! {
180 line = stdout_reader.next_line() => match line {
181 Ok(Some(line)) => {
182 info!("STDOUT: {}", line);
183 current += 1;
184 if let Some((message, tx, total)) = progress.as_ref() {
185 send_progress(message, &template_clone, *total, current, tx).await;
186 }
187 }
188 Err(e) => {
189 bail!("Error reading stdout: {}", e);
190 },
191 Ok(None) => {
192 info!("STDOUT EOF");
193 break;
194 }
195 },
196 line = stderr_reader.next_line() => match line {
197 Ok(Some(line)) => {
198 warn!("STDERR: {}", line);
199 current += 1;
200 if let Some((message, tx, total)) = progress.as_ref() {
201 send_progress(message, &template_clone, *total, current, tx).await;
202 }
203 },
204 Err(e) => {
205 bail!("Error reading stderr: {}", e);
206 },
207 Ok(None) => {
208 info!("STDERR EOF");
209 break;
210 }
211 },
212 }
213 }
214 Ok(())
215 });
216
217 tokio::select! {
218 status = child.wait() => {
219 let status = status.context("Failed to wait on child")?;
220 if !status.success() {
221 if let Some((message, tx, total)) = progress.as_ref() {
222 send_progress(message, &template, *total, *total, tx).await;
223 }
224 bail!("{}: exit code {}", command.file_name().unwrap().to_string_lossy(), status.code().unwrap_or(-1));
225 }
226 }
227
228 cmd = shutdown_rx.recv() => match cmd {
229 Some(Message::Stop(_)) | Some(Message::Break(_)) => {
230 info!("Received break command, killing child process: {}", argv);
231 child.kill().await.ok();
232 }
233 None => {
234 info!("Command channel closed, killing child process: {}", argv);
235 child.kill().await.ok();
236 }
237 _ => {}
238 }
239 }
240
241 if let Some((message, tx, total)) = progress.as_ref() {
242 send_progress(message, &template, *total, *total, tx).await;
243 }
244 info!("Command executed successfully");
245
246 Ok(())
247}
248
249pub async fn unzip(
250 message: &str,
251 zip_file_path: &Path,
252 extract_dir: &Path,
253 skip: usize,
254 result_tx: &mpsc::Sender<Message>,
255) -> Result<()> {
256 info!("Unzipping {:?} to {:?}", zip_file_path, extract_dir);
257 let file = File::open(zip_file_path)?;
258 let mut archive = ZipArchive::new(file)?;
259
260 std::fs::create_dir_all(extract_dir)
261 .context(format!("Failed to create dir '{:?}'", extract_dir))?;
262
263 let template = crate::t!("progress-files-eta");
264
265 let mut progress = ProgressInfo {
266 message: message.to_string(),
267 template,
268 total: archive.len() as u32,
269 current: 0,
270 ..Default::default()
271 };
272
273 result_tx
274 .send(Message::Progress(progress.clone()))
275 .await
276 .ok();
277
278 for i in 0..archive.len() {
279 let mut file = archive
280 .by_index(i)
281 .context("Failed to get file from archive")?;
282 let file_name = Path::new(file.name())
283 .components()
284 .skip(skip)
285 .collect::<PathBuf>();
286 let target_path = Path::new(extract_dir).join(file_name);
287 if target_path == extract_dir {
288 continue;
289 }
290 info!("Extracting {:?}", target_path);
291 if file.is_dir() {
292 std::fs::create_dir_all(target_path.clone())
293 .context(format!("unzip failed to create dir '{:?}'", target_path))?;
294 } else {
295 let mut output_file = File::create(target_path.clone())
296 .context(format!("unzip failed to create file '{:?}'", target_path))?;
297 io::copy(&mut file, &mut output_file).context("unzip failed to copy file")?;
298 }
299
300 progress.current = (i + 1) as u32;
301 if progress.current % 100 == 0 {
302 result_tx
303 .send(Message::Progress(progress.clone()))
304 .await
305 .ok();
306 }
307 }
308
309 progress.current = progress.total;
310 result_tx
311 .send(Message::Progress(progress.clone()))
312 .await
313 .ok();
314 Ok(())
315}
316
317pub async fn download(
318 message: &str,
319 config: Arc<RwLock<ClientConfig>>,
320 url: &str,
321 path: &Path,
322 command_rx: &mut mpsc::Receiver<Message>,
323 result_tx: &mpsc::Sender<Message>,
324) -> Result<()> {
325 info!("Downloading {} to {:?}", url, path);
326
327 let mut client = ClientBuilder::default();
328
329 let danger_accept_invalid_certs = config
330 .read()
331 .transport
332 .tls
333 .as_ref()
334 .and_then(|tls| tls.danger_ignore_certificate_verification)
335 .unwrap_or(false);
336
337 if let Some(tls) = &config.read().transport.tls {
338 let roots = load_roots(tls).context("Failed to load client config")?;
339 for cert_der in roots {
340 let cert = Certificate::from_der(&cert_der)?;
341 client = client.add_root_certificate(cert);
342 }
343 if tls.danger_ignore_certificate_verification.unwrap_or(false) {
344 client = client.danger_accept_invalid_certs(danger_accept_invalid_certs);
345 }
346 }
347
348 let client = match client.build() {
349 Ok(client) => client,
350 Err(e) => {
351 error!(
352 "Failed to create reqwest client with system certificates: {:?}",
353 e
354 );
355 warn!("Using default reqwest client");
356 reqwest::Client::builder()
357 .danger_accept_invalid_certs(danger_accept_invalid_certs)
358 .build()
359 .context("Failed to create defaut reqwest client")?
360 }
361 };
362
363 let res = client
365 .get(url)
366 .send()
367 .await
368 .context(format!("Failed to GET from '{}'", &url))?;
369
370 if !res.status().is_success() {
372 bail!("HTTP request failed with status: {}", res.status());
373 }
374
375 let total_size = res
377 .content_length()
378 .context(format!("Failed to get content length from '{}'", &url))?;
379
380 if let Ok(file) = File::open(path) {
381 if file
382 .metadata()
383 .context(format!("Failed to get metadata from '{:?}'", path))?
384 .len()
385 == total_size
386 {
387 return Ok(());
388 }
389 }
390
391 let template = crate::t!("progress-bytes");
392
393 let mut progress = ProgressInfo {
394 message: message.to_string(),
395 template,
396 total: total_size as u32,
397 current: 0,
398 ..Default::default()
399 };
400
401 result_tx
402 .send(Message::Progress(progress.clone()))
403 .await
404 .ok();
405
406 let mut file = File::create(path).context(format!("Failed to create file '{:?}'", path))?;
408 let mut stream = res.bytes_stream();
409
410 loop {
411 tokio::select! {
412 cmd = command_rx.recv() => {
413 match cmd {
414 Some(Message::Stop(_)) | Some(Message::Break(_)) => {
415 info!("Download cancelled");
416 progress.total = total_size as u32;
417 result_tx.send(Message::Progress(progress.clone())).await.ok();
418 bail!("Download cancelled");
419 }
420 None => {
421 progress.total = total_size as u32;
422 result_tx.send(Message::Progress(progress.clone())).await.ok();
423 bail!("Command channel closed");
424 }
425 _ => {}
426 }
427 }
428
429 item = stream.next() => {
430 if let Some(item) = item {
431 let chunk = item.context("Failed to get chunk")?;
432 file.write_all(&chunk).context("Error while writing to file")?;
433 let kb_current = progress.current / 1024;
434 progress.current = min(progress.current + (chunk.len() as u32), total_size as u32);
435 let kb_new = progress.current / 1024;
436 if kb_new > kb_current {
438 result_tx.send(Message::Progress(progress.clone())).await.ok();
439 }
440 } else {
441 break;
442 }
443 }
444 }
445 }
446
447 progress.current = total_size as u32;
448 result_tx
449 .send(Message::Progress(progress.clone()))
450 .await
451 .ok();
452 Ok(())
453}
454
/// Returns `true` when both paths end in the same file name.
///
/// The comparison ignores ASCII case on Windows and is exact elsewhere.
/// Names are compared as `OsStr`, so two different names that are not valid
/// UTF-8 are never treated as equal (a lossy string conversion would map both
/// to replacement characters). Returns `false` when either path has no file
/// name (for example `/` or a path ending in `..`).
pub fn compare_filenames(path1: &Path, path2: &Path) -> bool {
    match (path1.file_name(), path2.file_name()) {
        (Some(name1), Some(name2)) => {
            if cfg!(windows) {
                name1.eq_ignore_ascii_case(name2)
            } else {
                name1 == name2
            }
        }
        _ => false,
    }
}
467
468pub fn find(dir: &Path, file: &Path) -> Result<Option<PathBuf>> {
469 info!("Searching for {:?} in {:?}", file, dir);
470 for entry in WalkDir::new(dir).into_iter().filter_map(|e| e.ok()) {
471 if compare_filenames(entry.path(), file) {
472 return Ok(Some(entry.path().to_path_buf()));
473 }
474 }
475 Ok(None)
476}
477
478pub fn get_cache_dir(subdir: &str) -> Result<PathBuf> {
479 let mut cache_dir = cache_dir().context("Can't get cache dir")?;
480 cache_dir.push("cloudpub");
481 if !subdir.is_empty() {
482 cache_dir.push(subdir);
483 }
484 std::fs::create_dir_all(cache_dir.clone()).context("Can't create cache dir")?;
485 Ok(cache_dir)
486}