agent_core_runtime/agent/interface/sink.rs
1//! Event Sink - Receives events from the engine
2//!
3//! The [`EventSink`] trait defines how the engine delivers events to consumers.
4
5use std::future::Future;
6use std::pin::Pin;
7
8use tokio::sync::mpsc;
9
10use crate::agent::UiMessage;
11
12/// Error when sending an event fails.
13///
14/// Contains the original message for retry or logging.
15#[derive(Debug)]
16pub struct SendError(pub UiMessage);
17
18impl std::fmt::Display for SendError {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 write!(f, "failed to send event")
21 }
22}
23
24impl std::error::Error for SendError {}
25
26/// Receives events from the agent engine and delivers them to a consumer.
27///
28/// Implementations handle the transport-specific details of delivering
29/// events to the user interface (TUI, WebSocket, stdout, etc.).
30///
31/// # Backpressure
32///
33/// The [`send_async`](EventSink::send_async) method supports backpressure by
34/// awaiting until the consumer can accept the event. Implementations should
35/// use this when the consumer has bounded capacity (e.g., channel-based).
36///
37/// # Thread Safety
38///
39/// EventSink must be `Send + Sync` to allow sharing across async tasks.
40/// The engine may call `send` from multiple tasks concurrently.
41///
42/// # Example
43///
44/// ```ignore
45/// use agent_core_runtime::agent::interface::{EventSink, SendError};
46/// use agent_core_runtime::agent::UiMessage;
47///
48/// struct MyCustomSink { /* ... */ }
49///
50/// impl EventSink for MyCustomSink {
51/// fn send(&self, event: UiMessage) -> Result<(), SendError> {
52/// // Deliver event to your transport
53/// Ok(())
54/// }
55///
56/// fn clone_box(&self) -> Box<dyn EventSink> {
57/// Box::new(MyCustomSink { /* ... */ })
58/// }
59/// }
60/// ```
61pub trait EventSink: Send + Sync + 'static {
62 /// Send an event to the consumer (non-blocking).
63 ///
64 /// Returns immediately. If the consumer cannot accept the event
65 /// (e.g., buffer full), returns `Err(SendError)`.
66 ///
67 /// Use this for fire-and-forget scenarios or when you have your
68 /// own backpressure mechanism.
69 fn send(&self, event: UiMessage) -> Result<(), SendError>;
70
71 /// Send an event to the consumer (async, with backpressure).
72 ///
73 /// Waits until the consumer can accept the event. This is the
74 /// preferred method when backpressure is needed to avoid overwhelming
75 /// slow consumers.
76 ///
77 /// Default implementation calls `send()` and returns immediately.
78 fn send_async(&self, event: UiMessage) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
79 Box::pin(async move { self.send(event) })
80 }
81
82 /// Clone this sink into a boxed trait object.
83 ///
84 /// Required because we need to clone sinks for internal routing
85 /// but `Clone` is not object-safe.
86 fn clone_box(&self) -> Box<dyn EventSink>;
87}
88
89// Allow Box<dyn EventSink> to be used as EventSink
90impl EventSink for Box<dyn EventSink> {
91 fn send(&self, event: UiMessage) -> Result<(), SendError> {
92 (**self).send(event)
93 }
94
95 fn send_async(&self, event: UiMessage) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
96 (**self).send_async(event)
97 }
98
99 fn clone_box(&self) -> Box<dyn EventSink> {
100 (**self).clone_box()
101 }
102}
103
104/// Event sink backed by an async channel.
105///
106/// This is the default sink used internally. It connects the engine
107/// to a channel that the consumer reads from.
108///
109/// # Backpressure
110///
111/// When the channel is full, `send()` returns an error immediately,
112/// while `send_async()` waits until space is available.
113#[derive(Clone)]
114pub struct ChannelEventSink {
115 tx: mpsc::Sender<UiMessage>,
116}
117
118impl ChannelEventSink {
119 /// Create a new channel-backed event sink.
120 pub fn new(tx: mpsc::Sender<UiMessage>) -> Self {
121 Self { tx }
122 }
123}
124
125impl EventSink for ChannelEventSink {
126 fn send(&self, event: UiMessage) -> Result<(), SendError> {
127 self.tx.try_send(event).map_err(|e| SendError(e.into_inner()))
128 }
129
130 fn send_async(&self, event: UiMessage) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
131 let tx = self.tx.clone();
132 Box::pin(async move {
133 tx.send(event).await.map_err(|e| SendError(e.0))
134 })
135 }
136
137 fn clone_box(&self) -> Box<dyn EventSink> {
138 Box::new(self.clone())
139 }
140}
141
/// Minimal, print-only event sink.
///
/// Intended for CLI tools, debugging, or batch processing: LLM text is
/// written straight to stdout with light formatting, while errors and
/// warnings go to stderr.
///
/// # Limitations
///
/// The sink is **non-interactive**; it only prints and never answers a
/// request:
/// - **Permission requests**: Use with `AutoApprovePolicy` to auto-approve
/// - **User interactions**: Use with `AutoApprovePolicy` to auto-cancel
///
/// For interactive permission prompts or user questions, use
/// `ChannelEventSink` with a proper frontend (TUI, WebSocket, etc.).
///
/// # Output Format
///
/// | Event | Output |
/// |-------|--------|
/// | `TextChunk` | Prints text directly (no newline) |
/// | `Complete` | Prints newline |
/// | `Error` | Prints to stderr with "Error: " prefix |
/// | `ToolExecuting` | Prints "[Tool: name]" |
/// | `ToolCompleted` | Prints errors only |
/// | `PermissionRequired` | Warning - use `AutoApprovePolicy` |
/// | `BatchPermissionRequired` | Warning - use `AutoApprovePolicy` |
/// | `UserInteractionRequired` | Warning - use `AutoApprovePolicy` |
/// | Other events | Silently ignored |
///
/// # Example
///
/// ```ignore
/// use agent_core_runtime::agent::{
///     AgentCore, SimpleEventSink, ChannelInputSource, AutoApprovePolicy
/// };
///
/// // Simple CLI agent - must use AutoApprovePolicy
/// agent.run_with_frontend(
///     SimpleEventSink::new(),
///     input_source,
///     AutoApprovePolicy::new(), // Required for non-interactive sink
/// )?;
/// ```
#[derive(Clone, Default)]
pub struct SimpleEventSink;
185
186impl SimpleEventSink {
187 /// Create a new simple event sink.
188 pub fn new() -> Self {
189 Self
190 }
191}
192
193impl EventSink for SimpleEventSink {
194 fn send(&self, event: UiMessage) -> Result<(), SendError> {
195 use std::io::Write;
196
197 match &event {
198 UiMessage::TextChunk { text, .. } => {
199 print!("{}", text);
200 std::io::stdout().flush().ok();
201 }
202 UiMessage::Error { error, .. } => {
203 eprintln!("Error: {}", error);
204 }
205 UiMessage::Complete { .. } => {
206 println!();
207 }
208 UiMessage::ToolExecuting { display_name, .. } => {
209 println!("[Tool: {}]", display_name);
210 }
211 UiMessage::ToolCompleted { error, .. } => {
212 if let Some(err) = error {
213 eprintln!("[Tool error: {}]", err);
214 }
215 }
216 UiMessage::PermissionRequired { .. } => {
217 eprintln!("Warning: SimpleEventSink received permission request. Use AutoApprovePolicy to handle permissions automatically.");
218 }
219 UiMessage::BatchPermissionRequired { .. } => {
220 eprintln!("Warning: SimpleEventSink received batch permission request. Use AutoApprovePolicy to handle permissions automatically.");
221 }
222 UiMessage::UserInteractionRequired { .. } => {
223 eprintln!("Warning: SimpleEventSink received user interaction request. Use AutoApprovePolicy to auto-cancel interactions.");
224 }
225 _ => {
226 // Silently ignore other events
227 }
228 }
229 Ok(())
230 }
231
232 fn clone_box(&self) -> Box<dyn EventSink> {
233 Box::new(self.clone())
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// `send` places the event, unchanged, on a channel with free capacity.
    #[tokio::test]
    async fn test_channel_event_sink_send() {
        let (sender, mut receiver) = mpsc::channel(10);
        let sink = ChannelEventSink::new(sender);

        sink.send(UiMessage::System {
            session_id: 1,
            message: "test".to_string(),
        })
        .unwrap();

        if let UiMessage::System { session_id, message } = receiver.recv().await.unwrap() {
            assert_eq!(session_id, 1);
            assert_eq!(message, "test");
        } else {
            panic!("unexpected message type");
        }
    }

    /// `send_async` delivers the event through the same channel.
    #[tokio::test]
    async fn test_channel_event_sink_send_async() {
        let (sender, mut receiver) = mpsc::channel(10);
        let sink = ChannelEventSink::new(sender);

        sink.send_async(UiMessage::System {
            session_id: 2,
            message: "async test".to_string(),
        })
        .await
        .unwrap();

        if let UiMessage::System { session_id, message } = receiver.recv().await.unwrap() {
            assert_eq!(session_id, 2);
            assert_eq!(message, "async test");
        } else {
            panic!("unexpected message type");
        }
    }

    /// The non-blocking `send` reports an error once the channel is full.
    #[test]
    fn test_channel_event_sink_full_channel() {
        // The receiver is kept bound (never read) for the whole test so the
        // single slot stays occupied.
        let (sender, _receiver) = mpsc::channel(1);
        let sink = ChannelEventSink::new(sender);

        let system_event = |message: &str| UiMessage::System {
            session_id: 1,
            message: message.to_string(),
        };

        // Fill the channel
        sink.send(system_event("first")).unwrap();

        // Second send should fail (channel full)
        assert!(sink.send(system_event("second")).is_err());
    }

    /// The stdout sink accepts the event kinds it knows how to print.
    #[test]
    fn test_simple_event_sink_send() {
        let sink = SimpleEventSink::new();

        // These should all succeed (simple sink never fails)
        let events = vec![
            UiMessage::TextChunk {
                session_id: 1,
                turn_id: None,
                text: "hello".to_string(),
                input_tokens: 0,
                output_tokens: 0,
            },
            UiMessage::Complete {
                session_id: 1,
                turn_id: None,
                input_tokens: 10,
                output_tokens: 20,
                stop_reason: None,
            },
            UiMessage::Error {
                session_id: 1,
                turn_id: None,
                error: "test error".to_string(),
            },
        ];

        events
            .into_iter()
            .for_each(|event| assert!(sink.send(event).is_ok()));
    }

    /// A boxed sink forwards `send` and can be cloned via `clone_box`.
    #[test]
    fn test_boxed_event_sink() {
        let (sender, _receiver) = mpsc::channel(10);
        let boxed: Box<dyn EventSink> = Box::new(ChannelEventSink::new(sender));

        let outcome = boxed.send(UiMessage::System {
            session_id: 1,
            message: "boxed test".to_string(),
        });
        assert!(outcome.is_ok());

        // Test clone_box
        let _cloned = boxed.clone_box();
    }
}