ai_agent/stream.rs
1// Source: Internal module — async stream interface for CLI/TUI users
2// Provides futures::Stream-based event consumption for the AI Agent SDK.
3
4use crate::types::AgentEvent;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::sync::mpsc;
8
9// Re-use the Stream trait from futures_util (already a dependency)
10use futures_util::Stream;
11use std::sync::Arc;
12
13/// Manage a collection of event channel senders for broadcasting agent events
14/// to multiple subscribers.
15///
16/// Stores `Vec<mpsc::Sender<AgentEvent>>` protected by a `parking_lot::Mutex`.
17/// `broadcast()` sends an event to all active subscribers and removes closed
18/// channels. `subscribe()` creates a new (`EventSubscriber`, `CancelGuard`) pair.
19#[derive(Clone)]
20pub struct EventBroadcasters {
21 senders: Arc<parking_lot::Mutex<Vec<mpsc::Sender<AgentEvent>>>>,
22}
23
24impl EventBroadcasters {
25 pub fn new() -> Self {
26 Self {
27 senders: Arc::new(parking_lot::Mutex::new(Vec::new())),
28 }
29 }
30
31 /// Send an event to all active subscribers.
32 ///
33 /// Iterates all channel senders, attempting to send the event to each.
34 /// Removes (and drops) senders whose receiver has been disconnected,
35 /// keeping the list lean.
36 pub fn broadcast(&self, event: &AgentEvent) {
37 let mut senders = self.senders.lock();
38 senders.retain(|tx| tx.try_send(event.clone()).is_ok());
39 }
40
41 /// Subscribe to events.
42 ///
43 /// Creates a new channel and registers the sender. Returns an
44 /// `EventSubscriber` (the receiver) and a `CancelGuard` that will
45 /// automatically remove the sender when dropped.
46 pub fn subscribe(&self) -> (EventSubscriber, CancelGuard) {
47 let (tx, rx) = mpsc::channel(256);
48 let index = {
49 let mut senders = self.senders.lock();
50 let index = senders.len();
51 senders.push(tx);
52 index
53 };
54 let guard = CancelGuard::new(Arc::clone(&self.senders), index);
55 (EventSubscriber::new(rx), guard)
56 }
57}
58
59impl Default for EventBroadcasters {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65/// A subscriber to agent events across multiple queries.
66///
67/// Returned by [`crate::agent::Agent::subscribe`]. Events from the *current*
68/// and *subsequent* queries flow to the subscriber until the associated
69/// [`CancelGuard`] is dropped.
70///
71/// # Example
72///
73/// ```rust,ignore
74/// let agent = Agent::new("claude-sonnet-4-6");
75/// let (mut sub, _guard) = agent.subscribe();
76///
77/// tokio::spawn(async move {
78/// agent.query("hello").await;
79/// });
80///
81/// while let Some(ev) = sub.next().await {
82/// // render in TUI
83/// }
84/// ```
85pub struct EventSubscriber {
86 receiver: mpsc::Receiver<AgentEvent>,
87}
88
89impl EventSubscriber {
90 pub(crate) fn new(receiver: mpsc::Receiver<AgentEvent>) -> Self {
91 Self { receiver }
92 }
93}
94
95impl Stream for EventSubscriber {
96 type Item = AgentEvent;
97
98 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99 match Pin::new(&mut self.receiver).poll_recv(cx) {
100 Poll::Ready(Some(event)) => Poll::Ready(Some(event)),
101 Poll::Ready(None) => Poll::Ready(None),
102 Poll::Pending => Poll::Pending,
103 }
104 }
105}
106
107/// Token that unsubscribes the [`EventSubscriber`] when dropped.
108///
109/// Dropping this guard removes the sender from the [`EventBroadcasters`],
110/// which closes the channel and causes the subscriber's stream to return `None`.
111pub struct CancelGuard {
112 senders: Option<Arc<parking_lot::Mutex<Vec<mpsc::Sender<AgentEvent>>>>>,
113 index: usize,
114}
115
116impl CancelGuard {
117 /// Create a new guard that will remove the sender at `index` when dropped.
118 pub(crate) fn new(
119 senders: Arc<parking_lot::Mutex<Vec<mpsc::Sender<AgentEvent>>>>,
120 index: usize,
121 ) -> Self {
122 Self {
123 senders: Some(senders),
124 index,
125 }
126 }
127}
128
129impl Drop for CancelGuard {
130 fn drop(&mut self) {
131 if let Some(ref senders) = self.senders {
132 let mut s = senders.lock();
133 if self.index < s.len() {
134 s.remove(self.index);
135 }
136 }
137 }
138}