Skip to main content

agent_air_runtime/agent/interface/
sink.rs

1//! Event Sink - Receives events from the engine
2//!
3//! The [`EventSink`] trait defines how the engine delivers events to consumers.
4
5use std::future::Future;
6use std::pin::Pin;
7
8use tokio::sync::mpsc;
9
10use crate::agent::UiMessage;
11
12/// Error when sending an event fails.
13///
14/// Contains the original message for retry or logging.
15#[derive(Debug)]
16pub struct SendError(pub UiMessage);
17
18impl std::fmt::Display for SendError {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        write!(f, "failed to send event")
21    }
22}
23
24impl std::error::Error for SendError {}
25
26/// Receives events from the agent engine and delivers them to a consumer.
27///
28/// Implementations handle the transport-specific details of delivering
29/// events to the user interface (TUI, WebSocket, stdout, etc.).
30///
31/// # Backpressure
32///
33/// The [`send_async`](EventSink::send_async) method supports backpressure by
34/// awaiting until the consumer can accept the event. Implementations should
35/// use this when the consumer has bounded capacity (e.g., channel-based).
36///
37/// # Thread Safety
38///
39/// EventSink must be `Send + Sync` to allow sharing across async tasks.
40/// The engine may call `send` from multiple tasks concurrently.
41///
42/// # Example
43///
44/// ```ignore
45/// use agent_air_runtime::agent::interface::{EventSink, SendError};
46/// use agent_air_runtime::agent::UiMessage;
47///
48/// struct MyCustomSink { /* ... */ }
49///
50/// impl EventSink for MyCustomSink {
51///     fn send(&self, event: UiMessage) -> Result<(), SendError> {
52///         // Deliver event to your transport
53///         Ok(())
54///     }
55///
56///     fn clone_box(&self) -> Box<dyn EventSink> {
57///         Box::new(MyCustomSink { /* ... */ })
58///     }
59/// }
60/// ```
61#[allow(clippy::result_large_err)]
62pub trait EventSink: Send + Sync + 'static {
63    /// Send an event to the consumer (non-blocking).
64    ///
65    /// Returns immediately. If the consumer cannot accept the event
66    /// (e.g., buffer full), returns `Err(SendError)`.
67    ///
68    /// Use this for fire-and-forget scenarios or when you have your
69    /// own backpressure mechanism.
70    fn send(&self, event: UiMessage) -> Result<(), SendError>;
71
72    /// Send an event to the consumer (async, with backpressure).
73    ///
74    /// Waits until the consumer can accept the event. This is the
75    /// preferred method when backpressure is needed to avoid overwhelming
76    /// slow consumers.
77    ///
78    /// Default implementation calls `send()` and returns immediately.
79    fn send_async(
80        &self,
81        event: UiMessage,
82    ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
83        Box::pin(async move { self.send(event) })
84    }
85
86    /// Clone this sink into a boxed trait object.
87    ///
88    /// Required because we need to clone sinks for internal routing
89    /// but `Clone` is not object-safe.
90    fn clone_box(&self) -> Box<dyn EventSink>;
91}
92
93// Allow Box<dyn EventSink> to be used as EventSink
94impl EventSink for Box<dyn EventSink> {
95    fn send(&self, event: UiMessage) -> Result<(), SendError> {
96        (**self).send(event)
97    }
98
99    fn send_async(
100        &self,
101        event: UiMessage,
102    ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
103        (**self).send_async(event)
104    }
105
106    fn clone_box(&self) -> Box<dyn EventSink> {
107        (**self).clone_box()
108    }
109}
110
111/// Event sink backed by an async channel.
112///
113/// This is the default sink used internally. It connects the engine
114/// to a channel that the consumer reads from.
115///
116/// # Backpressure
117///
118/// When the channel is full, `send()` returns an error immediately,
119/// while `send_async()` waits until space is available.
120#[derive(Clone)]
121pub struct ChannelEventSink {
122    tx: mpsc::Sender<UiMessage>,
123}
124
125impl ChannelEventSink {
126    /// Create a new channel-backed event sink.
127    pub fn new(tx: mpsc::Sender<UiMessage>) -> Self {
128        Self { tx }
129    }
130}
131
132impl EventSink for ChannelEventSink {
133    fn send(&self, event: UiMessage) -> Result<(), SendError> {
134        self.tx
135            .try_send(event)
136            .map_err(|e| SendError(e.into_inner()))
137    }
138
139    fn send_async(
140        &self,
141        event: UiMessage,
142    ) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
143        let tx = self.tx.clone();
144        Box::pin(async move { tx.send(event).await.map_err(|e| SendError(e.0)) })
145    }
146
147    fn clone_box(&self) -> Box<dyn EventSink> {
148        Box::new(self.clone())
149    }
150}
151
152/// Simple event sink that prints to stdout.
153///
154/// A minimal sink for CLI tools, debugging, or batch processing. Prints
155/// LLM text output directly to stdout with basic formatting.
156///
157/// # Limitations
158///
159/// This sink is **non-interactive** and cannot handle:
160/// - **Permission requests**: Use with `AutoApprovePolicy` to auto-approve
161/// - **User interactions**: Use with `AutoApprovePolicy` to auto-cancel
162///
163/// If you need interactive permission prompts or user questions, use
164/// `ChannelEventSink` with a proper frontend (TUI, WebSocket, etc.).
165///
166/// # Output Format
167///
168/// | Event | Output |
169/// |-------|--------|
170/// | `TextChunk` | Prints text directly (no newline) |
171/// | `Complete` | Prints newline |
172/// | `Error` | Prints to stderr with "Error: " prefix |
173/// | `ToolExecuting` | Prints "[Tool: name]" |
174/// | `ToolCompleted` | Prints errors only |
175/// | `PermissionRequired` | Warning - use `AutoApprovePolicy` |
176/// | `UserInteractionRequired` | Warning - use `AutoApprovePolicy` |
177/// | Other events | Silently ignored |
178///
179/// # Example
180///
181/// ```ignore
182/// use agent_air_runtime::agent::{
183///     AgentAir, SimpleEventSink, ChannelInputSource, AutoApprovePolicy
184/// };
185///
186/// // Simple CLI agent - must use AutoApprovePolicy
187/// agent.run_with_frontend(
188///     SimpleEventSink::new(),
189///     input_source,
190///     AutoApprovePolicy::new(),  // Required for non-interactive sink
191/// )?;
192/// ```
193#[derive(Clone, Default)]
194pub struct SimpleEventSink;
195
196impl SimpleEventSink {
197    /// Create a new simple event sink.
198    pub fn new() -> Self {
199        Self
200    }
201}
202
203impl EventSink for SimpleEventSink {
204    fn send(&self, event: UiMessage) -> Result<(), SendError> {
205        use std::io::Write;
206
207        match &event {
208            UiMessage::TextChunk { text, .. } => {
209                print!("{}", text);
210                std::io::stdout().flush().ok();
211            }
212            UiMessage::Error { error, .. } => {
213                eprintln!("Error: {}", error);
214            }
215            UiMessage::Complete { .. } => {
216                println!();
217            }
218            UiMessage::ToolExecuting { display_name, .. } => {
219                println!("[Tool: {}]", display_name);
220            }
221            UiMessage::ToolCompleted {
222                error: Some(err), ..
223            } => {
224                eprintln!("[Tool error: {}]", err);
225            }
226            UiMessage::PermissionRequired { .. } => {
227                eprintln!(
228                    "Warning: SimpleEventSink received permission request. Use AutoApprovePolicy to handle permissions automatically."
229                );
230            }
231            UiMessage::BatchPermissionRequired { .. } => {
232                eprintln!(
233                    "Warning: SimpleEventSink received batch permission request. Use AutoApprovePolicy to handle permissions automatically."
234                );
235            }
236            UiMessage::UserInteractionRequired { .. } => {
237                eprintln!(
238                    "Warning: SimpleEventSink received user interaction request. Use AutoApprovePolicy to auto-cancel interactions."
239                );
240            }
241            _ => {
242                // Silently ignore other events
243            }
244        }
245        Ok(())
246    }
247
248    fn clone_box(&self) -> Box<dyn EventSink> {
249        Box::new(self.clone())
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256    use tokio::sync::mpsc;
257
258    #[tokio::test]
259    async fn test_channel_event_sink_send() {
260        let (tx, mut rx) = mpsc::channel(10);
261        let sink = ChannelEventSink::new(tx);
262
263        let event = UiMessage::System {
264            session_id: 1,
265            message: "test".to_string(),
266        };
267
268        sink.send(event).unwrap();
269
270        let received = rx.recv().await.unwrap();
271        match received {
272            UiMessage::System {
273                session_id,
274                message,
275            } => {
276                assert_eq!(session_id, 1);
277                assert_eq!(message, "test");
278            }
279            _ => panic!("unexpected message type"),
280        }
281    }
282
283    #[tokio::test]
284    async fn test_channel_event_sink_send_async() {
285        let (tx, mut rx) = mpsc::channel(10);
286        let sink = ChannelEventSink::new(tx);
287
288        let event = UiMessage::System {
289            session_id: 2,
290            message: "async test".to_string(),
291        };
292
293        sink.send_async(event).await.unwrap();
294
295        let received = rx.recv().await.unwrap();
296        match received {
297            UiMessage::System {
298                session_id,
299                message,
300            } => {
301                assert_eq!(session_id, 2);
302                assert_eq!(message, "async test");
303            }
304            _ => panic!("unexpected message type"),
305        }
306    }
307
308    #[test]
309    fn test_channel_event_sink_full_channel() {
310        let (tx, _rx) = mpsc::channel(1);
311        let sink = ChannelEventSink::new(tx);
312
313        // Fill the channel
314        let event1 = UiMessage::System {
315            session_id: 1,
316            message: "first".to_string(),
317        };
318        sink.send(event1).unwrap();
319
320        // Second send should fail (channel full)
321        let event2 = UiMessage::System {
322            session_id: 1,
323            message: "second".to_string(),
324        };
325        let result = sink.send(event2);
326        assert!(result.is_err());
327    }
328
329    #[test]
330    fn test_simple_event_sink_send() {
331        let sink = SimpleEventSink::new();
332
333        // These should all succeed (simple sink never fails)
334        let events = vec![
335            UiMessage::TextChunk {
336                session_id: 1,
337                turn_id: None,
338                text: "hello".to_string(),
339                input_tokens: 0,
340                output_tokens: 0,
341            },
342            UiMessage::Complete {
343                session_id: 1,
344                turn_id: None,
345                input_tokens: 10,
346                output_tokens: 20,
347                stop_reason: None,
348            },
349            UiMessage::Error {
350                session_id: 1,
351                turn_id: None,
352                error: "test error".to_string(),
353            },
354        ];
355
356        for event in events {
357            assert!(sink.send(event).is_ok());
358        }
359    }
360
361    #[test]
362    fn test_boxed_event_sink() {
363        let (tx, _rx) = mpsc::channel(10);
364        let sink: Box<dyn EventSink> = Box::new(ChannelEventSink::new(tx));
365
366        let event = UiMessage::System {
367            session_id: 1,
368            message: "boxed test".to_string(),
369        };
370
371        assert!(sink.send(event).is_ok());
372
373        // Test clone_box
374        let _cloned = sink.clone_box();
375    }
376}