Skip to main content

koda_core/engine/
sink.rs

//! Engine output sink trait.
//!
//! The `EngineSink` trait abstracts how the engine delivers events to clients.
//! Implementations decide how to render or transport events:
//! - `CliSink` (in koda-cli): renders to the terminal
//! - Future `AcpSink`: serializes over WebSocket
//! - `NullSink`: discards every event
//! - `BufferingSink`: buffers a short narrative trace of bg-agent activity
//! - `TestSink`: collects events for assertions
8
9use super::event::EngineEvent;
10
11/// Trait for consuming engine events.
12///
13/// Implementors decide how to render or transport events:
14/// - `CliSink`: renders to terminal via `display::` and `markdown::`
15/// - Future `AcpSink`: serializes over WebSocket
16/// - `TestSink`: collects events for assertions
17pub trait EngineSink: Send + Sync {
18    /// Emit an engine event to the client.
19    fn emit(&self, event: EngineEvent);
20}
21
22/// A no-op sink that discards all events.
23///
24/// Used by background sub-agents that don't have a live channel to
25/// the user. **#1022 B9**: superseded for bg-agent use by
26/// [`BufferingSink`], which captures a narrative trace so the user
27/// can see what the bg agent did at result-injection time. `NullSink`
28/// is still useful for tests and for any future fully-detached
29/// execution path.
30pub struct NullSink;
31
32impl EngineSink for NullSink {
33    fn emit(&self, _event: EngineEvent) {}
34}
35
/// A sink that buffers a *narrative trace* of bg-agent activity.
///
/// **#1022 B9**: pre-fix, bg agents ran with [`NullSink`] so every
/// event inside them — tool calls, info lines, approval requests,
/// errors — was silently dropped. The user only saw two lines: the
/// spawn message and the completion message. The model only saw the
/// final output. *What the bg agent actually did* was opaque.
///
/// `BufferingSink` records short, human-readable lines for events
/// that matter for traceability:
/// - `ToolCallStart` → `"  🔧 ToolName"`
/// - `Info` → forwarded as-is (sub-agent emits info for things like
///   nested spawn / cache hit)
/// - `ApprovalRequest` / `AskUserRequest` → short auto-reject /
///   auto-skip note (they auto-reject on closed channel — see B10)
/// - Streaming text (`TextDelta`/`TextDone`) is *not* recorded — the
///   final output already crosses the result oneshot, so capturing
///   text here would duplicate it.
///
/// Drained at result-injection time and emitted as a multi-line
/// `Info` event so the user sees
/// `✅ bg agent X completed\n  🔧 Read\n  🔧 Bash\n  …` instead of just
/// `✅ bg agent X completed`.
///
/// Cap is intentionally generous (256 entries by default): a runaway
/// bg agent could otherwise grow this unboundedly. The cap counts
/// buffered entries, so a forwarded `Info` message that itself spans
/// several lines still counts once. After the cap we record a single
/// `… (trace truncated at N lines)` marker and stop.
#[derive(Debug)]
pub struct BufferingSink {
    /// Buffered trace entries, oldest first. Holds at most `cap` real
    /// entries plus one truncation marker.
    lines: std::sync::Mutex<Vec<String>>,
    /// Maximum number of real entries kept before truncating.
    cap: usize,
}

impl BufferingSink {
    /// Cap used by [`BufferingSink::new`] and `Default`.
    const DEFAULT_CAP: usize = 256;

    /// Create a buffering sink with the default 256-line cap.
    pub fn new() -> Self {
        Self::with_cap(Self::DEFAULT_CAP)
    }

    /// Create a buffering sink with a custom cap (mainly for tests).
    ///
    /// A cap of `0` keeps no real entries: the first recorded event
    /// only produces the truncation marker.
    pub fn with_cap(cap: usize) -> Self {
        Self {
            lines: std::sync::Mutex::new(Vec::new()),
            cap,
        }
    }

    /// Drain and return all buffered lines. The sink is empty after
    /// this returns, so recording (and the cap) starts over.
    pub fn take_lines(&self) -> Vec<String> {
        std::mem::take(&mut *self.lock_lines())
    }

    /// Append a line, honoring the cap.
    ///
    /// The decision is keyed on the buffer length rather than on the
    /// content of the last entry: below the cap the line is stored,
    /// exactly at the cap the truncation marker is stored instead, and
    /// above the cap (marker already present) the line is dropped.
    /// This keeps the marker unique, and it is still emitted when the
    /// last real entry happens to start with `…` itself (`Info`
    /// messages are forwarded verbatim, so that can happen).
    fn push_capped(&self, line: String) {
        let mut lines = self.lock_lines();
        match lines.len().cmp(&self.cap) {
            std::cmp::Ordering::Less => lines.push(line),
            std::cmp::Ordering::Equal => {
                // Cap reached — emit one truncation marker and stop.
                lines.push(format!("… (trace truncated at {} lines)", self.cap));
            }
            std::cmp::Ordering::Greater => {}
        }
    }

    /// Lock the buffer, recovering the guard if the mutex is poisoned.
    ///
    /// The buffer is a plain list of strings with no invariant a
    /// panicking holder could leave half-updated, so a poisoned lock
    /// is safe to reuse; losing or panicking over a diagnostic trace
    /// would be worse than reading it.
    fn lock_lines(&self) -> std::sync::MutexGuard<'_, Vec<String>> {
        self.lines
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

impl Default for BufferingSink {
    /// Equivalent to [`BufferingSink::new`].
    fn default() -> Self {
        Self::new()
    }
}
105
106impl EngineSink for BufferingSink {
107    fn emit(&self, event: EngineEvent) {
108        match event {
109            EngineEvent::ToolCallStart { name, .. } => {
110                self.push_capped(format!("  \u{1f527} {name}"));
111            }
112            EngineEvent::Info { message } => {
113                // Sub-agent already prefixes its own info lines with
114                // two spaces and an emoji — forward as-is so the
115                // visual hierarchy survives.
116                self.push_capped(message);
117            }
118            EngineEvent::ApprovalRequest { tool_name, .. } => {
119                // B10: bg agents have no user channel — these always
120                // auto-reject. Record so the model's apparent
121                // "failure to do X" is debuggable.
122                self.push_capped(format!(
123                    "  \u{2398} approval auto-rejected for {tool_name} (no user channel)"
124                ));
125            }
126            EngineEvent::AskUserRequest { question, .. } => {
127                self.push_capped(format!(
128                    "  \u{2398} ask-user auto-skipped: {}",
129                    question.chars().take(80).collect::<String>()
130                ));
131            }
132            // Everything else (streaming text, thinking, status, etc.)
133            // is intentionally dropped — either redundant with the
134            // result oneshot or noisy without context.
135            _ => {}
136        }
137    }
138}
139
/// Test-only sink that stores every event it receives in a `Vec`.
///
/// Compiled only for unit tests or when the `test-support` feature
/// is enabled.
#[cfg(any(test, feature = "test-support"))]
#[derive(Debug, Default)]
pub struct TestSink {
    /// Events received so far, behind a mutex so they can be recorded
    /// through `&self`.
    events: std::sync::Mutex<Vec<EngineEvent>>,
}

#[cfg(any(test, feature = "test-support"))]
impl TestSink {
    /// Build a sink with nothing recorded yet.
    pub fn new() -> Self {
        Self {
            events: std::sync::Mutex::new(Vec::new()),
        }
    }

    /// Return a cloned snapshot of the recorded events.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn events(&self) -> Vec<EngineEvent> {
        let recorded = self.events.lock().unwrap();
        recorded.to_vec()
    }

    /// Number of events recorded so far.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn len(&self) -> usize {
        let recorded = self.events.lock().unwrap();
        recorded.len()
    }

    /// `true` when no event has been recorded.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        let recorded = self.events.lock().unwrap();
        recorded.is_empty()
    }
}
169
#[cfg(any(test, feature = "test-support"))]
impl EngineSink for TestSink {
    /// Record `event` at the end of the collected list.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    fn emit(&self, event: EngineEvent) {
        let mut recorded = self.events.lock().unwrap();
        recorded.push(event);
    }
}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time probe: only instantiable for `Send + Sync` types.
    fn assert_send_sync<T: Send + Sync>() {}

    /// Build an `Info` event carrying `message`.
    fn info(message: &str) -> EngineEvent {
        EngineEvent::Info {
            message: message.into(),
        }
    }

    /// Build a `TextDelta` event carrying `text`.
    fn text_delta(text: &'static str) -> EngineEvent {
        EngineEvent::TextDelta { text: text.into() }
    }

    /// Build a `ToolCallStart` event with `is_sub_agent` set to `false`.
    fn tool_call(id: &'static str, name: &'static str, args: serde_json::Value) -> EngineEvent {
        EngineEvent::ToolCallStart {
            id: id.into(),
            name: name.into(),
            args,
            is_sub_agent: false,
        }
    }

    // ── TestSink ─────────────────────────────────────────────────

    #[test]
    fn test_sink_collects_events() {
        let sink = TestSink::new();
        assert!(sink.is_empty());

        let emitted = [
            EngineEvent::ResponseStart,
            text_delta("hello"),
            EngineEvent::TextDone,
        ];
        for event in emitted {
            sink.emit(event);
        }

        assert_eq!(sink.len(), 3);
        let collected = sink.events();
        assert!(matches!(collected[0], EngineEvent::ResponseStart));
        assert!(matches!(&collected[1], EngineEvent::TextDelta { text } if text == "hello"));
        assert!(matches!(collected[2], EngineEvent::TextDone));
    }

    #[test]
    fn test_sink_is_send_sync() {
        assert_send_sync::<TestSink>();
    }

    #[test]
    fn test_trait_object_works() {
        // Emitting through `dyn EngineSink` must compile and not panic.
        let sink: Box<dyn EngineSink> = Box::new(TestSink::new());
        sink.emit(info("test"));
    }

    // ── BufferingSink (#1022 B9) ─────────────────────────────────

    #[test]
    fn buffering_sink_records_tool_calls_and_info() {
        let sink = BufferingSink::new();
        let emitted = [
            tool_call("t1", "Read", serde_json::json!({"path": "foo.txt"})),
            info("  \u{26a1} cache hit"),
            tool_call("t2", "Bash", serde_json::json!({"command": "ls"})),
        ];
        for event in emitted {
            sink.emit(event);
        }

        let lines = sink.take_lines();
        assert_eq!(lines.len(), 3);
        // One entry per event, in emission order.
        for (line, needle) in lines.iter().zip(["Read", "cache hit", "Bash"]) {
            assert!(line.contains(needle), "got: {}", line);
        }
    }

    #[test]
    fn buffering_sink_drops_streaming_text() {
        let sink = BufferingSink::new();
        let streamed = [
            text_delta("hello"),
            text_delta(" world"),
            EngineEvent::TextDone,
            EngineEvent::ThinkingDelta {
                text: "reasoning".into(),
            },
        ];
        for event in streamed {
            sink.emit(event);
        }
        // The model's final output already travels over the result
        // oneshot; recording streamed text as well would show it twice
        // in the user-facing trace.
        assert!(sink.take_lines().is_empty());
    }

    #[test]
    fn buffering_sink_records_auto_reject_for_approval() {
        let sink = BufferingSink::new();
        let request = EngineEvent::ApprovalRequest {
            id: "a1".into(),
            tool_name: "Delete".into(),
            detail: "foo.txt".into(),
            preview: None,
            effect: crate::tools::ToolEffect::Destructive,
        };
        sink.emit(request);

        let lines = sink.take_lines();
        assert_eq!(lines.len(), 1);
        let note = &lines[0];
        assert!(note.contains("Delete"));
        assert!(
            note.contains("auto-rejected"),
            "approval-without-channel must be marked as auto-rejected; got: {}",
            note
        );
    }

    #[test]
    fn buffering_sink_caps_runaway_traces() {
        let sink = BufferingSink::with_cap(3);
        (0..10).for_each(|i| {
            sink.emit(EngineEvent::Info {
                message: format!("line {i}"),
            })
        });

        let lines = sink.take_lines();
        // Three real entries followed by a single truncation marker:
        // the seven overflowing pushes must not add further markers.
        assert_eq!(lines.len(), 4, "got: {lines:?}");
        let marker = lines.last().unwrap();
        assert!(marker.starts_with('\u{2026}'));
        assert!(marker.contains("truncated"));
    }

    #[test]
    fn buffering_sink_take_drains() {
        let sink = BufferingSink::new();
        sink.emit(info("a"));

        let first = sink.take_lines();
        assert_eq!(first.len(), 1);
        // `take_lines` drains rather than snapshots, so a second call
        // finds nothing left.
        let second = sink.take_lines();
        assert!(second.is_empty());
    }

    #[test]
    fn buffering_sink_is_send_sync() {
        assert_send_sync::<BufferingSink>();
    }
}