use super::event::EngineEvent;
/// Destination for [`EngineEvent`]s.
///
/// Implementors must be `Send + Sync`. `emit` takes `&self`, so a sink that
/// keeps state needs some form of interior mutability.
pub trait EngineSink: Send + Sync {
    /// Hands one event to the sink, transferring ownership of it.
    fn emit(&self, event: EngineEvent);
}
/// Sink that ignores every event it is given.
pub struct NullSink;
impl EngineSink for NullSink {
    /// Drops `_event` without doing anything else.
    fn emit(&self, _event: EngineEvent) {}
}
/// Sink that keeps a capped, line-oriented text trace in memory.
///
/// Lines accumulate behind a mutex until [`BufferingSink::take_lines`] drains
/// them. At most `cap` regular lines are kept per drain cycle; the first line
/// that does not fit is replaced by a single truncation marker and every
/// later line is dropped.
pub struct BufferingSink {
    /// Buffered trace lines, followed by at most one truncation marker.
    lines: std::sync::Mutex<Vec<String>>,
    /// Maximum number of regular lines retained between drains.
    cap: usize,
}

impl BufferingSink {
    /// Line cap used by `new` and `default`.
    const DEFAULT_CAP: usize = 256;

    /// Creates a sink that keeps up to 256 lines.
    pub fn new() -> Self {
        Self::with_cap(Self::DEFAULT_CAP)
    }

    /// Creates a sink that keeps up to `cap` lines.
    ///
    /// With a `cap` of 0 no regular line is kept; the first pushed line only
    /// produces the truncation marker.
    pub fn with_cap(cap: usize) -> Self {
        Self {
            lines: std::sync::Mutex::new(Vec::new()),
            cap,
        }
    }

    /// Returns every buffered line and leaves the buffer empty.
    ///
    /// Because the cap is enforced against the current buffer length, draining
    /// also makes room for `cap` new lines.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn take_lines(&self) -> Vec<String> {
        std::mem::take(&mut *self.lines.lock().unwrap())
    }

    /// Appends `line` if there is room, otherwise records the truncation
    /// marker exactly once and drops `line`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn push_capped(&self, line: String) {
        let mut guard = self.lines.lock().unwrap();
        // The buffer only grows through this method, so its length alone tells
        // whether the marker is already present: `cap` lines means "full, no
        // marker yet", `cap + 1` means "marker written". Inspecting the text of
        // the last line instead would mistake a regular line that happens to
        // start with '…' for the marker and silently lose the truncation note.
        match guard.len().cmp(&self.cap) {
            std::cmp::Ordering::Less => guard.push(line),
            std::cmp::Ordering::Equal => {
                guard.push(format!("… (trace truncated at {} lines)", self.cap));
            }
            std::cmp::Ordering::Greater => {}
        }
    }
}

impl Default for BufferingSink {
    /// Equivalent to [`BufferingSink::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl EngineSink for BufferingSink {
    /// Renders the traced event kinds as one text line each and buffers it.
    ///
    /// `Info` messages are stored verbatim, tool-call starts and approval
    /// requests are stored as short formatted notes, and ask-user requests are
    /// stored with at most the first 80 characters of the question. Every
    /// other variant is ignored.
    fn emit(&self, event: EngineEvent) {
        let rendered = match event {
            EngineEvent::Info { message } => message,
            EngineEvent::ToolCallStart { name, .. } => format!(" \u{1f527} {name}"),
            EngineEvent::ApprovalRequest { tool_name, .. } => format!(
                " \u{2398} approval auto-rejected for {tool_name} (no user channel)"
            ),
            EngineEvent::AskUserRequest { question, .. } => {
                // Truncate by characters, not bytes, so multi-byte text is never split.
                let preview: String = question.chars().take(80).collect();
                format!(" \u{2398} ask-user auto-skipped: {}", preview)
            }
            // Nothing to trace for the remaining variants.
            _ => return,
        };
        self.push_capped(rendered);
    }
}
/// Sink that stores the events it is given so they can be inspected later.
///
/// Only compiled for tests or when the `test-support` feature is enabled.
#[cfg(any(test, feature = "test-support"))]
#[derive(Debug, Default)]
pub struct TestSink {
    // Stored events; guarded by a mutex because sinks are used through `&self`.
    events: std::sync::Mutex<Vec<EngineEvent>>,
}
#[cfg(any(test, feature = "test-support"))]
impl TestSink {
    /// Creates a sink with no stored events.
    pub fn new() -> Self {
        Self {
            events: std::sync::Mutex::new(Vec::new()),
        }
    }

    /// Returns a copy of the stored events; the sink keeps its own list.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn events(&self) -> Vec<EngineEvent> {
        let recorded = self.events.lock().unwrap();
        recorded.clone()
    }

    /// Returns how many events are currently stored.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn len(&self) -> usize {
        let recorded = self.events.lock().unwrap();
        recorded.len()
    }

    /// Returns `true` when no event is stored.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
#[cfg(any(test, feature = "test-support"))]
impl EngineSink for TestSink {
    /// Appends `event` to the end of the stored list.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn emit(&self, event: EngineEvent) {
        let mut recorded = self.events.lock().unwrap();
        recorded.push(event);
    }
}
// Unit tests for the sinks defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    // TestSink keeps every emitted event, in emission order.
    #[test]
    fn test_sink_collects_events() {
        let sink = TestSink::new();
        assert!(sink.is_empty());
        sink.emit(EngineEvent::ResponseStart);
        sink.emit(EngineEvent::TextDelta {
            text: "hello".into(),
        });
        sink.emit(EngineEvent::TextDone);
        assert_eq!(sink.len(), 3);
        let events = sink.events();
        assert!(matches!(events[0], EngineEvent::ResponseStart));
        assert!(matches!(&events[1], EngineEvent::TextDelta { text } if text == "hello"));
        assert!(matches!(events[2], EngineEvent::TextDone));
    }
    // Compile-time check: TestSink satisfies the trait's Send + Sync bound.
    #[test]
    fn test_sink_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<TestSink>();
    }
    // The trait can be used as a boxed trait object.
    #[test]
    fn test_trait_object_works() {
        let sink: Box<dyn EngineSink> = Box::new(TestSink::new());
        sink.emit(EngineEvent::Info {
            message: "test".into(),
        });
    }
    // Tool-call starts and info messages each produce one line, in order.
    #[test]
    fn buffering_sink_records_tool_calls_and_info() {
        let sink = BufferingSink::new();
        sink.emit(EngineEvent::ToolCallStart {
            id: "t1".into(),
            name: "Read".into(),
            args: serde_json::json!({"path": "foo.txt"}),
            is_sub_agent: false,
        });
        sink.emit(EngineEvent::Info {
            message: " \u{26a1} cache hit".into(),
        });
        sink.emit(EngineEvent::ToolCallStart {
            id: "t2".into(),
            name: "Bash".into(),
            args: serde_json::json!({"command": "ls"}),
            is_sub_agent: false,
        });
        let lines = sink.take_lines();
        assert_eq!(lines.len(), 3);
        assert!(lines[0].contains("Read"), "got: {}", lines[0]);
        assert!(lines[1].contains("cache hit"), "got: {}", lines[1]);
        assert!(lines[2].contains("Bash"), "got: {}", lines[2]);
    }
    // Text and thinking stream events fall into the ignored variants and leave no lines.
    #[test]
    fn buffering_sink_drops_streaming_text() {
        let sink = BufferingSink::new();
        sink.emit(EngineEvent::TextDelta {
            text: "hello".into(),
        });
        sink.emit(EngineEvent::TextDelta {
            text: " world".into(),
        });
        sink.emit(EngineEvent::TextDone);
        sink.emit(EngineEvent::ThinkingDelta {
            text: "reasoning".into(),
        });
        assert!(sink.take_lines().is_empty());
    }
    // An approval request is recorded as a single "auto-rejected" line naming the tool.
    #[test]
    fn buffering_sink_records_auto_reject_for_approval() {
        let sink = BufferingSink::new();
        sink.emit(EngineEvent::ApprovalRequest {
            id: "a1".into(),
            tool_name: "Delete".into(),
            detail: "foo.txt".into(),
            preview: None,
            effect: crate::tools::ToolEffect::Destructive,
        });
        let lines = sink.take_lines();
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("Delete"));
        assert!(
            lines[0].contains("auto-rejected"),
            "approval-without-channel must be marked as auto-rejected; got: {}",
            lines[0]
        );
    }
    // With cap 3, ten pushes leave 3 regular lines plus one truncation marker.
    #[test]
    fn buffering_sink_caps_runaway_traces() {
        let sink = BufferingSink::with_cap(3);
        for i in 0..10 {
            sink.emit(EngineEvent::Info {
                message: format!("line {i}"),
            });
        }
        let lines = sink.take_lines();
        assert_eq!(lines.len(), 4, "got: {lines:?}");
        assert!(lines.last().unwrap().starts_with('\u{2026}'));
        assert!(lines.last().unwrap().contains("truncated"));
    }
    // take_lines empties the buffer, so a second call returns nothing.
    #[test]
    fn buffering_sink_take_drains() {
        let sink = BufferingSink::new();
        sink.emit(EngineEvent::Info {
            message: "a".into(),
        });
        assert_eq!(sink.take_lines().len(), 1);
        assert!(sink.take_lines().is_empty());
    }
    // Compile-time check: BufferingSink satisfies the trait's Send + Sync bound.
    #[test]
    fn buffering_sink_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<BufferingSink>();
    }
}