dev_scope/shared/
capture.rs

1use super::redact::Redactor;
2use async_trait::async_trait;
3use chrono::{DateTime, Duration, Utc};
4use derive_builder::Builder;
5use mockall::automock;
6use std::collections::BTreeMap;
7use std::ffi::OsString;
8use std::fmt::Write;
9use std::os::unix::fs::PermissionsExt;
10use std::path::{Path, PathBuf};
11use std::process::Stdio;
12use thiserror::Error;
13use tokio::io::{AsyncBufReadExt, BufReader};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info};
16use which::which_in;
17
18#[derive(Debug, Default)]
19struct RwLockOutput {
20    output: RwLock<Vec<(DateTime<Utc>, String)>>,
21}
22
23impl RwLockOutput {
24    async fn add_line(&self, line: &str) {
25        let mut stdout = self.output.write().await;
26        stdout.push((Utc::now(), line.to_string()));
27    }
28}
29
30#[derive(Default, Builder)]
31#[builder(setter(into))]
32pub struct OutputCapture {
33    #[builder(default)]
34    pub working_dir: PathBuf,
35    #[builder(default)]
36    stdout: Vec<(DateTime<Utc>, String)>,
37    #[builder(default)]
38    stderr: Vec<(DateTime<Utc>, String)>,
39    #[builder(default)]
40    pub exit_code: Option<i32>,
41    #[builder(default)]
42    start_time: DateTime<Utc>,
43    #[builder(default)]
44    end_time: DateTime<Utc>,
45    #[builder(default)]
46    pub command: String,
47}
48
/// Where a command's output lines are echoed while they are being captured.
///
/// Capturing itself always happens; this only selects the live echo.
#[derive(Clone, Debug)]
pub enum OutputDestination {
    /// Echo stdout lines with `println!` and stderr lines with `eprintln!`.
    StandardOut,
    /// Echo stdout lines at `info` level and stderr lines at `error` level.
    Logging,
    /// Do not echo anything.
    Null,
}
55
56#[derive(Error, Debug)]
57pub enum CaptureError {
58    #[error("Unable to process file. {error:?}")]
59    IoError {
60        #[from]
61        error: std::io::Error,
62    },
63    #[error("File {name} was not executable or it did not exist.")]
64    MissingShExec { name: String },
65    #[error("Unable to parse UTF-8 output. {error:?}")]
66    FromUtf8Error {
67        #[from]
68        error: std::string::FromUtf8Error,
69    },
70}
71
72#[automock]
73#[async_trait]
74pub trait ExecutionProvider: Send + Sync {
75    async fn run_command<'a>(&self, opts: CaptureOpts<'a>) -> Result<OutputCapture, CaptureError>;
76}
77
78#[derive(Default, Debug)]
79pub struct DefaultExecutionProvider {}
80
81#[async_trait]
82impl ExecutionProvider for DefaultExecutionProvider {
83    async fn run_command<'a>(&self, opts: CaptureOpts<'a>) -> Result<OutputCapture, CaptureError> {
84        OutputCapture::capture_output(opts).await
85    }
86}
87
88pub struct CaptureOpts<'a> {
89    pub working_dir: &'a Path,
90    pub env_vars: BTreeMap<String, String>,
91    pub path: &'a str,
92    pub args: &'a [String],
93    pub output_dest: OutputDestination,
94}
95
96impl<'a> CaptureOpts<'a> {
97    fn command(&self) -> String {
98        self.args.join(" ")
99    }
100}
101
102impl OutputCapture {
103    pub async fn capture_output(opts: CaptureOpts<'_>) -> Result<Self, CaptureError> {
104        check_pre_exec(&opts)?;
105        let args = opts.args.to_vec();
106
107        debug!("Executing PATH={} {:?}", &opts.path, &args);
108
109        let start_time = Utc::now();
110        let mut command = tokio::process::Command::new("/usr/bin/env");
111        let mut child = command
112            .arg("-S")
113            .args(args)
114            .env("PATH", opts.path)
115            .envs(&opts.env_vars)
116            .stderr(Stdio::piped())
117            .stdout(Stdio::piped())
118            .current_dir(opts.working_dir)
119            .spawn()?;
120
121        let stdout = child.stdout.take().expect("stdout to be available");
122        let stderr = child.stderr.take().expect("stdout to be available");
123
124        let stdout = {
125            let captured = RwLockOutput::default();
126            let output_dest = opts.output_dest.clone();
127            async move {
128                let mut reader = BufReader::new(stdout).lines();
129                while let Some(line) = reader.next_line().await? {
130                    captured.add_line(&line).await;
131                    match output_dest {
132                        OutputDestination::Logging => info!("{}", line),
133                        OutputDestination::StandardOut => println!("{}", line),
134                        OutputDestination::Null => {}
135                    }
136                }
137
138                Ok::<_, anyhow::Error>(captured.output.into_inner())
139            }
140        };
141
142        let stderr = {
143            let captured = RwLockOutput::default();
144            let output_dest = opts.output_dest.clone();
145            async move {
146                let mut reader = BufReader::new(stderr).lines();
147                while let Some(line) = reader.next_line().await? {
148                    captured.add_line(&line).await;
149                    match output_dest {
150                        OutputDestination::Logging => error!("{}", line),
151                        OutputDestination::StandardOut => eprintln!("{}", line),
152                        OutputDestination::Null => {}
153                    }
154                }
155
156                Ok::<_, anyhow::Error>(captured.output.into_inner())
157            }
158        };
159
160        let (command_result, wait_stdout, wait_stderr) = tokio::join!(child.wait(), stdout, stderr);
161        let end_time = Utc::now();
162        debug!("join result {:?}", command_result);
163
164        let captured_stdout = wait_stdout.unwrap_or_default();
165        let captured_stderr = wait_stderr.unwrap_or_default();
166
167        Ok(Self {
168            working_dir: opts.working_dir.to_path_buf(),
169            stdout: captured_stdout,
170            stderr: captured_stderr,
171            exit_code: command_result.ok().and_then(|x| x.code()),
172            start_time,
173            end_time,
174            command: opts.command(),
175        })
176    }
177
178    pub fn generate_output(&self) -> String {
179        let stdout: Vec<_> = self
180            .stdout
181            .iter()
182            .map(|(time, line)| {
183                let offset: Duration = *time - self.start_time;
184                (*time, format!("{} OUT: {}", offset, line))
185            })
186            .collect();
187
188        let stderr: Vec<_> = self
189            .stderr
190            .iter()
191            .map(|(time, line)| {
192                let offset: Duration = *time - self.start_time;
193                (*time, format!("{} ERR: {}", offset, line))
194            })
195            .collect();
196
197        let mut output = Vec::new();
198        output.extend(stdout);
199        output.extend(stderr);
200
201        output.sort_by(|(l_time, _), (r_time, _)| l_time.cmp(r_time));
202
203        let text: String = output
204            .iter()
205            .map(|(_, line)| line.clone())
206            .collect::<Vec<_>>()
207            .join("\n");
208
209        Redactor::new().redact_text(&text).to_string()
210    }
211
212    pub fn create_report_text(&self) -> anyhow::Result<String> {
213        let mut f = String::new();
214        writeln!(&mut f, "### Command Results\n")?;
215        writeln!(&mut f, "Ran command `/usr/bin/env -S {}`", self.command)?;
216        writeln!(
217            &mut f,
218            "Execution started: {}; finished: {}",
219            self.start_time, self.end_time
220        )?;
221        writeln!(
222            &mut f,
223            "Result of command: {}",
224            self.exit_code.unwrap_or(-1)
225        )?;
226        writeln!(&mut f)?;
227        writeln!(&mut f, "#### Output")?;
228        writeln!(&mut f)?;
229        writeln!(&mut f, "```text")?;
230        writeln!(&mut f, "{}", self.generate_output().trim())?;
231        writeln!(&mut f, "```")?;
232        writeln!(&mut f)?;
233        Ok(f)
234    }
235
236    pub fn get_stdout(&self) -> String {
237        self.stdout
238            .iter()
239            .map(|(_, line)| line.clone())
240            .collect::<Vec<_>>()
241            .join("\n")
242    }
243
244    pub fn get_stderr(&self) -> String {
245        self.stderr
246            .iter()
247            .map(|(_, line)| line.clone())
248            .collect::<Vec<_>>()
249            .join("\n")
250    }
251}
252
253fn check_pre_exec(opts: &CaptureOpts) -> Result<(), CaptureError> {
254    let command = opts.command();
255    let found_binary = match command.split(' ').collect::<Vec<_>>().first() {
256        None => return Err(CaptureError::MissingShExec { name: command }),
257        Some(path) => which_in(path, Some(OsString::from(opts.path)), opts.working_dir),
258    };
259
260    let path = match found_binary {
261        Ok(path) => path,
262        Err(e) => {
263            debug!("Unable to find binary {:?}", e);
264            return Err(CaptureError::MissingShExec { name: command });
265        }
266    };
267
268    if !path.exists() {
269        return Err(CaptureError::MissingShExec {
270            name: path.display().to_string(),
271        });
272    }
273    let metadata = std::fs::metadata(&path)?;
274    let permissions = metadata.permissions().mode();
275    if permissions & 0x700 == 0 {
276        return Err(CaptureError::MissingShExec {
277            name: path.display().to_string(),
278        });
279    }
280
281    Ok(())
282}