Skip to main content

ai_agent/
stream.rs

1// Source: Internal module — async stream interface for CLI/TUI users
2// Provides futures::Stream-based event consumption for the AI Agent SDK.
3
4use crate::types::AgentEvent;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::sync::mpsc;
8
9// Re-use the Stream trait from futures_util (already a dependency)
10use futures_util::Stream;
11use std::sync::Arc;
12
13/// Manage a collection of event channel senders for broadcasting agent events
14/// to multiple subscribers.
15///
16/// Stores `Vec<mpsc::Sender<AgentEvent>>` protected by a `parking_lot::Mutex`.
17/// `broadcast()` sends an event to all active subscribers and removes closed
18/// channels. `subscribe()` creates a new (`EventSubscriber`, `CancelGuard`) pair.
19#[derive(Clone)]
20pub struct EventBroadcasters {
21    senders: Arc<parking_lot::Mutex<Vec<mpsc::Sender<AgentEvent>>>>,
22}
23
24impl EventBroadcasters {
25    pub fn new() -> Self {
26        Self {
27            senders: Arc::new(parking_lot::Mutex::new(Vec::new())),
28        }
29    }
30
31    /// Send an event to all active subscribers.
32    ///
33    /// Iterates all channel senders, attempting to send the event to each.
34    /// Removes (and drops) senders whose receiver has been disconnected,
35    /// keeping the list lean.
36    pub fn broadcast(&self, event: &AgentEvent) {
37        let mut senders = self.senders.lock();
38        senders.retain(|tx| tx.try_send(event.clone()).is_ok());
39    }
40
41    /// Subscribe to events.
42    ///
43    /// Creates a new channel and registers the sender. Returns an
44    /// `EventSubscriber` (the receiver) and a `CancelGuard` that will
45    /// automatically remove the sender when dropped.
46    pub fn subscribe(&self) -> (EventSubscriber, CancelGuard) {
47        let (tx, rx) = mpsc::channel(256);
48        let index = {
49            let mut senders = self.senders.lock();
50            let index = senders.len();
51            senders.push(tx);
52            index
53        };
54        let guard = CancelGuard::new(Arc::clone(&self.senders), index);
55        (EventSubscriber::new(rx), guard)
56    }
57}
58
59impl Default for EventBroadcasters {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65/// A subscriber to agent events across multiple queries.
66///
67/// Returned by [`crate::agent::Agent::subscribe`]. Events from the *current*
68/// and *subsequent* queries flow to the subscriber until the associated
69/// [`CancelGuard`] is dropped.
70///
71/// # Example
72///
73/// ```rust,ignore
74/// let agent = Agent::new("claude-sonnet-4-6");
75/// let (mut sub, _guard) = agent.subscribe();
76///
77/// tokio::spawn(async move {
78///     agent.query("hello").await;
79/// });
80///
81/// while let Some(ev) = sub.next().await {
82///     // render in TUI
83/// }
84/// ```
85pub struct EventSubscriber {
86    receiver: mpsc::Receiver<AgentEvent>,
87}
88
89impl EventSubscriber {
90    pub(crate) fn new(receiver: mpsc::Receiver<AgentEvent>) -> Self {
91        Self { receiver }
92    }
93}
94
95impl Stream for EventSubscriber {
96    type Item = AgentEvent;
97
98    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99        match Pin::new(&mut self.receiver).poll_recv(cx) {
100            Poll::Ready(Some(event)) => Poll::Ready(Some(event)),
101            Poll::Ready(None) => Poll::Ready(None),
102            Poll::Pending => Poll::Pending,
103        }
104    }
105}
106
107/// Token that unsubscribes the [`EventSubscriber`] when dropped.
108///
109/// Dropping this guard removes the sender from the [`EventBroadcasters`],
110/// which closes the channel and causes the subscriber's stream to return `None`.
111pub struct CancelGuard {
112    senders: Option<Arc<parking_lot::Mutex<Vec<mpsc::Sender<AgentEvent>>>>>,
113    index: usize,
114}
115
116impl CancelGuard {
117    /// Create a new guard that will remove the sender at `index` when dropped.
118    pub(crate) fn new(
119        senders: Arc<parking_lot::Mutex<Vec<mpsc::Sender<AgentEvent>>>>,
120        index: usize,
121    ) -> Self {
122        Self {
123            senders: Some(senders),
124            index,
125        }
126    }
127}
128
129impl Drop for CancelGuard {
130    fn drop(&mut self) {
131        if let Some(ref senders) = self.senders {
132            let mut s = senders.lock();
133            if self.index < s.len() {
134                s.remove(self.index);
135            }
136        }
137    }
138}