Skip to main content

cli_denoiser/
stream.rs

1use std::process::Stdio;
2
3use tokio::io::{AsyncBufReadExt, BufReader};
4use tokio::process::Command;
5
6use crate::pipeline::{Pipeline, PipelineResult};
7
8/// Execute a command, capture output, filter through pipeline, return result.
9/// Preserves the original exit code.
10///
11/// # Errors
12/// Returns `StreamError` if the command cannot be spawned or waited on.
13pub async fn run_filtered(
14    command: &str,
15    args: &[String],
16    pipeline: &Pipeline,
17) -> Result<FilteredRun, StreamError> {
18    let mut child = Command::new(command)
19        .args(args)
20        .stdout(Stdio::piped())
21        .stderr(Stdio::piped())
22        .spawn()
23        .map_err(|e| StreamError::SpawnFailed {
24            command: command.to_string(),
25            source: e,
26        })?;
27
28    let stdout = child.stdout.take();
29    let stderr = child.stderr.take();
30
31    let (stdout_lines, stderr_lines) = tokio::join!(read_lines(stdout), read_lines_stderr(stderr),);
32
33    let status = child.wait().await.map_err(|e| StreamError::WaitFailed {
34        command: command.to_string(),
35        source: e,
36    })?;
37
38    let raw_stdout = stdout_lines.join("\n");
39    let raw_stderr = stderr_lines.join("\n");
40
41    let stdout_result = pipeline.process(&raw_stdout);
42    let stderr_result = pipeline.process(&raw_stderr);
43
44    Ok(FilteredRun {
45        stdout: stdout_result,
46        stderr: stderr_result,
47        exit_code: status.code().unwrap_or(-1),
48        raw_stdout_len: raw_stdout.len(),
49        raw_stderr_len: raw_stderr.len(),
50    })
51}
52
53async fn read_lines(reader: Option<tokio::process::ChildStdout>) -> Vec<String> {
54    let Some(reader) = reader else {
55        return Vec::new();
56    };
57    let mut lines = Vec::new();
58    let mut buf_reader = BufReader::new(reader);
59    let mut line = String::new();
60    while buf_reader.read_line(&mut line).await.unwrap_or(0) > 0 {
61        lines.push(line.trim_end_matches('\n').to_string());
62        line.clear();
63    }
64    lines
65}
66
67// Overload for stderr (different type)
68async fn read_lines_stderr(reader: Option<tokio::process::ChildStderr>) -> Vec<String> {
69    let Some(reader) = reader else {
70        return Vec::new();
71    };
72    let mut lines = Vec::new();
73    let mut buf_reader = BufReader::new(reader);
74    let mut line = String::new();
75    while buf_reader.read_line(&mut line).await.unwrap_or(0) > 0 {
76        lines.push(line.trim_end_matches('\n').to_string());
77        line.clear();
78    }
79    lines
80}
81
/// Outcome of running a command through `run_filtered`.
#[derive(Debug)]
pub struct FilteredRun {
    /// Pipeline result for the captured stdout text.
    pub stdout: PipelineResult,
    /// Pipeline result for the captured stderr text.
    pub stderr: PipelineResult,
    /// Exit code of the child process; `run_filtered` stores `-1` when the
    /// exit status carries no code.
    pub exit_code: i32,
    /// Byte length of the captured stdout text (lines joined with `"\n"`)
    /// before it was passed to the pipeline.
    pub raw_stdout_len: usize,
    /// Byte length of the captured stderr text (lines joined with `"\n"`)
    /// before it was passed to the pipeline.
    pub raw_stderr_len: usize,
}
90
91impl FilteredRun {
92    #[must_use]
93    pub fn total_savings(&self) -> usize {
94        self.stdout.savings + self.stderr.savings
95    }
96
97    #[must_use]
98    pub fn total_original_tokens(&self) -> usize {
99        self.stdout.original_tokens + self.stderr.original_tokens
100    }
101}
102
/// Errors that `run_filtered` can return.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// The child process could not be spawned.
    #[error("failed to spawn '{command}': {source}")]
    SpawnFailed {
        /// Program name that was passed to `run_filtered`.
        command: String,
        /// I/O error returned by the spawn attempt.
        source: std::io::Error,
    },
    /// Waiting for the child process to finish failed.
    #[error("failed waiting for '{command}': {source}")]
    WaitFailed {
        /// Program name that was passed to `run_filtered`.
        command: String,
        /// I/O error returned while waiting on the child.
        source: std::io::Error,
    },
}
115}