Skip to main content

orchestrator_runner/
output_capture.rs

1use crate::runner::redact_text;
2use anyhow::{Context, Result};
3use std::fs::File;
4use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
5
6/// Join handles for background tasks that redact and persist child output streams.
7pub struct OutputCaptureHandles {
8    stdout_task: tokio::task::JoinHandle<Result<()>>,
9    stderr_task: tokio::task::JoinHandle<Result<()>>,
10}
11
12impl OutputCaptureHandles {
13    /// Waits for both capture tasks to finish and propagates any failure.
14    pub async fn wait(self) -> Result<()> {
15        self.stdout_task
16            .await
17            .context("stdout capture task panicked")??;
18        self.stderr_task
19            .await
20            .context("stderr capture task panicked")??;
21        Ok(())
22    }
23}
24
25/// Spawns background tasks that redact stdout and stderr into separate files.
26pub fn spawn_sanitized_output_capture<
27    Stdout: AsyncRead + Unpin + Send + 'static,
28    Stderr: AsyncRead + Unpin + Send + 'static,
29>(
30    stdout: Stdout,
31    stderr: Stderr,
32    stdout_file: File,
33    stderr_file: File,
34    redaction_patterns: Vec<String>,
35) -> OutputCaptureHandles {
36    let stdout_patterns = redaction_patterns.clone();
37    let stdout_task =
38        tokio::spawn(async move { pipe_and_redact(stdout, stdout_file, stdout_patterns).await });
39    let stderr_task =
40        tokio::spawn(async move { pipe_and_redact(stderr, stderr_file, redaction_patterns).await });
41    OutputCaptureHandles {
42        stdout_task,
43        stderr_task,
44    }
45}
46
47struct StreamingRedactor {
48    patterns: Vec<String>,
49    pending: String,
50}
51
52impl StreamingRedactor {
53    fn new(patterns: Vec<String>) -> Self {
54        Self {
55            patterns,
56            pending: String::new(),
57        }
58    }
59
60    fn push_chunk(&mut self, chunk: &[u8]) -> String {
61        self.pending.push_str(&String::from_utf8_lossy(chunk));
62        let Some(last_newline) = self.pending.rfind('\n') else {
63            return String::new();
64        };
65        let split_at = last_newline + 1;
66        let emit = self.pending[..split_at].to_string();
67        self.pending.drain(..split_at);
68        redact_text(&emit, &self.patterns)
69    }
70
71    fn finish(mut self) -> String {
72        if self.pending.is_empty() {
73            return String::new();
74        }
75        let final_text = redact_text(&self.pending, &self.patterns);
76        self.pending.clear();
77        final_text
78    }
79}
80
81async fn pipe_and_redact<R: AsyncRead + Unpin>(
82    mut reader: R,
83    file: File,
84    redaction_patterns: Vec<String>,
85) -> Result<()> {
86    let mut writer = tokio::fs::File::from_std(file);
87    let mut redactor = StreamingRedactor::new(redaction_patterns);
88    let mut buf = [0_u8; 8192];
89    loop {
90        let read = reader
91            .read(&mut buf)
92            .await
93            .context("failed to read child output")?;
94        if read == 0 {
95            break;
96        }
97        let redacted = redactor.push_chunk(&buf[..read]);
98        if !redacted.is_empty() {
99            writer
100                .write_all(redacted.as_bytes())
101                .await
102                .context("failed to write redacted output")?;
103        }
104    }
105    let final_chunk = redactor.finish();
106    if !final_chunk.is_empty() {
107        writer
108            .write_all(final_chunk.as_bytes())
109            .await
110            .context("failed to flush final redacted output")?;
111    }
112    writer
113        .flush()
114        .await
115        .context("failed to flush redacted output file")?;
116    Ok(())
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122
123    #[test]
124    fn streaming_redactor_redacts_cross_chunk_secrets() {
125        let mut redactor = StreamingRedactor::new(vec!["super-secret-value".to_string()]);
126        let first = redactor.push_chunk(b"token=super-sec");
127        let second = redactor.push_chunk(b"ret-value done\n");
128        let final_chunk = redactor.finish();
129
130        let combined = format!("{first}{second}{final_chunk}");
131        assert!(combined.contains("[REDACTED]"));
132        assert!(!combined.contains("super-secret-value"));
133    }
134
135    #[test]
136    fn streaming_redactor_preserves_visible_text() {
137        let mut redactor = StreamingRedactor::new(vec!["secret".to_string()]);
138        let chunk = redactor.push_chunk(b"public=visible");
139        let final_chunk = redactor.finish();
140        assert_eq!(format!("{chunk}{final_chunk}"), "public=visible");
141    }
142}