1use super::event::EngineEvent;
10
11pub trait EngineSink: Send + Sync {
18 fn emit(&self, event: EngineEvent);
20}
21
22pub struct NullSink;
31
32impl EngineSink for NullSink {
33 fn emit(&self, _event: EngineEvent) {}
34}
35
36pub struct BufferingSink {
63 lines: std::sync::Mutex<Vec<String>>,
64 cap: usize,
65}
66
67impl BufferingSink {
68 pub fn new() -> Self {
70 Self::with_cap(256)
71 }
72
73 pub fn with_cap(cap: usize) -> Self {
75 Self {
76 lines: std::sync::Mutex::new(Vec::new()),
77 cap,
78 }
79 }
80
81 pub fn take_lines(&self) -> Vec<String> {
84 std::mem::take(&mut *self.lines.lock().unwrap())
85 }
86
87 fn push_capped(&self, line: String) {
90 let mut guard = self.lines.lock().unwrap();
91 if guard.len() < self.cap {
92 guard.push(line);
93 } else if guard.last().map(|l| !l.starts_with('…')).unwrap_or(true) {
94 guard.push(format!("… (trace truncated at {} lines)", self.cap));
96 }
97 }
98}
99
100impl Default for BufferingSink {
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106impl EngineSink for BufferingSink {
107 fn emit(&self, event: EngineEvent) {
108 match event {
109 EngineEvent::ToolCallStart { name, .. } => {
110 self.push_capped(format!(" \u{1f527} {name}"));
111 }
112 EngineEvent::Info { message } => {
113 self.push_capped(message);
117 }
118 EngineEvent::ApprovalRequest { tool_name, .. } => {
119 self.push_capped(format!(
123 " \u{2398} approval auto-rejected for {tool_name} (no user channel)"
124 ));
125 }
126 EngineEvent::AskUserRequest { question, .. } => {
127 self.push_capped(format!(
128 " \u{2398} ask-user auto-skipped: {}",
129 question.chars().take(80).collect::<String>()
130 ));
131 }
132 _ => {}
136 }
137 }
138}
139
140#[cfg(any(test, feature = "test-support"))]
142#[derive(Debug, Default)]
143pub struct TestSink {
144 events: std::sync::Mutex<Vec<EngineEvent>>,
145}
146
147#[cfg(any(test, feature = "test-support"))]
148impl TestSink {
149 pub fn new() -> Self {
151 Self::default()
152 }
153
154 pub fn events(&self) -> Vec<EngineEvent> {
156 self.events.lock().unwrap().clone()
157 }
158
159 pub fn len(&self) -> usize {
161 self.events.lock().unwrap().len()
162 }
163
164 pub fn is_empty(&self) -> bool {
166 self.events.lock().unwrap().is_empty()
167 }
168}
169
170#[cfg(any(test, feature = "test-support"))]
171impl EngineSink for TestSink {
172 fn emit(&self, event: EngineEvent) {
173 self.events.lock().unwrap().push(event);
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use super::*;
180
181 #[test]
182 fn test_sink_collects_events() {
183 let sink = TestSink::new();
184 assert!(sink.is_empty());
185
186 sink.emit(EngineEvent::ResponseStart);
187 sink.emit(EngineEvent::TextDelta {
188 text: "hello".into(),
189 });
190 sink.emit(EngineEvent::TextDone);
191
192 assert_eq!(sink.len(), 3);
193 let events = sink.events();
194 assert!(matches!(events[0], EngineEvent::ResponseStart));
195 assert!(matches!(&events[1], EngineEvent::TextDelta { text } if text == "hello"));
196 assert!(matches!(events[2], EngineEvent::TextDone));
197 }
198
199 #[test]
200 fn test_sink_is_send_sync() {
201 fn assert_send_sync<T: Send + Sync>() {}
202 assert_send_sync::<TestSink>();
203 }
204
205 #[test]
206 fn test_trait_object_works() {
207 let sink: Box<dyn EngineSink> = Box::new(TestSink::new());
208 sink.emit(EngineEvent::Info {
209 message: "test".into(),
210 });
211 }
212
213 #[test]
216 fn buffering_sink_records_tool_calls_and_info() {
217 let sink = BufferingSink::new();
218 sink.emit(EngineEvent::ToolCallStart {
219 id: "t1".into(),
220 name: "Read".into(),
221 args: serde_json::json!({"path": "foo.txt"}),
222 is_sub_agent: false,
223 });
224 sink.emit(EngineEvent::Info {
225 message: " \u{26a1} cache hit".into(),
226 });
227 sink.emit(EngineEvent::ToolCallStart {
228 id: "t2".into(),
229 name: "Bash".into(),
230 args: serde_json::json!({"command": "ls"}),
231 is_sub_agent: false,
232 });
233
234 let lines = sink.take_lines();
235 assert_eq!(lines.len(), 3);
236 assert!(lines[0].contains("Read"), "got: {}", lines[0]);
237 assert!(lines[1].contains("cache hit"), "got: {}", lines[1]);
238 assert!(lines[2].contains("Bash"), "got: {}", lines[2]);
239 }
240
241 #[test]
242 fn buffering_sink_drops_streaming_text() {
243 let sink = BufferingSink::new();
244 sink.emit(EngineEvent::TextDelta {
245 text: "hello".into(),
246 });
247 sink.emit(EngineEvent::TextDelta {
248 text: " world".into(),
249 });
250 sink.emit(EngineEvent::TextDone);
251 sink.emit(EngineEvent::ThinkingDelta {
252 text: "reasoning".into(),
253 });
254 assert!(sink.take_lines().is_empty());
258 }
259
260 #[test]
261 fn buffering_sink_records_auto_reject_for_approval() {
262 let sink = BufferingSink::new();
263 sink.emit(EngineEvent::ApprovalRequest {
264 id: "a1".into(),
265 tool_name: "Delete".into(),
266 detail: "foo.txt".into(),
267 preview: None,
268 effect: crate::tools::ToolEffect::Destructive,
269 });
270 let lines = sink.take_lines();
271 assert_eq!(lines.len(), 1);
272 assert!(lines[0].contains("Delete"));
273 assert!(
274 lines[0].contains("auto-rejected"),
275 "approval-without-channel must be marked as auto-rejected; got: {}",
276 lines[0]
277 );
278 }
279
280 #[test]
281 fn buffering_sink_caps_runaway_traces() {
282 let sink = BufferingSink::with_cap(3);
283 for i in 0..10 {
284 sink.emit(EngineEvent::Info {
285 message: format!("line {i}"),
286 });
287 }
288 let lines = sink.take_lines();
289 assert_eq!(lines.len(), 4, "got: {lines:?}");
292 assert!(lines.last().unwrap().starts_with('\u{2026}'));
293 assert!(lines.last().unwrap().contains("truncated"));
294 }
295
296 #[test]
297 fn buffering_sink_take_drains() {
298 let sink = BufferingSink::new();
299 sink.emit(EngineEvent::Info {
300 message: "a".into(),
301 });
302 assert_eq!(sink.take_lines().len(), 1);
303 assert!(sink.take_lines().is_empty());
305 }
306
307 #[test]
308 fn buffering_sink_is_send_sync() {
309 fn assert_send_sync<T: Send + Sync>() {}
310 assert_send_sync::<BufferingSink>();
311 }
312}