orchestrator_runner/
output_capture.rs1use crate::runner::redact_text;
2use anyhow::{Context, Result};
3use std::fs::File;
4use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
5
6pub struct OutputCaptureHandles {
8 stdout_task: tokio::task::JoinHandle<Result<()>>,
9 stderr_task: tokio::task::JoinHandle<Result<()>>,
10}
11
12impl OutputCaptureHandles {
13 pub async fn wait(self) -> Result<()> {
15 self.stdout_task
16 .await
17 .context("stdout capture task panicked")??;
18 self.stderr_task
19 .await
20 .context("stderr capture task panicked")??;
21 Ok(())
22 }
23}
24
25pub fn spawn_sanitized_output_capture<
27 Stdout: AsyncRead + Unpin + Send + 'static,
28 Stderr: AsyncRead + Unpin + Send + 'static,
29>(
30 stdout: Stdout,
31 stderr: Stderr,
32 stdout_file: File,
33 stderr_file: File,
34 redaction_patterns: Vec<String>,
35) -> OutputCaptureHandles {
36 let stdout_patterns = redaction_patterns.clone();
37 let stdout_task =
38 tokio::spawn(async move { pipe_and_redact(stdout, stdout_file, stdout_patterns).await });
39 let stderr_task =
40 tokio::spawn(async move { pipe_and_redact(stderr, stderr_file, redaction_patterns).await });
41 OutputCaptureHandles {
42 stdout_task,
43 stderr_task,
44 }
45}
46
47struct StreamingRedactor {
48 patterns: Vec<String>,
49 pending: String,
50}
51
52impl StreamingRedactor {
53 fn new(patterns: Vec<String>) -> Self {
54 Self {
55 patterns,
56 pending: String::new(),
57 }
58 }
59
60 fn push_chunk(&mut self, chunk: &[u8]) -> String {
61 self.pending.push_str(&String::from_utf8_lossy(chunk));
62 let Some(last_newline) = self.pending.rfind('\n') else {
63 return String::new();
64 };
65 let split_at = last_newline + 1;
66 let emit = self.pending[..split_at].to_string();
67 self.pending.drain(..split_at);
68 redact_text(&emit, &self.patterns)
69 }
70
71 fn finish(mut self) -> String {
72 if self.pending.is_empty() {
73 return String::new();
74 }
75 let final_text = redact_text(&self.pending, &self.patterns);
76 self.pending.clear();
77 final_text
78 }
79}
80
81async fn pipe_and_redact<R: AsyncRead + Unpin>(
82 mut reader: R,
83 file: File,
84 redaction_patterns: Vec<String>,
85) -> Result<()> {
86 let mut writer = tokio::fs::File::from_std(file);
87 let mut redactor = StreamingRedactor::new(redaction_patterns);
88 let mut buf = [0_u8; 8192];
89 loop {
90 let read = reader
91 .read(&mut buf)
92 .await
93 .context("failed to read child output")?;
94 if read == 0 {
95 break;
96 }
97 let redacted = redactor.push_chunk(&buf[..read]);
98 if !redacted.is_empty() {
99 writer
100 .write_all(redacted.as_bytes())
101 .await
102 .context("failed to write redacted output")?;
103 }
104 }
105 let final_chunk = redactor.finish();
106 if !final_chunk.is_empty() {
107 writer
108 .write_all(final_chunk.as_bytes())
109 .await
110 .context("failed to flush final redacted output")?;
111 }
112 writer
113 .flush()
114 .await
115 .context("failed to flush redacted output file")?;
116 Ok(())
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 #[test]
124 fn streaming_redactor_redacts_cross_chunk_secrets() {
125 let mut redactor = StreamingRedactor::new(vec!["super-secret-value".to_string()]);
126 let first = redactor.push_chunk(b"token=super-sec");
127 let second = redactor.push_chunk(b"ret-value done\n");
128 let final_chunk = redactor.finish();
129
130 let combined = format!("{first}{second}{final_chunk}");
131 assert!(combined.contains("[REDACTED]"));
132 assert!(!combined.contains("super-secret-value"));
133 }
134
135 #[test]
136 fn streaming_redactor_preserves_visible_text() {
137 let mut redactor = StreamingRedactor::new(vec!["secret".to_string()]);
138 let chunk = redactor.push_chunk(b"public=visible");
139 let final_chunk = redactor.finish();
140 assert_eq!(format!("{chunk}{final_chunk}"), "public=visible");
141 }
142}