Skip to main content

agent_core_runtime/agent/interface/
sink.rs

1//! Event Sink - Receives events from the engine
2//!
3//! The [`EventSink`] trait defines how the engine delivers events to consumers.
4
5use std::future::Future;
6use std::pin::Pin;
7
8use tokio::sync::mpsc;
9
10use crate::agent::UiMessage;
11
12/// Error when sending an event fails.
13///
14/// Contains the original message for retry or logging.
15#[derive(Debug)]
16pub struct SendError(pub UiMessage);
17
18impl std::fmt::Display for SendError {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        write!(f, "failed to send event")
21    }
22}
23
24impl std::error::Error for SendError {}
25
26/// Receives events from the agent engine and delivers them to a consumer.
27///
28/// Implementations handle the transport-specific details of delivering
29/// events to the user interface (TUI, WebSocket, stdout, etc.).
30///
31/// # Backpressure
32///
33/// The [`send_async`](EventSink::send_async) method supports backpressure by
34/// awaiting until the consumer can accept the event. Implementations should
35/// use this when the consumer has bounded capacity (e.g., channel-based).
36///
37/// # Thread Safety
38///
39/// EventSink must be `Send + Sync` to allow sharing across async tasks.
40/// The engine may call `send` from multiple tasks concurrently.
41///
42/// # Example
43///
44/// ```ignore
45/// use agent_core_runtime::agent::interface::{EventSink, SendError};
46/// use agent_core_runtime::agent::UiMessage;
47///
48/// struct MyCustomSink { /* ... */ }
49///
50/// impl EventSink for MyCustomSink {
51///     fn send(&self, event: UiMessage) -> Result<(), SendError> {
52///         // Deliver event to your transport
53///         Ok(())
54///     }
55///
56///     fn clone_box(&self) -> Box<dyn EventSink> {
57///         Box::new(MyCustomSink { /* ... */ })
58///     }
59/// }
60/// ```
61pub trait EventSink: Send + Sync + 'static {
62    /// Send an event to the consumer (non-blocking).
63    ///
64    /// Returns immediately. If the consumer cannot accept the event
65    /// (e.g., buffer full), returns `Err(SendError)`.
66    ///
67    /// Use this for fire-and-forget scenarios or when you have your
68    /// own backpressure mechanism.
69    fn send(&self, event: UiMessage) -> Result<(), SendError>;
70
71    /// Send an event to the consumer (async, with backpressure).
72    ///
73    /// Waits until the consumer can accept the event. This is the
74    /// preferred method when backpressure is needed to avoid overwhelming
75    /// slow consumers.
76    ///
77    /// Default implementation calls `send()` and returns immediately.
78    fn send_async(&self, event: UiMessage) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
79        Box::pin(async move { self.send(event) })
80    }
81
82    /// Clone this sink into a boxed trait object.
83    ///
84    /// Required because we need to clone sinks for internal routing
85    /// but `Clone` is not object-safe.
86    fn clone_box(&self) -> Box<dyn EventSink>;
87}
88
89// Allow Box<dyn EventSink> to be used as EventSink
90impl EventSink for Box<dyn EventSink> {
91    fn send(&self, event: UiMessage) -> Result<(), SendError> {
92        (**self).send(event)
93    }
94
95    fn send_async(&self, event: UiMessage) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
96        (**self).send_async(event)
97    }
98
99    fn clone_box(&self) -> Box<dyn EventSink> {
100        (**self).clone_box()
101    }
102}
103
104/// Event sink backed by an async channel.
105///
106/// This is the default sink used internally. It connects the engine
107/// to a channel that the consumer reads from.
108///
109/// # Backpressure
110///
111/// When the channel is full, `send()` returns an error immediately,
112/// while `send_async()` waits until space is available.
113#[derive(Clone)]
114pub struct ChannelEventSink {
115    tx: mpsc::Sender<UiMessage>,
116}
117
118impl ChannelEventSink {
119    /// Create a new channel-backed event sink.
120    pub fn new(tx: mpsc::Sender<UiMessage>) -> Self {
121        Self { tx }
122    }
123}
124
125impl EventSink for ChannelEventSink {
126    fn send(&self, event: UiMessage) -> Result<(), SendError> {
127        self.tx.try_send(event).map_err(|e| SendError(e.into_inner()))
128    }
129
130    fn send_async(&self, event: UiMessage) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
131        let tx = self.tx.clone();
132        Box::pin(async move {
133            tx.send(event).await.map_err(|e| SendError(e.0))
134        })
135    }
136
137    fn clone_box(&self) -> Box<dyn EventSink> {
138        Box::new(self.clone())
139    }
140}
141
/// Simple event sink that prints to stdout.
///
/// A minimal sink for CLI tools, debugging, or batch processing. Prints
/// LLM text output directly to stdout with basic formatting.
///
/// # Limitations
///
/// This sink is **non-interactive** and cannot handle:
/// - **Permission requests**: Use with `AutoApprovePolicy` to auto-approve
/// - **User interactions**: Use with `AutoApprovePolicy` to auto-cancel
///
/// If you need interactive permission prompts or user questions, use
/// `ChannelEventSink` with a proper frontend (TUI, WebSocket, etc.).
///
/// # Output Format
///
/// | Event | Output |
/// |-------|--------|
/// | `TextChunk` | Prints text directly (no newline) |
/// | `Complete` | Prints newline |
/// | `Error` | Prints to stderr with "Error: " prefix |
/// | `ToolExecuting` | Prints "[Tool: name]" |
/// | `ToolCompleted` | Prints errors only (to stderr) |
/// | `PermissionRequired` | Warning on stderr - use `AutoApprovePolicy` |
/// | `BatchPermissionRequired` | Warning on stderr - use `AutoApprovePolicy` |
/// | `UserInteractionRequired` | Warning on stderr - use `AutoApprovePolicy` |
/// | Other events | Silently ignored |
///
/// # Example
///
/// ```ignore
/// use agent_core_runtime::agent::{
///     AgentCore, SimpleEventSink, ChannelInputSource, AutoApprovePolicy
/// };
///
/// // Simple CLI agent - must use AutoApprovePolicy
/// agent.run_with_frontend(
///     SimpleEventSink::new(),
///     input_source,
///     AutoApprovePolicy::new(),  // Required for non-interactive sink
/// )?;
/// ```
// `Debug` is derived so the sink can appear in logs and in `Debug` output of
// types that embed it.
#[derive(Debug, Clone, Default)]
pub struct SimpleEventSink;

impl SimpleEventSink {
    /// Create a new simple event sink.
    ///
    /// The sink is a unit struct with no state, so this is equivalent to
    /// `SimpleEventSink::default()`.
    pub fn new() -> Self {
        Self
    }
}
192
193impl EventSink for SimpleEventSink {
194    fn send(&self, event: UiMessage) -> Result<(), SendError> {
195        use std::io::Write;
196
197        match &event {
198            UiMessage::TextChunk { text, .. } => {
199                print!("{}", text);
200                std::io::stdout().flush().ok();
201            }
202            UiMessage::Error { error, .. } => {
203                eprintln!("Error: {}", error);
204            }
205            UiMessage::Complete { .. } => {
206                println!();
207            }
208            UiMessage::ToolExecuting { display_name, .. } => {
209                println!("[Tool: {}]", display_name);
210            }
211            UiMessage::ToolCompleted { error, .. } => {
212                if let Some(err) = error {
213                    eprintln!("[Tool error: {}]", err);
214                }
215            }
216            UiMessage::PermissionRequired { .. } => {
217                eprintln!("Warning: SimpleEventSink received permission request. Use AutoApprovePolicy to handle permissions automatically.");
218            }
219            UiMessage::BatchPermissionRequired { .. } => {
220                eprintln!("Warning: SimpleEventSink received batch permission request. Use AutoApprovePolicy to handle permissions automatically.");
221            }
222            UiMessage::UserInteractionRequired { .. } => {
223                eprintln!("Warning: SimpleEventSink received user interaction request. Use AutoApprovePolicy to auto-cancel interactions.");
224            }
225            _ => {
226                // Silently ignore other events
227            }
228        }
229        Ok(())
230    }
231
232    fn clone_box(&self) -> Box<dyn EventSink> {
233        Box::new(self.clone())
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// Builds a `UiMessage::System` fixture from a session id and a text.
    macro_rules! system_event {
        ($session:expr, $text:expr) => {
            UiMessage::System {
                session_id: $session,
                message: $text.to_string(),
            }
        };
    }

    /// A non-blocking `send` must put the event on the channel unchanged.
    #[tokio::test]
    async fn test_channel_event_sink_send() {
        let (tx, mut rx) = mpsc::channel(10);
        let sink = ChannelEventSink::new(tx);

        sink.send(system_event!(1, "test")).unwrap();

        if let UiMessage::System { session_id, message } = rx.recv().await.unwrap() {
            assert_eq!(session_id, 1);
            assert_eq!(message, "test");
        } else {
            panic!("unexpected message type");
        }
    }

    /// `send_async` must put the event on the channel unchanged.
    #[tokio::test]
    async fn test_channel_event_sink_send_async() {
        let (tx, mut rx) = mpsc::channel(10);
        let sink = ChannelEventSink::new(tx);

        sink.send_async(system_event!(2, "async test")).await.unwrap();

        if let UiMessage::System { session_id, message } = rx.recv().await.unwrap() {
            assert_eq!(session_id, 2);
            assert_eq!(message, "async test");
        } else {
            panic!("unexpected message type");
        }
    }

    /// With capacity 1 and nobody receiving, the second `send` must fail.
    #[test]
    fn test_channel_event_sink_full_channel() {
        // `_rx` is kept alive so the channel stays open while it is full.
        let (tx, _rx) = mpsc::channel(1);
        let sink = ChannelEventSink::new(tx);

        // The first event occupies the only slot.
        sink.send(system_event!(1, "first")).unwrap();

        // No slot is left, so the next event is rejected.
        assert!(sink.send(system_event!(1, "second")).is_err());
    }

    /// The simple sink reports success for text, completion and error events.
    #[test]
    fn test_simple_event_sink_send() {
        let sink = SimpleEventSink::new();

        let text_chunk = UiMessage::TextChunk {
            session_id: 1,
            turn_id: None,
            text: "hello".to_string(),
            input_tokens: 0,
            output_tokens: 0,
        };
        let complete = UiMessage::Complete {
            session_id: 1,
            turn_id: None,
            input_tokens: 10,
            output_tokens: 20,
            stop_reason: None,
        };
        let error = UiMessage::Error {
            session_id: 1,
            turn_id: None,
            error: "test error".to_string(),
        };

        // Sent in the same order as listed: text, completion, error.
        assert!(sink.send(text_chunk).is_ok());
        assert!(sink.send(complete).is_ok());
        assert!(sink.send(error).is_ok());
    }

    /// A boxed sink can still send and can be cloned via `clone_box`.
    #[test]
    fn test_boxed_event_sink() {
        let (tx, _rx) = mpsc::channel(10);
        let sink: Box<dyn EventSink> = Box::new(ChannelEventSink::new(tx));

        assert!(sink.send(system_event!(1, "boxed test")).is_ok());

        // Cloning through the trait object only has to compile and run.
        let _cloned = sink.clone_box();
    }
}