agent_air_runtime/agent/interface/sink.rs
1//! Event Sink - Receives events from the engine
2//!
3//! The [`EventSink`] trait defines how the engine delivers events to consumers.
4
5use std::future::Future;
6use std::pin::Pin;
7
8use tokio::sync::mpsc;
9
10use crate::agent::UiMessage;
11
12/// Error when sending an event fails.
13///
14/// Contains the original message for retry or logging.
15#[derive(Debug)]
16pub struct SendError(pub UiMessage);
17
18impl std::fmt::Display for SendError {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 write!(f, "failed to send event")
21 }
22}
23
24impl std::error::Error for SendError {}
25
26/// Receives events from the agent engine and delivers them to a consumer.
27///
28/// Implementations handle the transport-specific details of delivering
29/// events to the user interface (TUI, WebSocket, stdout, etc.).
30///
31/// # Backpressure
32///
33/// The [`send_async`](EventSink::send_async) method supports backpressure by
34/// awaiting until the consumer can accept the event. Implementations should
35/// use this when the consumer has bounded capacity (e.g., channel-based).
36///
37/// # Thread Safety
38///
39/// EventSink must be `Send + Sync` to allow sharing across async tasks.
40/// The engine may call `send` from multiple tasks concurrently.
41///
42/// # Example
43///
44/// ```ignore
45/// use agent_air_runtime::agent::interface::{EventSink, SendError};
46/// use agent_air_runtime::agent::UiMessage;
47///
48/// struct MyCustomSink { /* ... */ }
49///
50/// impl EventSink for MyCustomSink {
51/// fn send(&self, event: UiMessage) -> Result<(), SendError> {
52/// // Deliver event to your transport
53/// Ok(())
54/// }
55///
56/// fn clone_box(&self) -> Box<dyn EventSink> {
57/// Box::new(MyCustomSink { /* ... */ })
58/// }
59/// }
60/// ```
61#[allow(clippy::result_large_err)]
62pub trait EventSink: Send + Sync + 'static {
63 /// Send an event to the consumer (non-blocking).
64 ///
65 /// Returns immediately. If the consumer cannot accept the event
66 /// (e.g., buffer full), returns `Err(SendError)`.
67 ///
68 /// Use this for fire-and-forget scenarios or when you have your
69 /// own backpressure mechanism.
70 fn send(&self, event: UiMessage) -> Result<(), SendError>;
71
72 /// Send an event to the consumer (async, with backpressure).
73 ///
74 /// Waits until the consumer can accept the event. This is the
75 /// preferred method when backpressure is needed to avoid overwhelming
76 /// slow consumers.
77 ///
78 /// Default implementation calls `send()` and returns immediately.
79 fn send_async(
80 &self,
81 event: UiMessage,
82 ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
83 Box::pin(async move { self.send(event) })
84 }
85
86 /// Clone this sink into a boxed trait object.
87 ///
88 /// Required because we need to clone sinks for internal routing
89 /// but `Clone` is not object-safe.
90 fn clone_box(&self) -> Box<dyn EventSink>;
91}
92
93// Allow Box<dyn EventSink> to be used as EventSink
94impl EventSink for Box<dyn EventSink> {
95 fn send(&self, event: UiMessage) -> Result<(), SendError> {
96 (**self).send(event)
97 }
98
99 fn send_async(
100 &self,
101 event: UiMessage,
102 ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
103 (**self).send_async(event)
104 }
105
106 fn clone_box(&self) -> Box<dyn EventSink> {
107 (**self).clone_box()
108 }
109}
110
111/// Event sink backed by an async channel.
112///
113/// This is the default sink used internally. It connects the engine
114/// to a channel that the consumer reads from.
115///
116/// # Backpressure
117///
118/// When the channel is full, `send()` returns an error immediately,
119/// while `send_async()` waits until space is available.
120#[derive(Clone)]
121pub struct ChannelEventSink {
122 tx: mpsc::Sender<UiMessage>,
123}
124
125impl ChannelEventSink {
126 /// Create a new channel-backed event sink.
127 pub fn new(tx: mpsc::Sender<UiMessage>) -> Self {
128 Self { tx }
129 }
130}
131
132impl EventSink for ChannelEventSink {
133 fn send(&self, event: UiMessage) -> Result<(), SendError> {
134 self.tx
135 .try_send(event)
136 .map_err(|e| SendError(e.into_inner()))
137 }
138
139 fn send_async(
140 &self,
141 event: UiMessage,
142 ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
143 let tx = self.tx.clone();
144 Box::pin(async move { tx.send(event).await.map_err(|e| SendError(e.0)) })
145 }
146
147 fn clone_box(&self) -> Box<dyn EventSink> {
148 Box::new(self.clone())
149 }
150}
151
152/// Simple event sink that prints to stdout.
153///
154/// A minimal sink for CLI tools, debugging, or batch processing. Prints
155/// LLM text output directly to stdout with basic formatting.
156///
157/// # Limitations
158///
159/// This sink is **non-interactive** and cannot handle:
160/// - **Permission requests**: Use with `AutoApprovePolicy` to auto-approve
161/// - **User interactions**: Use with `AutoApprovePolicy` to auto-cancel
162///
163/// If you need interactive permission prompts or user questions, use
164/// `ChannelEventSink` with a proper frontend (TUI, WebSocket, etc.).
165///
166/// # Output Format
167///
168/// | Event | Output |
169/// |-------|--------|
170/// | `TextChunk` | Prints text directly (no newline) |
171/// | `Complete` | Prints newline |
172/// | `Error` | Prints to stderr with "Error: " prefix |
173/// | `ToolExecuting` | Prints "[Tool: name]" |
174/// | `ToolCompleted` | Prints errors only |
175/// | `PermissionRequired` | Warning - use `AutoApprovePolicy` |
176/// | `UserInteractionRequired` | Warning - use `AutoApprovePolicy` |
177/// | Other events | Silently ignored |
178///
179/// # Example
180///
181/// ```ignore
182/// use agent_air_runtime::agent::{
183/// AgentAir, SimpleEventSink, ChannelInputSource, AutoApprovePolicy
184/// };
185///
186/// // Simple CLI agent - must use AutoApprovePolicy
187/// agent.run_with_frontend(
188/// SimpleEventSink::new(),
189/// input_source,
190/// AutoApprovePolicy::new(), // Required for non-interactive sink
191/// )?;
192/// ```
193#[derive(Clone, Default)]
194pub struct SimpleEventSink;
195
196impl SimpleEventSink {
197 /// Create a new simple event sink.
198 pub fn new() -> Self {
199 Self
200 }
201}
202
203impl EventSink for SimpleEventSink {
204 fn send(&self, event: UiMessage) -> Result<(), SendError> {
205 use std::io::Write;
206
207 match &event {
208 UiMessage::TextChunk { text, .. } => {
209 print!("{}", text);
210 std::io::stdout().flush().ok();
211 }
212 UiMessage::Error { error, .. } => {
213 eprintln!("Error: {}", error);
214 }
215 UiMessage::Complete { .. } => {
216 println!();
217 }
218 UiMessage::ToolExecuting { display_name, .. } => {
219 println!("[Tool: {}]", display_name);
220 }
221 UiMessage::ToolCompleted {
222 error: Some(err), ..
223 } => {
224 eprintln!("[Tool error: {}]", err);
225 }
226 UiMessage::PermissionRequired { .. } => {
227 eprintln!(
228 "Warning: SimpleEventSink received permission request. Use AutoApprovePolicy to handle permissions automatically."
229 );
230 }
231 UiMessage::BatchPermissionRequired { .. } => {
232 eprintln!(
233 "Warning: SimpleEventSink received batch permission request. Use AutoApprovePolicy to handle permissions automatically."
234 );
235 }
236 UiMessage::UserInteractionRequired { .. } => {
237 eprintln!(
238 "Warning: SimpleEventSink received user interaction request. Use AutoApprovePolicy to auto-cancel interactions."
239 );
240 }
241 _ => {
242 // Silently ignore other events
243 }
244 }
245 Ok(())
246 }
247
248 fn clone_box(&self) -> Box<dyn EventSink> {
249 Box::new(self.clone())
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use super::*;
256 use tokio::sync::mpsc;
257
258 #[tokio::test]
259 async fn test_channel_event_sink_send() {
260 let (tx, mut rx) = mpsc::channel(10);
261 let sink = ChannelEventSink::new(tx);
262
263 let event = UiMessage::System {
264 session_id: 1,
265 message: "test".to_string(),
266 };
267
268 sink.send(event).unwrap();
269
270 let received = rx.recv().await.unwrap();
271 match received {
272 UiMessage::System {
273 session_id,
274 message,
275 } => {
276 assert_eq!(session_id, 1);
277 assert_eq!(message, "test");
278 }
279 _ => panic!("unexpected message type"),
280 }
281 }
282
283 #[tokio::test]
284 async fn test_channel_event_sink_send_async() {
285 let (tx, mut rx) = mpsc::channel(10);
286 let sink = ChannelEventSink::new(tx);
287
288 let event = UiMessage::System {
289 session_id: 2,
290 message: "async test".to_string(),
291 };
292
293 sink.send_async(event).await.unwrap();
294
295 let received = rx.recv().await.unwrap();
296 match received {
297 UiMessage::System {
298 session_id,
299 message,
300 } => {
301 assert_eq!(session_id, 2);
302 assert_eq!(message, "async test");
303 }
304 _ => panic!("unexpected message type"),
305 }
306 }
307
308 #[test]
309 fn test_channel_event_sink_full_channel() {
310 let (tx, _rx) = mpsc::channel(1);
311 let sink = ChannelEventSink::new(tx);
312
313 // Fill the channel
314 let event1 = UiMessage::System {
315 session_id: 1,
316 message: "first".to_string(),
317 };
318 sink.send(event1).unwrap();
319
320 // Second send should fail (channel full)
321 let event2 = UiMessage::System {
322 session_id: 1,
323 message: "second".to_string(),
324 };
325 let result = sink.send(event2);
326 assert!(result.is_err());
327 }
328
329 #[test]
330 fn test_simple_event_sink_send() {
331 let sink = SimpleEventSink::new();
332
333 // These should all succeed (simple sink never fails)
334 let events = vec![
335 UiMessage::TextChunk {
336 session_id: 1,
337 turn_id: None,
338 text: "hello".to_string(),
339 input_tokens: 0,
340 output_tokens: 0,
341 },
342 UiMessage::Complete {
343 session_id: 1,
344 turn_id: None,
345 input_tokens: 10,
346 output_tokens: 20,
347 stop_reason: None,
348 },
349 UiMessage::Error {
350 session_id: 1,
351 turn_id: None,
352 error: "test error".to_string(),
353 },
354 ];
355
356 for event in events {
357 assert!(sink.send(event).is_ok());
358 }
359 }
360
361 #[test]
362 fn test_boxed_event_sink() {
363 let (tx, _rx) = mpsc::channel(10);
364 let sink: Box<dyn EventSink> = Box::new(ChannelEventSink::new(tx));
365
366 let event = UiMessage::System {
367 session_id: 1,
368 message: "boxed test".to_string(),
369 };
370
371 assert!(sink.send(event).is_ok());
372
373 // Test clone_box
374 let _cloned = sink.clone_box();
375 }
376}