1use super::redact::Redactor;
2use async_trait::async_trait;
3use chrono::{DateTime, Duration, Utc};
4use derive_builder::Builder;
5use mockall::automock;
6use std::collections::BTreeMap;
7use std::ffi::OsString;
8use std::fmt::Write;
9use std::os::unix::fs::PermissionsExt;
10use std::path::{Path, PathBuf};
11use std::process::Stdio;
12use thiserror::Error;
13use tokio::io::{AsyncBufReadExt, BufReader};
14use tokio::sync::RwLock;
15use tracing::{debug, error, info};
16use which::which_in;
17
18#[derive(Debug, Default)]
19struct RwLockOutput {
20 output: RwLock<Vec<(DateTime<Utc>, String)>>,
21}
22
23impl RwLockOutput {
24 async fn add_line(&self, line: &str) {
25 let mut stdout = self.output.write().await;
26 stdout.push((Utc::now(), line.to_string()));
27 }
28}
29
30#[derive(Default, Builder)]
31#[builder(setter(into))]
32pub struct OutputCapture {
33 #[builder(default)]
34 pub working_dir: PathBuf,
35 #[builder(default)]
36 stdout: Vec<(DateTime<Utc>, String)>,
37 #[builder(default)]
38 stderr: Vec<(DateTime<Utc>, String)>,
39 #[builder(default)]
40 pub exit_code: Option<i32>,
41 #[builder(default)]
42 start_time: DateTime<Utc>,
43 #[builder(default)]
44 end_time: DateTime<Utc>,
45 #[builder(default)]
46 pub command: String,
47}
48
49#[derive(Clone, Debug)]
50pub enum OutputDestination {
51 StandardOut,
52 Logging,
53 Null,
54}
55
56#[derive(Error, Debug)]
57pub enum CaptureError {
58 #[error("Unable to process file. {error:?}")]
59 IoError {
60 #[from]
61 error: std::io::Error,
62 },
63 #[error("File {name} was not executable or it did not exist.")]
64 MissingShExec { name: String },
65 #[error("Unable to parse UTF-8 output. {error:?}")]
66 FromUtf8Error {
67 #[from]
68 error: std::string::FromUtf8Error,
69 },
70}
71
72#[automock]
73#[async_trait]
74pub trait ExecutionProvider: Send + Sync {
75 async fn run_command<'a>(&self, opts: CaptureOpts<'a>) -> Result<OutputCapture, CaptureError>;
76}
77
78#[derive(Default, Debug)]
79pub struct DefaultExecutionProvider {}
80
81#[async_trait]
82impl ExecutionProvider for DefaultExecutionProvider {
83 async fn run_command<'a>(&self, opts: CaptureOpts<'a>) -> Result<OutputCapture, CaptureError> {
84 OutputCapture::capture_output(opts).await
85 }
86}
87
88pub struct CaptureOpts<'a> {
89 pub working_dir: &'a Path,
90 pub env_vars: BTreeMap<String, String>,
91 pub path: &'a str,
92 pub args: &'a [String],
93 pub output_dest: OutputDestination,
94}
95
96impl<'a> CaptureOpts<'a> {
97 fn command(&self) -> String {
98 self.args.join(" ")
99 }
100}
101
102impl OutputCapture {
103 pub async fn capture_output(opts: CaptureOpts<'_>) -> Result<Self, CaptureError> {
104 check_pre_exec(&opts)?;
105 let args = opts.args.to_vec();
106
107 debug!("Executing PATH={} {:?}", &opts.path, &args);
108
109 let start_time = Utc::now();
110 let mut command = tokio::process::Command::new("/usr/bin/env");
111 let mut child = command
112 .arg("-S")
113 .args(args)
114 .env("PATH", opts.path)
115 .envs(&opts.env_vars)
116 .stderr(Stdio::piped())
117 .stdout(Stdio::piped())
118 .current_dir(opts.working_dir)
119 .spawn()?;
120
121 let stdout = child.stdout.take().expect("stdout to be available");
122 let stderr = child.stderr.take().expect("stdout to be available");
123
124 let stdout = {
125 let captured = RwLockOutput::default();
126 let output_dest = opts.output_dest.clone();
127 async move {
128 let mut reader = BufReader::new(stdout).lines();
129 while let Some(line) = reader.next_line().await? {
130 captured.add_line(&line).await;
131 match output_dest {
132 OutputDestination::Logging => info!("{}", line),
133 OutputDestination::StandardOut => println!("{}", line),
134 OutputDestination::Null => {}
135 }
136 }
137
138 Ok::<_, anyhow::Error>(captured.output.into_inner())
139 }
140 };
141
142 let stderr = {
143 let captured = RwLockOutput::default();
144 let output_dest = opts.output_dest.clone();
145 async move {
146 let mut reader = BufReader::new(stderr).lines();
147 while let Some(line) = reader.next_line().await? {
148 captured.add_line(&line).await;
149 match output_dest {
150 OutputDestination::Logging => error!("{}", line),
151 OutputDestination::StandardOut => eprintln!("{}", line),
152 OutputDestination::Null => {}
153 }
154 }
155
156 Ok::<_, anyhow::Error>(captured.output.into_inner())
157 }
158 };
159
160 let (command_result, wait_stdout, wait_stderr) = tokio::join!(child.wait(), stdout, stderr);
161 let end_time = Utc::now();
162 debug!("join result {:?}", command_result);
163
164 let captured_stdout = wait_stdout.unwrap_or_default();
165 let captured_stderr = wait_stderr.unwrap_or_default();
166
167 Ok(Self {
168 working_dir: opts.working_dir.to_path_buf(),
169 stdout: captured_stdout,
170 stderr: captured_stderr,
171 exit_code: command_result.ok().and_then(|x| x.code()),
172 start_time,
173 end_time,
174 command: opts.command(),
175 })
176 }
177
178 pub fn generate_output(&self) -> String {
179 let stdout: Vec<_> = self
180 .stdout
181 .iter()
182 .map(|(time, line)| {
183 let offset: Duration = *time - self.start_time;
184 (*time, format!("{} OUT: {}", offset, line))
185 })
186 .collect();
187
188 let stderr: Vec<_> = self
189 .stderr
190 .iter()
191 .map(|(time, line)| {
192 let offset: Duration = *time - self.start_time;
193 (*time, format!("{} ERR: {}", offset, line))
194 })
195 .collect();
196
197 let mut output = Vec::new();
198 output.extend(stdout);
199 output.extend(stderr);
200
201 output.sort_by(|(l_time, _), (r_time, _)| l_time.cmp(r_time));
202
203 let text: String = output
204 .iter()
205 .map(|(_, line)| line.clone())
206 .collect::<Vec<_>>()
207 .join("\n");
208
209 Redactor::new().redact_text(&text).to_string()
210 }
211
212 pub fn create_report_text(&self) -> anyhow::Result<String> {
213 let mut f = String::new();
214 writeln!(&mut f, "### Command Results\n")?;
215 writeln!(&mut f, "Ran command `/usr/bin/env -S {}`", self.command)?;
216 writeln!(
217 &mut f,
218 "Execution started: {}; finished: {}",
219 self.start_time, self.end_time
220 )?;
221 writeln!(
222 &mut f,
223 "Result of command: {}",
224 self.exit_code.unwrap_or(-1)
225 )?;
226 writeln!(&mut f)?;
227 writeln!(&mut f, "#### Output")?;
228 writeln!(&mut f)?;
229 writeln!(&mut f, "```text")?;
230 writeln!(&mut f, "{}", self.generate_output().trim())?;
231 writeln!(&mut f, "```")?;
232 writeln!(&mut f)?;
233 Ok(f)
234 }
235
236 pub fn get_stdout(&self) -> String {
237 self.stdout
238 .iter()
239 .map(|(_, line)| line.clone())
240 .collect::<Vec<_>>()
241 .join("\n")
242 }
243
244 pub fn get_stderr(&self) -> String {
245 self.stderr
246 .iter()
247 .map(|(_, line)| line.clone())
248 .collect::<Vec<_>>()
249 .join("\n")
250 }
251}
252
253fn check_pre_exec(opts: &CaptureOpts) -> Result<(), CaptureError> {
254 let command = opts.command();
255 let found_binary = match command.split(' ').collect::<Vec<_>>().first() {
256 None => return Err(CaptureError::MissingShExec { name: command }),
257 Some(path) => which_in(path, Some(OsString::from(opts.path)), opts.working_dir),
258 };
259
260 let path = match found_binary {
261 Ok(path) => path,
262 Err(e) => {
263 debug!("Unable to find binary {:?}", e);
264 return Err(CaptureError::MissingShExec { name: command });
265 }
266 };
267
268 if !path.exists() {
269 return Err(CaptureError::MissingShExec {
270 name: path.display().to_string(),
271 });
272 }
273 let metadata = std::fs::metadata(&path)?;
274 let permissions = metadata.permissions().mode();
275 if permissions & 0x700 == 0 {
276 return Err(CaptureError::MissingShExec {
277 name: path.display().to_string(),
278 });
279 }
280
281 Ok(())
282}