1use std::process::Stdio;
2
3use tokio::io::{AsyncBufReadExt, BufReader};
4use tokio::process::Command;
5
6use crate::pipeline::{Pipeline, PipelineResult};
7
8pub async fn run_filtered(
14 command: &str,
15 args: &[String],
16 pipeline: &Pipeline,
17) -> Result<FilteredRun, StreamError> {
18 let mut child = Command::new(command)
19 .args(args)
20 .stdout(Stdio::piped())
21 .stderr(Stdio::piped())
22 .spawn()
23 .map_err(|e| StreamError::SpawnFailed {
24 command: command.to_string(),
25 source: e,
26 })?;
27
28 let stdout = child.stdout.take();
29 let stderr = child.stderr.take();
30
31 let (stdout_lines, stderr_lines) = tokio::join!(read_lines(stdout), read_lines_stderr(stderr),);
32
33 let status = child.wait().await.map_err(|e| StreamError::WaitFailed {
34 command: command.to_string(),
35 source: e,
36 })?;
37
38 let raw_stdout = stdout_lines.join("\n");
39 let raw_stderr = stderr_lines.join("\n");
40
41 let stdout_result = pipeline.process(&raw_stdout);
42 let stderr_result = pipeline.process(&raw_stderr);
43
44 Ok(FilteredRun {
45 stdout: stdout_result,
46 stderr: stderr_result,
47 exit_code: status.code().unwrap_or(-1),
48 raw_stdout_len: raw_stdout.len(),
49 raw_stderr_len: raw_stderr.len(),
50 })
51}
52
53async fn read_lines(reader: Option<tokio::process::ChildStdout>) -> Vec<String> {
54 let Some(reader) = reader else {
55 return Vec::new();
56 };
57 let mut lines = Vec::new();
58 let mut buf_reader = BufReader::new(reader);
59 let mut line = String::new();
60 while buf_reader.read_line(&mut line).await.unwrap_or(0) > 0 {
61 lines.push(line.trim_end_matches('\n').to_string());
62 line.clear();
63 }
64 lines
65}
66
67async fn read_lines_stderr(reader: Option<tokio::process::ChildStderr>) -> Vec<String> {
69 let Some(reader) = reader else {
70 return Vec::new();
71 };
72 let mut lines = Vec::new();
73 let mut buf_reader = BufReader::new(reader);
74 let mut line = String::new();
75 while buf_reader.read_line(&mut line).await.unwrap_or(0) > 0 {
76 lines.push(line.trim_end_matches('\n').to_string());
77 line.clear();
78 }
79 lines
80}
81
82#[derive(Debug)]
83pub struct FilteredRun {
84 pub stdout: PipelineResult,
85 pub stderr: PipelineResult,
86 pub exit_code: i32,
87 pub raw_stdout_len: usize,
88 pub raw_stderr_len: usize,
89}
90
91impl FilteredRun {
92 #[must_use]
93 pub fn total_savings(&self) -> usize {
94 self.stdout.savings + self.stderr.savings
95 }
96
97 #[must_use]
98 pub fn total_original_tokens(&self) -> usize {
99 self.stdout.original_tokens + self.stderr.original_tokens
100 }
101}
102
103#[derive(Debug, thiserror::Error)]
104pub enum StreamError {
105 #[error("failed to spawn '{command}': {source}")]
106 SpawnFailed {
107 command: String,
108 source: std::io::Error,
109 },
110 #[error("failed waiting for '{command}': {source}")]
111 WaitFailed {
112 command: String,
113 source: std::io::Error,
114 },
115}