1use anyhow::Context;
2use indicatif::ProgressStyle;
3use owo_colors::{OwoColorize, Stream::Stdout};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::{
7 io::{BufRead, Write},
8 sync::{Arc, Mutex, mpsc},
9};
10use strum::Display;
11
12mod file_term;
13pub mod markdown;
14mod null_term;
15
16#[derive(
17 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Default, Serialize, Deserialize,
18)]
19pub enum Level {
20 Trace,
21 Debug,
22 Message,
23 #[default]
24 Info,
25 App,
26 Passthrough,
27 Warning,
28 Error,
29 Silent,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct LogHeader {
34 command: Arc<str>,
35 working_directory: Option<Arc<str>>,
36 environment: HashMap<Arc<str>, HashMap<Arc<str>, Arc<str>>>,
37 arguments: Vec<Arc<str>>,
38 shell: Arc<str>,
39}
40
41#[derive(Debug, Clone, Copy, Default)]
42pub struct Verbosity {
43 pub level: Level,
44 pub is_show_progress_bars: bool,
45 pub is_show_elapsed_time: bool,
46 pub is_tty: bool,
47}
48
49const PROGRESS_PREFIX_WIDTH: usize = 0;
50
51#[derive(Debug, Clone)]
52struct Secrets {
53 secrets: Vec<Arc<str>>,
54 redacted: Arc<str>,
55}
56
57impl Secrets {
58 fn redact(&self, text: Arc<str>) -> Arc<str> {
59 if self.secrets.is_empty() {
60 text
61 } else {
62 let mut result = text.to_string();
63 for secret in &self.secrets {
64 result = result.replace(secret.as_ref(), self.redacted.as_ref());
65 }
66 result.into()
67 }
68 }
69}
70
71fn is_verbosity_active(printer_level: Verbosity, verbosity: Level) -> bool {
72 verbosity >= printer_level.level
73}
74
75fn format_log(
76 indent: usize,
77 max_width: usize,
78 verbosity: Level,
79 message: &str,
80 is_show_elapsed_time: bool,
81 start_time: std::time::Instant,
82) -> String {
83 let timestamp: Arc<str> = if is_show_elapsed_time {
84 let elapsed = std::time::Instant::now() - start_time;
85 format!("[{:.3}] ", elapsed.as_secs_f64()).into()
86 } else {
87 "".into()
88 };
89 let mut result = if verbosity == Level::Passthrough {
90 format!("{timestamp}{message}")
91 } else {
92 let message_level_string = verbosity.to_string().to_lowercase();
93 let message_level = message_level_string.if_supports_color(Stdout, |text| text.bold());
94 format!(
95 "::{message_level}::{timestamp}{}{message}",
96 " ".repeat(indent),
97 )
98 };
99 while result.len() < max_width {
100 result.push(' ');
101 }
102 result.push('\n');
103 result
104}
105
106pub struct Section<'a> {
107 pub printer: &'a mut Printer,
108}
109
110impl<'a> Section<'a> {
111 pub fn new(printer: &'a mut Printer, name: &str) -> anyhow::Result<Self> {
112 printer.write(format!("{}{}:", " ".repeat(printer.indent), name.bold()).as_str())?;
113 printer.shift_right();
114 Ok(Self { printer })
115 }
116}
117
118impl Drop for Section<'_> {
119 fn drop(&mut self) {
120 self.printer.shift_left();
121 }
122}
123
124pub struct MultiProgressBar {
125 lock: Arc<Mutex<()>>,
126 printer_verbosity: Verbosity,
127 start_time: std::time::Instant,
128 indent: usize,
129 max_width: usize,
130 progress_width: usize,
131 progress: Option<indicatif::ProgressBar>,
132 final_message: Option<Arc<str>>,
133 is_increasing: bool,
134 secrets: Secrets,
135}
136
137impl MultiProgressBar {
138 pub fn total(&self) -> Option<u64> {
139 if let Some(progress) = self.progress.as_ref() {
140 progress.length()
141 } else {
142 None
143 }
144 }
145
146 pub fn reset_elapsed(&mut self) {
147 if let Some(progress) = self.progress.as_mut() {
148 progress.reset_elapsed();
149 }
150 }
151
152 pub fn set_total(&mut self, total: u64) {
153 if let Some(progress) = self.progress.as_mut() {
154 if let Some(length) = progress.length() {
155 if length != total {
156 let _lock = self.lock.lock().unwrap();
157 progress.set_length(total);
158 progress.set_position(0);
159 }
160 }
161 }
162 }
163
164 pub fn log(&mut self, verbosity: Level, message: &str) {
165 if is_verbosity_active(self.printer_verbosity, verbosity) {
166 let formatted_message = format_log(
167 self.indent,
168 self.max_width,
169 verbosity,
170 message,
171 self.printer_verbosity.is_show_elapsed_time,
172 self.start_time,
173 );
174 let _lock = self.lock.lock().unwrap();
175 if let Some(progress) = self.progress.as_ref() {
176 progress.println(formatted_message.as_str());
177 } else {
178 print!("{formatted_message}");
179 }
180 }
181 }
182
183 pub fn set_prefix(&mut self, message: &str) {
184 if let Some(progress) = self.progress.as_mut() {
185 let _lock = self.lock.lock().unwrap();
186 progress.set_prefix(message.to_owned());
187 }
188 }
189
190 fn construct_message(&self, message: &str) -> String {
191 let prefix_size = if let Some(progress) = self.progress.as_ref() {
192 progress.prefix().len()
193 } else {
194 0_usize
195 };
196 let length = if self.max_width > self.progress_width + prefix_size {
197 self.max_width - self.progress_width - prefix_size
198 } else {
199 0_usize
200 };
201 sanitize_output(message, length)
202 }
203
204 pub fn set_message(&mut self, message: &str) {
205 let constructed_message = self.construct_message(message);
206 if let Some(progress) = self.progress.as_mut() {
207 let _lock = self.lock.lock().unwrap();
208 progress.set_message(constructed_message);
209 }
210 }
211
212 pub fn set_ending_message(&mut self, message: &str) {
213 self.final_message = Some(self.construct_message(message).into());
214 }
215
216 pub fn set_ending_message_none(&mut self) {
217 self.final_message = None;
218 }
219
220 pub fn increment_with_overflow(&mut self, count: u64) {
221 let progress_total = self.total();
222 if let Some(progress) = self.progress.as_mut() {
223 let _lock = self.lock.lock().unwrap();
224 if self.is_increasing {
225 progress.inc(count);
226 if progress.position() == progress_total.unwrap_or(100) {
227 self.is_increasing = false;
228 }
229 } else if progress.position() >= count {
230 progress.set_position(progress.position() - count);
231 } else {
232 progress.set_position(0);
233 self.is_increasing = true;
234 }
235 }
236 }
237
238 pub fn decrement(&mut self, count: u64) {
239 if let Some(progress) = self.progress.as_mut() {
240 let _lock = self.lock.lock().unwrap();
241 if progress.position() >= count {
242 progress.set_position(progress.position() - count);
243 } else {
244 progress.set_position(0);
245 }
246 }
247 }
248
249 pub fn increment(&mut self, count: u64) {
250 if let Some(progress) = self.progress.as_mut() {
251 let _lock = self.lock.lock().unwrap();
252 progress.inc(count);
253 }
254 }
255
256 fn start_process(
257 &mut self,
258 command: &str,
259 options: &ExecuteOptions,
260 ) -> anyhow::Result<std::process::Child> {
261 if let Some(directory) = &options.working_directory {
262 if !std::path::Path::new(directory.as_ref()).exists() {
263 return Err(anyhow::anyhow!("Directory does not exist: {directory}"));
264 }
265 }
266
267 let child_process = options
268 .spawn(command)
269 .context(format!("Failed to spawn a child process using {command}"))?;
270 Ok(child_process)
271 }
272
273 pub fn execute_process(
274 &mut self,
275 command: &str,
276 options: ExecuteOptions,
277 ) -> anyhow::Result<Option<String>> {
278 self.set_message(&options.get_full_command(command));
279 let child_process = self
280 .start_process(command, &options)
281 .context(format!("Failed to start process {command}"))?;
282 let secrets = self.secrets.clone();
283 let result = monitor_process(command, child_process, self, &options, &secrets)
284 .context(format!("Command `{command}` failed to execute"))?;
285 Ok(result)
286 }
287}
288
289impl Drop for MultiProgressBar {
290 fn drop(&mut self) {
291 if let Some(message) = &self.final_message {
292 let constructed_message = self.construct_message(message);
293 if let Some(progress) = self.progress.as_mut() {
294 let _lock = self.lock.lock().unwrap();
295 progress.finish_with_message(constructed_message.bold().to_string());
296 }
297 }
298 }
299}
300
301pub struct MultiProgress<'a> {
302 pub printer: &'a mut Printer,
303 multi_progress: indicatif::MultiProgress,
304}
305
306impl<'a> MultiProgress<'a> {
307 pub fn new(printer: &'a mut Printer) -> Self {
308 let locker = printer.lock.clone();
309 let _lock = locker.lock().unwrap();
310
311 let draw_target = indicatif::ProgressDrawTarget::term_like_with_hz(
312 (printer.create_progress_printer)(),
313 10,
314 );
315
316 Self {
317 printer,
318 multi_progress: indicatif::MultiProgress::with_draw_target(draw_target),
319 }
320 }
321
322 pub fn add_progress(
323 &mut self,
324 prefix: &str,
325 total: Option<u64>,
326 finish_message: Option<&str>,
327 ) -> MultiProgressBar {
328 let _lock = self.printer.lock.lock().unwrap();
329
330 let template_string = "{elapsed_precise}|{bar:.cyan/blue}|{prefix} {msg}";
331
332 let (progress, progress_chars) = if let Some(total) = total {
333 let progress = indicatif::ProgressBar::new(total);
334 (progress, "#>-")
335 } else {
336 let progress = indicatif::ProgressBar::new(200);
337 (progress, "*>-")
338 };
339
340 progress.set_style(
341 ProgressStyle::with_template(template_string)
342 .unwrap()
343 .progress_chars(progress_chars),
344 );
345
346 let progress = if self.printer.verbosity.is_show_progress_bars {
347 let progress = self.multi_progress.add(progress);
348 let prefix = format!("{prefix}:");
349 progress.set_prefix(
350 format!("{prefix:PROGRESS_PREFIX_WIDTH$}")
351 .if_supports_color(Stdout, |text| text.bold())
352 .to_string(),
353 );
354 Some(progress)
355 } else {
356 None
357 };
358
359 MultiProgressBar {
360 lock: self.printer.lock.clone(),
361 printer_verbosity: self.printer.verbosity,
362 indent: self.printer.indent,
363 progress,
364 progress_width: 28, max_width: self.printer.max_width,
366 final_message: finish_message.map(|s| s.into()),
367 is_increasing: true,
368 start_time: self.printer.start_time,
369 secrets: self.printer.get_secrets(),
370 }
371 }
372}
373
374pub struct Heading<'a> {
375 pub printer: &'a mut Printer,
376}
377
378impl<'a> Heading<'a> {
379 pub fn new(printer: &'a mut Printer, name: &str) -> anyhow::Result<Self> {
380 printer.newline()?;
381 printer.enter_heading();
382 {
383 let heading = if printer.heading_count == 1 {
384 format!("{} {name}", "#".repeat(printer.heading_count))
385 .yellow()
386 .bold()
387 .to_string()
388 } else {
389 format!("{} {name}", "#".repeat(printer.heading_count))
390 .bold()
391 .to_string()
392 };
393 printer.write(heading.as_str())?;
394 printer.write("\n")?;
395 }
396 Ok(Self { printer })
397 }
398}
399
400impl Drop for Heading<'_> {
401 fn drop(&mut self) {
402 self.printer.exit_heading();
403 }
404}
405
406#[derive(Clone, Debug)]
407pub struct ExecuteOptions {
408 pub label: Arc<str>,
409 pub is_return_stdout: bool,
410 pub working_directory: Option<Arc<str>>,
411 pub environment: Vec<(Arc<str>, Arc<str>)>,
412 pub arguments: Vec<Arc<str>>,
413 pub log_file_path: Option<Arc<str>>,
414 pub clear_environment: bool,
415 pub process_started_with_id: Option<fn(&str, u32)>,
416 pub log_level: Option<Level>,
417 pub timeout: Option<std::time::Duration>,
418}
419
420impl Default for ExecuteOptions {
421 fn default() -> Self {
422 Self {
423 label: "working".into(),
424 is_return_stdout: false,
425 working_directory: None,
426 environment: vec![],
427 arguments: vec![],
428 log_file_path: None,
429 clear_environment: false,
430 process_started_with_id: None,
431 log_level: None,
432 timeout: None,
433 }
434 }
435}
436
437impl ExecuteOptions {
438 fn process_child_output<OutputType: std::io::Read + Send + 'static>(
439 output: OutputType,
440 ) -> anyhow::Result<(std::thread::JoinHandle<()>, mpsc::Receiver<String>)> {
441 let (tx, rx) = mpsc::channel::<String>();
442
443 let thread = std::thread::spawn(move || {
444 use std::io::BufReader;
445 let reader = BufReader::new(output);
446 for line in reader.lines() {
447 let line = line.unwrap();
448 tx.send(line).unwrap();
449 }
450 });
451
452 Ok((thread, rx))
453 }
454
455 fn spawn(&self, command: &str) -> anyhow::Result<std::process::Child> {
456 use std::process::{Command, Stdio};
457 let mut process = Command::new(command);
458
459 if self.clear_environment {
460 process.env_clear();
461 }
462
463 for argument in &self.arguments {
464 process.arg(argument.as_ref());
465 }
466
467 if let Some(directory) = &self.working_directory {
468 process.current_dir(directory.as_ref());
469 }
470
471 for (key, value) in self.environment.iter() {
472 process.env(key.as_ref(), value.as_ref());
473 }
474
475 let result = process
476 .stdout(Stdio::piped())
477 .stderr(Stdio::piped())
478 .stdin(Stdio::null())
479 .spawn()
480 .context(format!("while spawning piped {command}"))?;
481
482 if let Some(callback) = self.process_started_with_id.as_ref() {
483 callback(self.label.as_ref(), result.id());
484 }
485
486 Ok(result)
487 }
488
489 pub fn get_full_command(&self, command: &str) -> String {
490 format!("{command} {}", self.arguments.join(" "))
491 }
492
493 pub fn get_full_command_in_working_directory(&self, command: &str) -> String {
494 format!(
495 "{} {command} {}",
496 if let Some(directory) = &self.working_directory {
497 directory
498 } else {
499 ""
500 },
501 self.arguments.join(" "),
502 )
503 }
504}
505
506trait PrinterTrait: std::io::Write + indicatif::TermLike {}
507impl<W: std::io::Write + indicatif::TermLike> PrinterTrait for W {}
508
509pub struct Printer {
510 pub verbosity: Verbosity,
511 pub secrets: Vec<Arc<str>>,
512 pub redacted: Arc<str>,
513 lock: Arc<Mutex<()>>,
514 indent: usize,
515 heading_count: usize,
516 max_width: usize,
517 writer: Box<dyn PrinterTrait>,
518 start_time: std::time::Instant,
519 create_progress_printer: fn() -> Box<dyn PrinterTrait>,
520}
521
522impl Printer {
523 pub fn get_log_divider() -> Arc<str> {
524 "=".repeat(80).into()
525 }
526
527 pub fn get_terminal_width() -> usize {
528 const ASSUMED_WIDTH: usize = 80;
529 if let Some((width, _)) = terminal_size::terminal_size() {
530 width.0 as usize
531 } else {
532 ASSUMED_WIDTH
533 }
534 }
535
536 pub fn new_stdout() -> Self {
537 let max_width = Self::get_terminal_width();
538 Self {
539 indent: 0,
540 lock: Arc::new(Mutex::new(())),
541 verbosity: Verbosity::default(),
542 heading_count: 0,
543 max_width,
544 writer: Box::new(console::Term::stdout()),
545 create_progress_printer: || Box::new(console::Term::stdout()),
546 start_time: std::time::Instant::now(),
547 secrets: Vec::new(),
548 redacted: "REDACTED".into(),
549 }
550 }
551
552 pub fn new_file(path: &str) -> anyhow::Result<Self> {
553 let file_writer = file_term::FileTerm::new(path)?;
554 Ok(Self {
555 indent: 0,
556 lock: Arc::new(Mutex::new(())),
557 verbosity: Verbosity::default(),
558 heading_count: 0,
559 max_width: 65535,
560 writer: Box::new(file_writer),
561 create_progress_printer: || Box::new(null_term::NullTerm {}),
562 start_time: std::time::Instant::now(),
563 secrets: Vec::new(),
564 redacted: "REDACTED".into(),
565 })
566 }
567
568 pub fn new_null_term() -> Self {
569 Self {
570 indent: 0,
571 lock: Arc::new(Mutex::new(())),
572 verbosity: Verbosity::default(),
573 heading_count: 0,
574 max_width: 80,
575 writer: Box::new(null_term::NullTerm {}),
576 create_progress_printer: || Box::new(null_term::NullTerm {}),
577 start_time: std::time::Instant::now(),
578 secrets: Vec::new(),
579 redacted: "REDACTED".into(),
580 }
581 }
582
583 pub fn raw(&mut self, message: &str) -> anyhow::Result<()> {
584 let _lock = self.lock.lock().unwrap();
585 write!(self.writer, "{message}")?;
586 Ok(())
587 }
588
589 pub(crate) fn write(&mut self, message: &str) -> anyhow::Result<()> {
590 let redacted = self.get_secrets().redact(message.into());
591 let _lock = self.lock.lock().unwrap();
592 write!(self.writer, "{redacted}")?;
593 Ok(())
594 }
595
596 pub fn newline(&mut self) -> anyhow::Result<()> {
597 self.write("\n")?;
598 Ok(())
599 }
600
601 pub fn trace<Type: Serialize>(&mut self, name: &str, value: &Type) -> anyhow::Result<()> {
602 if is_verbosity_active(self.verbosity, Level::Trace) {
603 self.object(name, value)
604 } else {
605 Ok(())
606 }
607 }
608
609 pub fn debug<Type: Serialize>(&mut self, name: &str, value: &Type) -> anyhow::Result<()> {
610 if is_verbosity_active(self.verbosity, Level::Debug) {
611 self.object(name, value)
612 } else {
613 Ok(())
614 }
615 }
616
617 pub fn message<Type: Serialize>(&mut self, name: &str, value: &Type) -> anyhow::Result<()> {
618 if is_verbosity_active(self.verbosity, Level::Message) {
619 self.object(name, value)
620 } else {
621 Ok(())
622 }
623 }
624
625 pub fn info<Type: Serialize>(&mut self, name: &str, value: &Type) -> anyhow::Result<()> {
626 if is_verbosity_active(self.verbosity, Level::Info) {
627 self.object(name, value)
628 } else {
629 Ok(())
630 }
631 }
632
633 pub fn warning<Type: Serialize>(&mut self, name: &str, value: &Type) -> anyhow::Result<()> {
634 if is_verbosity_active(self.verbosity, Level::Warning) {
635 self.object(name.yellow().to_string().as_str(), value)
636 } else {
637 Ok(())
638 }
639 }
640
641 pub fn error<Type: Serialize>(&mut self, name: &str, value: &Type) -> anyhow::Result<()> {
642 if is_verbosity_active(self.verbosity, Level::Error) {
643 self.object(name.red().to_string().as_str(), value)
644 } else {
645 Ok(())
646 }
647 }
648
649 pub fn log(&mut self, level: Level, message: &str) -> anyhow::Result<()> {
650 if is_verbosity_active(self.verbosity, level) {
651 self.write(
652 format_log(
653 self.indent,
654 self.max_width,
655 level,
656 message,
657 self.verbosity.is_show_elapsed_time,
658 self.start_time,
659 )
660 .as_str(),
661 )
662 } else {
663 Ok(())
664 }
665 }
666
667 pub fn code_block(&mut self, name: &str, content: &str) -> anyhow::Result<()> {
668 self.write(format!("```{name}\n{content}```\n").as_str())?;
669 Ok(())
670 }
671
672 fn get_secrets(&self) -> Secrets {
673 Secrets {
674 secrets: self.secrets.clone(),
675 redacted: self.redacted.clone(),
676 }
677 }
678
679 fn object<Type: Serialize>(&mut self, name: &str, value: &Type) -> anyhow::Result<()> {
680 let value = serde_json::to_value(value).context("failed to serialize as JSON")?;
681
682 if self.verbosity.level <= Level::Message && value == serde_json::Value::Null {
683 return Ok(());
684 }
685
686 self.write(
687 format!(
688 "{}{}: ",
689 " ".repeat(self.indent),
690 name.if_supports_color(Stdout, |text| text.bold())
691 )
692 .as_str(),
693 )?;
694
695 self.print_value(&value)?;
696 Ok(())
697 }
698
699 fn enter_heading(&mut self) {
700 self.heading_count += 1;
701 }
702
703 fn exit_heading(&mut self) {
704 self.heading_count -= 1;
705 }
706
707 fn shift_right(&mut self) {
708 self.indent += 2;
709 }
710
711 fn shift_left(&mut self) {
712 self.indent -= 2;
713 }
714
715 fn print_value(&mut self, value: &serde_json::Value) -> anyhow::Result<()> {
716 match value {
717 serde_json::Value::Object(map) => {
718 self.write("\n")?;
719 self.shift_right();
720 for (key, value) in map {
721 let is_skip =
722 *value == serde_json::Value::Null && self.verbosity.level > Level::Message;
723 if !is_skip {
724 {
725 self.write(
726 format!(
727 "{}{}: ",
728 " ".repeat(self.indent),
729 key.if_supports_color(Stdout, |text| text.bold())
730 )
731 .as_str(),
732 )?;
733 }
734 self.print_value(value)?;
735 }
736 }
737 self.shift_left();
738 }
739 serde_json::Value::Array(array) => {
740 self.write("\n")?;
741 self.shift_right();
742 for (index, value) in array.iter().enumerate() {
743 self.write(format!("{}[{index}]: ", " ".repeat(self.indent)).as_str())?;
744 self.print_value(value)?;
745 }
746 self.shift_left();
747 }
748 serde_json::Value::Null => {
749 self.write("null\n")?;
750 }
751 serde_json::Value::Bool(value) => {
752 self.write(format!("{value}\n").as_str())?;
753 }
754 serde_json::Value::Number(value) => {
755 self.write(format!("{value}\n").as_str())?;
756 }
757 serde_json::Value::String(value) => {
758 self.write(format!("{value}\n").as_str())?;
759 }
760 }
761
762 Ok(())
763 }
764
765 pub fn start_process(
766 &mut self,
767 command: &str,
768 options: &ExecuteOptions,
769 ) -> anyhow::Result<std::process::Child> {
770 let args = options.arguments.join(" ");
771 let full_command = format!("{command} {args}");
772
773 self.info("execute", &full_command)?;
774 if let Some(directory) = &options.working_directory {
775 self.info("directory", &directory)?;
776 if !std::path::Path::new(directory.as_ref()).exists() {
777 return Err(anyhow::anyhow!("Directory does not exist: {directory}"));
778 }
779 }
780
781 let child_process = options
782 .spawn(command)
783 .context(format!("while spawning {command}"))?;
784 Ok(child_process)
785 }
786
787 pub fn execute_process(
788 &mut self,
789 command: &str,
790 options: ExecuteOptions,
791 ) -> anyhow::Result<Option<String>> {
792 let section = Section::new(self, command)?;
793 let child_process = section
794 .printer
795 .start_process(command, &options)
796 .context(format!("Faild to execute process: {command}"))?;
797 let mut multi_progress = MultiProgress::new(section.printer);
798 let mut progress_bar = multi_progress.add_progress("progress", None, None);
799 let secrets = multi_progress.printer.get_secrets();
800 let result = monitor_process(
801 command,
802 child_process,
803 &mut progress_bar,
804 &options,
805 &secrets,
806 )
807 .context(format!("Command `{command}` failed to execute"))?;
808
809 Ok(result)
810 }
811}
812
813fn sanitize_output(input: &str, max_length: usize) -> String {
814 let escaped: Vec<_> = input.chars().flat_map(|c| c.escape_default()).collect();
817
818 let mut result = String::new();
819 let mut length = 0usize;
820 for character in escaped.into_iter() {
821 if length < max_length {
822 result.push(character);
823 length += 1;
824 }
825 }
826 while result.len() < max_length {
827 result.push(' ');
828 }
829
830 result
831}
832
833fn format_monitor_log_message(level: Level, source: &str, command: &str, message: &str) -> String {
834 if level == Level::Passthrough {
835 message.to_string()
836 } else {
837 format!("[{source}:{command}] {message}")
838 }
839}
840
841fn monitor_process(
842 command: &str,
843 mut child_process: std::process::Child,
844 progress_bar: &mut MultiProgressBar,
845 options: &ExecuteOptions,
846 secrets: &Secrets,
847) -> anyhow::Result<Option<String>> {
848 let start_time = std::time::Instant::now();
849
850 let child_stdout = child_process
851 .stdout
852 .take()
853 .ok_or(anyhow::anyhow!("Internal Error: Child has no stdout"))?;
854
855 let child_stderr = child_process
856 .stderr
857 .take()
858 .ok_or(anyhow::anyhow!("Internal Error: Child has no stderr"))?;
859
860 let log_level_stdout = options.log_level;
861 let log_level_stderr = options.log_level;
862
863 let (stdout_thread, stdout_rx) = ExecuteOptions::process_child_output(child_stdout)?;
864 let (stderr_thread, stderr_rx) = ExecuteOptions::process_child_output(child_stderr)?;
865
866 let handle_stdout = |progress: &mut MultiProgressBar,
867 writer: Option<&mut std::fs::File>,
868 content: Option<&mut String>|
869 -> anyhow::Result<()> {
870 let mut stdout = String::new();
871 while let Ok(message) = stdout_rx.try_recv() {
872 let redacted = secrets.redact(message.into());
873 if writer.is_some() || content.is_some() {
874 stdout.push_str(redacted.as_ref());
875 stdout.push('\n');
876 }
877 progress.set_message(redacted.as_ref());
878 if let Some(level) = log_level_stdout.as_ref() {
879 progress.log(
880 *level,
881 format_monitor_log_message(*level, "stdout", command, redacted.as_ref())
882 .as_str(),
883 );
884 }
885 }
886
887 if let Some(content) = content {
888 content.push_str(stdout.as_str());
889 }
890
891 if let Some(writer) = writer {
892 let _ = writer.write_all(stdout.as_bytes());
893 }
894 Ok(())
895 };
896
897 let handle_stderr = |progress: &mut MultiProgressBar,
898 writer: Option<&mut std::fs::File>,
899 content: &mut String|
900 -> anyhow::Result<()> {
901 let mut stderr = String::new();
902 while let Ok(message) = stderr_rx.try_recv() {
903 let redacted = secrets.redact(message.into());
904 stderr.push_str(redacted.as_ref());
905 stderr.push('\n');
906 progress.set_message(redacted.as_ref());
907 if let Some(level) = log_level_stderr.as_ref() {
908 progress.log(
909 *level,
910 format_monitor_log_message(*level, "stdout", command, redacted.as_ref())
911 .as_str(),
912 );
913 }
914 }
915 content.push_str(stderr.as_str());
916
917 if let Some(writer) = writer {
918 let _ = writer.write_all(stderr.as_bytes());
919 }
920 Ok(())
921 };
922
923 let mut stderr_content = String::new();
924 let mut stdout_content = String::new();
925
926 let mut output_file =
927 create_log_file(command, options, secrets).context("Failed to create log file")?;
928
929 let exit_status;
930
931 loop {
932 if let Some(status) = child_process
933 .try_wait()
934 .context("while waiting for child process")?
935 {
936 exit_status = Some(status);
937 break;
938 }
939
940 let stdout_content = if options.is_return_stdout {
941 Some(&mut stdout_content)
942 } else {
943 None
944 };
945
946 handle_stdout(progress_bar, output_file.as_mut(), stdout_content)
947 .context("failed to handle stdout")?;
948 handle_stderr(progress_bar, output_file.as_mut(), &mut stderr_content)
949 .context("failed to handle stderr")?;
950 std::thread::sleep(std::time::Duration::from_millis(100));
951 progress_bar.increment_with_overflow(1);
952
953 let now = std::time::Instant::now();
954 if let Some(timeout) = options.timeout {
955 if now - start_time > timeout {
956 child_process.kill().context("Failed to kill process")?;
957 }
958 }
959 }
960
961 let _ = stdout_thread.join();
962 let _ = stderr_thread.join();
963
964 {
965 let stdout_content = if options.is_return_stdout {
966 Some(&mut stdout_content)
967 } else {
968 None
969 };
970
971 handle_stdout(progress_bar, output_file.as_mut(), stdout_content)
972 .context("while handling stdout")?;
973 }
974
975 handle_stderr(progress_bar, output_file.as_mut(), &mut stderr_content)
976 .context("while handling stderr")?;
977
978 if let Some(exit_status) = exit_status {
979 if !exit_status.success() {
980 let stderr_message = if output_file.is_some() {
981 String::new()
982 } else {
983 format!(": {stderr_content}")
984 };
985 if let Some(code) = exit_status.code() {
986 let exit_message = format!("Command `{command}` failed with exit code: {code}");
987 return Err(anyhow::anyhow!("{exit_message}{stderr_message}"));
988 } else {
989 return Err(anyhow::anyhow!(
990 "Command `{command}` failed with unknown exit code{stderr_message}"
991 ));
992 }
993 }
994 }
995
996 Ok(if options.is_return_stdout {
997 Some(stdout_content)
998 } else {
999 None
1000 })
1001}
1002
1003fn create_log_file(
1004 command: &str,
1005 options: &ExecuteOptions,
1006 secrets: &Secrets,
1007) -> anyhow::Result<Option<std::fs::File>> {
1008 if let Some(log_path) = options.log_file_path.as_ref() {
1009 let mut file = std::fs::File::create(log_path.as_ref())
1010 .context(format!("while creating {log_path}"))?;
1011
1012 let mut environment = HashMap::new();
1013 const INHERITED: &str = "inherited";
1014 const GIVEN: &str = "given";
1015 environment.insert(INHERITED.into(), HashMap::new());
1016 environment.insert(GIVEN.into(), HashMap::new());
1017 let env_inherited = environment.get_mut(INHERITED).unwrap();
1018 if !options.clear_environment {
1019 for (key, value) in std::env::vars() {
1020 let redacted = secrets.redact(value.into());
1021 env_inherited.insert(key.into(), redacted);
1022 }
1023 }
1024 let env_given = environment.get_mut(GIVEN).unwrap();
1025 for (key, value) in options.environment.iter() {
1026 let redacted = secrets.redact(value.clone());
1027 env_given.insert(key.clone(), redacted);
1028 }
1029
1030 let arguments = options.arguments.join(" ");
1031 let arguments_escaped: Vec<_> =
1032 arguments.chars().flat_map(|c| c.escape_default()).collect();
1033 let args = arguments_escaped.into_iter().collect::<String>();
1034 let shell = format!("{command} {args}").into();
1035
1036 let redacted_arguments = options
1037 .arguments
1038 .iter()
1039 .map(|arg| secrets.redact(arg.clone()))
1040 .collect();
1041
1042 let log_header = LogHeader {
1043 command: command.into(),
1044 working_directory: options.working_directory.clone(),
1045 environment,
1046 arguments: redacted_arguments,
1047 shell,
1048 };
1049
1050 let log_header_serialized = serde_yaml::to_string(&log_header)
1051 .context("Internal Error: failed to yamlize log header")?;
1052
1053 let divider = Printer::get_log_divider();
1054
1055 file.write(format!("{log_header_serialized}{divider}\n").as_bytes())
1056 .context(format!("while writing {log_path}"))?;
1057
1058 Ok(Some(file))
1059 } else {
1060 Ok(None)
1061 }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use super::*;
1067
1068 #[derive(Serialize)]
1069 pub struct Test {
1070 pub name: String,
1071 pub age: u32,
1072 pub alive: bool,
1073 pub dead: bool,
1074 pub children: f64,
1075 }
1076
1077 #[test]
1078 fn printer() {
1079 let mut printer = Printer::new_stdout();
1080 let mut options = ExecuteOptions::default();
1081 options.arguments.push("-alt".into());
1082
1083 let runtime =
1084 tokio::runtime::Runtime::new().expect("Internal Error: Failed to create runtime");
1085
1086 let (async_sender, sync_receiver) = flume::bounded(1);
1087 runtime.spawn(async move {
1088 async_sender.send_async(10).await.expect("Failed to send");
1089 });
1090 let received = sync_receiver.recv().expect("Failed to receive");
1091
1092 drop(runtime);
1093
1094 printer.info("Received", &received).unwrap();
1095
1096 printer.execute_process("/bin/ls", options).unwrap();
1097
1098 {
1099 let mut heading = Heading::new(&mut printer, "First").unwrap();
1100 {
1101 let section = Section::new(&mut heading.printer, "PersonWrapper").unwrap();
1102 section
1103 .printer
1104 .object(
1105 "Person",
1106 &Test {
1107 name: "John".to_string(),
1108 age: 30,
1109 alive: true,
1110 dead: false,
1111 children: 2.5,
1112 },
1113 )
1114 .unwrap();
1115 }
1116
1117 let mut sub_heading = Heading::new(&mut heading.printer, "Second").unwrap();
1118
1119 let mut sub_section = Section::new(&mut sub_heading.printer, "PersonWrapper").unwrap();
1120 sub_section.printer.object("Hello", &"World").unwrap();
1121
1122 {
1123 let mut multi_progress = MultiProgress::new(&mut sub_section.printer);
1124 let mut first = multi_progress.add_progress("First", Some(10), None);
1125 let mut second = multi_progress.add_progress("Second", Some(50), None);
1126 let mut third = multi_progress.add_progress("Third", Some(100), None);
1127
1128 let first_handle = std::thread::spawn(move || {
1129 first.set_ending_message("Done!");
1130 for index in 0..10 {
1131 first.increment(1);
1132 if index == 5 {
1133 first.set_message("half way");
1134 }
1135 std::thread::sleep(std::time::Duration::from_millis(100));
1136 }
1137 });
1138
1139 let second_handle = std::thread::spawn(move || {
1140 for index in 0..50 {
1141 second.increment(1);
1142 if index == 25 {
1143 second.set_message("half way");
1144 }
1145 std::thread::sleep(std::time::Duration::from_millis(10));
1146 }
1147 });
1148
1149 for _ in 0..100 {
1150 third.increment(1);
1151 std::thread::sleep(std::time::Duration::from_millis(10));
1152 }
1153
1154 first_handle.join().unwrap();
1155 second_handle.join().unwrap();
1156 }
1157 }
1158
1159 {
1160 let runtime =
1161 tokio::runtime::Runtime::new().expect("Internal Error: Failed to create runtime");
1162
1163 let heading = Heading::new(&mut printer, "Async").unwrap();
1164
1165 let mut multi_progress = MultiProgress::new(heading.printer);
1166
1167 let mut handles = Vec::new();
1168
1169 let task1_progress = multi_progress.add_progress("Task1", Some(30), None);
1170 let task2_progress = multi_progress.add_progress("Task2", Some(30), None);
1171 let task1 = async move {
1172 let mut progress = task1_progress;
1173 progress.set_message("Task1a");
1174 for _ in 0..10 {
1175 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1176 progress.increment(1);
1177 }
1178
1179 progress.set_message("Task1b");
1180 for _ in 0..10 {
1181 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1182 progress.increment(1);
1183 }
1184
1185 progress.set_message("Task1c");
1186 for _ in 0..10 {
1187 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1188 progress.increment(1);
1189 }
1190 ()
1191 };
1192 handles.push(runtime.spawn(task1));
1193
1194 let task2 = async move {
1195 let mut progress = task2_progress;
1196 progress.set_message("Task2a");
1197 for _ in 0..10 {
1198 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1199 progress.increment(1);
1200 }
1201
1202 progress.set_message("Task2b");
1203 for _ in 0..10 {
1204 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1205 progress.increment(1);
1206 }
1207
1208 progress.set_message("Task2c");
1209 for _ in 0..10 {
1210 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1211 progress.increment(1);
1212 }
1213 ()
1214 };
1215 handles.push(runtime.spawn(task2));
1216
1217 for handle in handles {
1218 runtime.block_on(handle).unwrap();
1219 }
1220 }
1221 }
1222}