use regex::Regex;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{ChildStderr, ChildStdout};
use tokio::sync::{broadcast, mpsc};
use crate::redaction;
use terraphim_types::capability::ProcessId;
/// Upper bound on the number of events kept in the in-memory capture buffer.
/// When the buffer already holds this many events, the oldest one is dropped
/// before a new one is appended.
const MAX_CAPTURED_EVENTS: usize = 4096;
/// An event describing output (or completion) of a captured child process.
#[derive(Debug, Clone)]
pub enum OutputEvent {
/// A non-empty, whitespace-trimmed line read from the process's stdout.
Stdout { process_id: ProcessId, line: String },
/// A non-empty, whitespace-trimmed line read from the process's stderr.
Stderr { process_id: ProcessId, line: String },
/// A stdout line that matched the mention pattern `@(\w+)`.
Mention {
process_id: ProcessId,
/// The word captured after `@` by the first match on the line.
target: String,
/// The full trimmed line in which the mention was found.
message: String,
},
/// Signals that a process has finished.
/// NOTE(review): nothing in this file constructs this variant; presumably it
/// is emitted by whatever supervises the process — confirm against callers.
Completed {
process_id: ProcessId,
/// Exit code if one is known — presumably `None` when the process ended
/// without a normal exit status; verify against the producer.
exit_code: Option<i32>,
},
}
impl OutputEvent {
    /// Builds a copy of this event whose free-text payload has been passed
    /// through [`redaction::redact`].
    ///
    /// The `line` of `Stdout`/`Stderr` and the `message` of `Mention` are
    /// redacted; the process id, the mention `target` and the exit code are
    /// copied over unchanged.
    fn redacted(&self) -> Self {
        match self {
            // Nothing textual to scrub here: copy the fields verbatim.
            Self::Completed {
                process_id,
                exit_code,
            } => {
                let (process_id, exit_code) = (*process_id, *exit_code);
                Self::Completed {
                    process_id,
                    exit_code,
                }
            }
            Self::Mention {
                process_id,
                target,
                message,
            } => {
                let target = target.clone();
                let message = redaction::redact(message);
                Self::Mention {
                    process_id: *process_id,
                    target,
                    message,
                }
            }
            Self::Stderr { process_id, line } => {
                let line = redaction::redact(line);
                Self::Stderr {
                    process_id: *process_id,
                    line,
                }
            }
            Self::Stdout { process_id, line } => {
                let line = redaction::redact(line);
                Self::Stdout {
                    process_id: *process_id,
                    line,
                }
            }
        }
    }
}
/// Captures the stdout and stderr of a single child process and republishes
/// each line as an [`OutputEvent`].
#[derive(Debug)]
pub struct OutputCapture {
/// Identifier stamped on every event produced by this capture.
process_id: ProcessId,
/// Compiled `@(\w+)` pattern used to detect mentions in stdout lines.
mention_regex: Regex,
/// Sending half of the bounded mpsc channel that events are pushed to.
event_sender: mpsc::Sender<OutputEvent>,
/// Broadcast channel on which every event is published to subscribers.
broadcast_sender: broadcast::Sender<OutputEvent>,
/// Shared history of redacted events, capped at `MAX_CAPTURED_EVENTS`.
captured_events: Arc<Mutex<VecDeque<OutputEvent>>>,
}
impl OutputCapture {
pub fn new(
process_id: ProcessId,
stdout: BufReader<ChildStdout>,
stderr: BufReader<ChildStderr>,
) -> Self {
let (event_sender, _event_receiver) = mpsc::channel::<OutputEvent>(100);
let (broadcast_sender, _) = broadcast::channel(256);
let capture = Self {
process_id,
mention_regex: Regex::new(r"@(\w+)").unwrap(),
event_sender,
broadcast_sender,
captured_events: Arc::new(Mutex::new(VecDeque::new())),
};
capture.capture_stdout(stdout);
capture.capture_stderr(stderr);
capture
}
pub fn subscribe(&self) -> broadcast::Receiver<OutputEvent> {
self.broadcast_sender.subscribe()
}
pub fn broadcaster(&self) -> &broadcast::Sender<OutputEvent> {
&self.broadcast_sender
}
pub fn captured_events(&self) -> Vec<OutputEvent> {
self.captured_events
.lock()
.unwrap_or_else(|e| e.into_inner())
.iter()
.cloned()
.collect()
}
fn record_event(captured_events: &Arc<Mutex<VecDeque<OutputEvent>>>, event: &OutputEvent) {
let mut events = captured_events.lock().unwrap_or_else(|e| e.into_inner());
if events.len() >= MAX_CAPTURED_EVENTS {
events.pop_front();
}
events.push_back(event.redacted());
}
fn capture_stdout(&self, mut stdout: BufReader<ChildStdout>) {
let process_id = self.process_id;
let mention_regex = self.mention_regex.clone();
let event_sender = self.event_sender.clone();
let broadcast_sender = self.broadcast_sender.clone();
let captured_events = self.captured_events.clone();
tokio::spawn(async move {
let mut line = String::new();
loop {
line.clear();
match stdout.read_line(&mut line).await {
Ok(0) => break, Ok(_) => {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
if let Some(captures) = mention_regex.captures(&line) {
if let Some(target) = captures.get(1) {
let target = target.as_str().to_string();
let message = line.clone();
let mention_event = OutputEvent::Mention {
process_id,
target,
message,
};
Self::record_event(&captured_events, &mention_event);
let _ = event_sender.send(mention_event.clone()).await;
let _ = broadcast_sender.send(mention_event);
}
}
let stdout_event = OutputEvent::Stdout {
process_id,
line: line.clone(),
};
Self::record_event(&captured_events, &stdout_event);
let _ = event_sender.send(stdout_event.clone()).await;
let _ = broadcast_sender.send(stdout_event);
}
Err(e) => {
tracing::error!(process_id = %process_id, error = %e, "Error reading stdout");
break;
}
}
}
});
}
fn capture_stderr(&self, mut stderr: BufReader<ChildStderr>) {
let process_id = self.process_id;
let event_sender = self.event_sender.clone();
let broadcast_sender = self.broadcast_sender.clone();
let captured_events = self.captured_events.clone();
tokio::spawn(async move {
let mut line = String::new();
loop {
line.clear();
match stderr.read_line(&mut line).await {
Ok(0) => break, Ok(_) => {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
let stderr_event = OutputEvent::Stderr {
process_id,
line: line.clone(),
};
Self::record_event(&captured_events, &stderr_event);
let _ = event_sender.send(stderr_event.clone()).await;
let _ = broadcast_sender.send(stderr_event);
}
Err(e) => {
tracing::error!(process_id = %process_id, error = %e, "Error reading stderr");
break;
}
}
}
});
}
pub fn event_sender(&self) -> mpsc::Sender<OutputEvent> {
self.event_sender.clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty capture buffer of the shape `record_event` expects.
    fn empty_buffer() -> Arc<Mutex<VecDeque<OutputEvent>>> {
        Arc::new(Mutex::new(VecDeque::new()))
    }

    /// The mention pattern captures the word following `@` and ignores lines
    /// without one.
    #[test]
    fn test_mention_regex() {
        let regex = Regex::new(r"@(\w+)").unwrap();
        let text = "Hello @kimiko, can you help?";
        let captures = regex.captures(text).unwrap();
        assert_eq!(captures.get(1).unwrap().as_str(), "kimiko");
        let text = "No mentions here";
        assert!(regex.captures(text).is_none());
    }

    /// Secrets in a stdout line are replaced by the redaction marker.
    #[test]
    fn test_output_event_redacted_scrubs_secrets() {
        let event = OutputEvent::Stdout {
            process_id: ProcessId::new(),
            line: "api_key=secret123".to_string(),
        };
        let redacted = event.redacted();
        match redacted {
            OutputEvent::Stdout { line, .. } => {
                assert!(line.contains("***REDACTED***"));
                assert!(!line.contains("secret123"));
            }
            _ => panic!("Expected Stdout event"),
        }
    }

    /// Text without secrets passes through redaction unchanged and keeps its
    /// variant.
    #[test]
    fn test_output_event_redacted_preserves_structure() {
        let event = OutputEvent::Stderr {
            process_id: ProcessId::new(),
            line: "Error: timeout after 30s".to_string(),
        };
        let redacted = event.redacted();
        match redacted {
            OutputEvent::Stderr { line, .. } => {
                assert_eq!(line, "Error: timeout after 30s");
            }
            _ => panic!("Expected Stderr event"),
        }
    }

    /// The buffer never exceeds `MAX_CAPTURED_EVENTS` and evicts oldest-first.
    #[test]
    fn test_captured_events_bounded() {
        let captured = empty_buffer();
        for i in 0..MAX_CAPTURED_EVENTS + 10 {
            let event = OutputEvent::Stdout {
                process_id: ProcessId::new(),
                line: format!("line {}", i),
            };
            OutputCapture::record_event(&captured, &event);
        }
        let events = captured.lock().unwrap();
        assert_eq!(events.len(), MAX_CAPTURED_EVENTS);
        // Ten events overflowed the buffer, so exactly the first ten are gone.
        for i in 0..10 {
            let evicted = format!("line {}", i);
            assert!(!events
                .iter()
                .any(|e| matches!(e, OutputEvent::Stdout { line, .. } if *line == evicted)));
        }
        assert!(events
            .iter()
            .any(|e| matches!(e, OutputEvent::Stdout { line, .. } if line == "line 10")));
        // The oldest survivor must sit at the front of the queue.
        assert!(matches!(
            events.front(),
            Some(OutputEvent::Stdout { line, .. }) if line == "line 10"
        ));
    }

    /// Events are redacted before they are stored in the buffer.
    #[test]
    fn test_captured_events_redacts_before_storage() {
        let captured = empty_buffer();
        let event = OutputEvent::Stdout {
            process_id: ProcessId::new(),
            line: "api_key=secret123".to_string(),
        };
        OutputCapture::record_event(&captured, &event);
        let events = captured.lock().unwrap();
        assert_eq!(events.len(), 1);
        match &events[0] {
            OutputEvent::Stdout { line, .. } => {
                assert!(line.contains("***REDACTED***"));
                assert!(!line.contains("secret123"));
            }
            _ => panic!("Expected Stdout event"),
        }
    }
}