Skip to main content

github_copilot_sdk/
subscription.rs

1//! Subscription handles for observing session and lifecycle events.
2//!
3//! Returned by [`Session::subscribe`](crate::session::Session::subscribe) and
4//! [`Client::subscribe_lifecycle`](crate::Client::subscribe_lifecycle).
5//!
6//! Each subscription is an opt-in **observer** of events that are also
7//! delivered to the per-event handlers installed on the session config
8//! (see [`crate::handler`]). Subscribers receive a clone of every event but
9//! cannot influence permission decisions, tool results, or any other event
10//! whose handler return value affects the runtime.
11//!
12//! # Async iteration
13//!
14//! The subscription types implement [`tokio_stream::Stream`], so consumers
15//! can use adapter combinators from [`tokio_stream::StreamExt`] or
16//! `futures::StreamExt` (filtering, mapping, batching, racing with
17//! `tokio::select!`, etc.) without learning the SDK's internal channel
18//! choice. A simple `while let Ok(event) = sub.recv().await { ... }` loop
19//! also works for callers who don't need the [`Stream`](tokio_stream::Stream)
20//! surface.
21//!
22//! # Lag policy
23//!
24//! Each subscriber maintains its own internal queue. If a consumer cannot
25//! keep up, the oldest events are dropped and the next call yields
26//! [`Lagged`](crate::subscription::Lagged) reporting how many events were skipped.
27//! Slow subscribers do not block the producer.
28
29use std::fmt;
30use std::pin::Pin;
31use std::task::{Context, Poll};
32
33use tokio::sync::broadcast::Receiver;
34use tokio_stream::wrappers::BroadcastStream;
35use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
36use tokio_stream::{Stream, StreamExt as _};
37
38use crate::types::{SessionEvent, SessionLifecycleEvent};
39use crate::{Custom, Repr};
40
41/// The subscription fell behind the producer.
42///
43/// Reports the number of events that were dropped from this subscriber's
44/// queue because the consumer didn't keep up. The subscription continues
45/// after this error, starting from the next live event — callers who care
46/// about lag should match on it and decide whether to resync, re-fetch, or
47/// log and continue.
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub struct Lagged(pub(crate) u64);
50
51impl Lagged {
52    /// Number of events skipped before this consumer could read them.
53    pub fn skipped(&self) -> u64 {
54        self.0
55    }
56}
57
58impl fmt::Display for Lagged {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        write!(f, "subscription lagged behind by {} events", self.0)
61    }
62}
63
64impl std::error::Error for Lagged {}
65
66/// Error kind for subscription receive operations.
67#[derive(Clone, Copy, Debug, PartialEq, Eq)]
68#[non_exhaustive]
69pub enum RecvErrorKind {
70    /// The producer is gone — the session has shut down or the client has
71    /// stopped. No further events will be delivered.
72    Closed,
73
74    /// The subscriber fell behind. See [`Lagged`].
75    Lagged(Lagged),
76}
77
78impl fmt::Display for RecvErrorKind {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        match self {
81            RecvErrorKind::Closed => write!(f, "subscription closed"),
82            RecvErrorKind::Lagged(l) => write!(f, "{l}"),
83        }
84    }
85}
86
87/// Error returned by [`crate::subscription::EventSubscription::recv`] and
88/// [`crate::subscription::LifecycleSubscription::recv`].
89#[derive(Debug)]
90pub struct RecvError {
91    repr: Repr<RecvErrorKind>,
92}
93
94impl RecvError {
95    /// The [`RecvErrorKind`] of this error.
96    pub fn kind(&self) -> &RecvErrorKind {
97        match &self.repr {
98            Repr::Simple(k) | Repr::SimpleMessage(k, ..) | Repr::Custom(Custom { kind: k, .. }) => {
99                k
100            }
101        }
102    }
103}
104
105impl fmt::Display for RecvError {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        match &self.repr {
108            Repr::Simple(k) => write!(f, "{k}"),
109            Repr::SimpleMessage(_, m) => write!(f, "{m}"),
110            Repr::Custom(Custom { error, .. }) => write!(f, "{error}"),
111        }
112    }
113}
114
115impl std::error::Error for RecvError {
116    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
117        match &self.repr {
118            Repr::Custom(Custom { error, .. }) => Some(&**error),
119            _ => None,
120        }
121    }
122}
123
124impl From<RecvErrorKind> for RecvError {
125    fn from(kind: RecvErrorKind) -> Self {
126        Self {
127            repr: Repr::Simple(kind),
128        }
129    }
130}
131
132impl From<Lagged> for RecvError {
133    fn from(lagged: Lagged) -> Self {
134        Self::from(RecvErrorKind::Lagged(lagged))
135    }
136}
137
138macro_rules! define_subscription {
139    (
140        $(#[$meta:meta])*
141        $name:ident, $item:ty $(,)?
142    ) => {
143        $(#[$meta])*
144        #[must_use = "subscriptions are inert until polled"]
145        pub struct $name {
146            inner: BroadcastStream<$item>,
147        }
148
149        impl $name {
150            pub(crate) fn new(rx: Receiver<$item>) -> Self {
151                Self {
152                    inner: BroadcastStream::new(rx),
153                }
154            }
155
156            /// Receive the next event.
157            ///
158            /// Returns:
159            ///
160            /// - `Ok(event)` for the next delivered event.
161            /// - `Err(`[`RecvError`]`)` with [`RecvError::kind()`] [`RecvErrorKind::Lagged`] if the subscriber fell behind;
162            ///   call `recv` again to continue from the next live event.
163            /// - `Err(`[`RecvError`]`)` with [`RecvError::kind()`] [`RecvErrorKind::Closed`] once the producer is gone.
164            ///
165            /// # Cancel safety
166            ///
167            /// **Cancel-safe.** Wraps a `tokio::sync::broadcast::Receiver`
168            /// via `BroadcastStream`; both are cancel-safe by design.
169            /// Dropping the future before completion is harmless — events
170            /// already buffered for this subscriber remain available on
171            /// the next `recv` call.
172            pub async fn recv(&mut self) -> Result<$item, RecvError> {
173                match self.inner.next().await {
174                    Some(Ok(event)) => Ok(event),
175                    Some(Err(BroadcastStreamRecvError::Lagged(n))) => {
176                        Err(Lagged(n).into())
177                    }
178                    None => Err(RecvErrorKind::Closed.into()),
179                }
180            }
181        }
182
183        impl Stream for $name {
184            type Item = Result<$item, Lagged>;
185
186            fn poll_next(
187                mut self: Pin<&mut Self>,
188                cx: &mut Context<'_>,
189            ) -> Poll<Option<Self::Item>> {
190                match Pin::new(&mut self.inner).poll_next(cx) {
191                    Poll::Ready(Some(Ok(event))) => Poll::Ready(Some(Ok(event))),
192                    Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => {
193                        Poll::Ready(Some(Err(Lagged(n))))
194                    }
195                    Poll::Ready(None) => Poll::Ready(None),
196                    Poll::Pending => Poll::Pending,
197                }
198            }
199        }
200    };
201}
202
203define_subscription! {
204    /// Subscription to runtime events for a single
205    /// [`Session`](crate::session::Session).
206    ///
207    /// Created by [`Session::subscribe`](crate::session::Session::subscribe).
208    /// Implements [`Stream`] yielding `Result<SessionEvent, Lagged>`.
209    /// Drop the value to unsubscribe; there is no separate cancel handle.
210    EventSubscription, SessionEvent
211}
212
213define_subscription! {
214    /// Subscription to lifecycle events on a [`Client`](crate::Client).
215    ///
216    /// Created by
217    /// [`Client::subscribe_lifecycle`](crate::Client::subscribe_lifecycle).
218    /// Implements [`Stream`] yielding `Result<SessionLifecycleEvent, Lagged>`.
219    /// Drop the value to unsubscribe; there is no separate cancel handle.
220    LifecycleSubscription, SessionLifecycleEvent
221}
222
223#[cfg(test)]
224mod tests {
225    use tokio::sync::broadcast;
226
227    use super::*;
228
229    fn make_event(id: &str) -> SessionEvent {
230        SessionEvent {
231            id: id.into(),
232            timestamp: "2025-01-01T00:00:00Z".into(),
233            parent_id: None,
234            ephemeral: None,
235            agent_id: None,
236            debug_cli_received_at_ms: None,
237            debug_ws_forwarded_at_ms: None,
238            event_type: "noop".into(),
239            data: serde_json::json!({}),
240        }
241    }
242
243    #[tokio::test]
244    async fn recv_yields_then_closes_on_drop_sender() {
245        let (tx, rx) = broadcast::channel(8);
246        let mut sub = EventSubscription::new(rx);
247        tx.send(make_event("a")).unwrap();
248        tx.send(make_event("b")).unwrap();
249        drop(tx);
250
251        assert_eq!(sub.recv().await.unwrap().id, "a");
252        assert_eq!(sub.recv().await.unwrap().id, "b");
253        assert!(matches!(
254            sub.recv().await.unwrap_err().kind(),
255            RecvErrorKind::Closed
256        ));
257    }
258
259    #[tokio::test]
260    async fn recv_surfaces_lag() {
261        let (tx, rx) = broadcast::channel(2);
262        let mut sub = EventSubscription::new(rx);
263        for id in ["a", "b", "c", "d"] {
264            tx.send(make_event(id)).unwrap();
265        }
266        let err = sub.recv().await.expect_err("expected a Lagged error");
267        let RecvErrorKind::Lagged(l) = err.kind() else {
268            panic!("expected Lagged, got {:?}", err.kind());
269        };
270        assert_eq!(l.skipped(), 2);
271        // Subscription continues with the live tail.
272        assert_eq!(sub.recv().await.unwrap().id, "c");
273        assert_eq!(sub.recv().await.unwrap().id, "d");
274    }
275
276    #[tokio::test]
277    async fn stream_impl_matches_recv_semantics() {
278        let (tx, rx) = broadcast::channel(8);
279        let mut sub = EventSubscription::new(rx);
280        tx.send(make_event("a")).unwrap();
281        drop(tx);
282
283        // poll_next path
284        let next = sub.next().await;
285        assert_eq!(next.unwrap().unwrap().id, "a");
286        assert!(sub.next().await.is_none());
287    }
288}