Skip to main content

github_copilot_sdk/
subscription.rs

1//! Subscription handles for observing session and lifecycle events.
2//!
3//! Returned by [`Session::subscribe`](crate::session::Session::subscribe) and
4//! [`Client::subscribe_lifecycle`](crate::Client::subscribe_lifecycle).
5//!
6//! Each subscription is an opt-in **observer** of events that are also
7//! delivered to the [`SessionHandler`](crate::handler::SessionHandler).
8//! Subscribers receive a clone of every event but cannot influence
9//! permission decisions, tool results, or anything else that requires
10//! returning a [`HandlerResponse`](crate::handler::HandlerResponse).
11//!
12//! # Async iteration
13//!
14//! The subscription types implement [`tokio_stream::Stream`], so consumers
15//! can use adapter combinators from [`tokio_stream::StreamExt`] or
16//! `futures::StreamExt` (filtering, mapping, batching, racing with
17//! `tokio::select!`, etc.) without learning the SDK's internal channel
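//! choice. A simple `while let Ok(event) = sub.recv().await { ... }` loop
//! also works for callers who don't need the [`Stream`](tokio_stream::Stream)
//! surface. Note that such a loop stops at the first error, including a
//! recoverable [`RecvError::Lagged`]; match on the error instead if reading
//! should continue after a lag.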
21//!
22//! # Lag policy
23//!
24//! Each subscriber maintains its own internal queue. If a consumer cannot
25//! keep up, the oldest events are dropped and the next call yields
26//! [`Lagged`] reporting how many events were skipped. Slow subscribers do
27//! not block the producer.
28
29use std::pin::Pin;
30use std::task::{Context, Poll};
31
32use tokio::sync::broadcast::Receiver;
33use tokio_stream::wrappers::BroadcastStream;
34use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
35use tokio_stream::{Stream, StreamExt as _};
36
37use crate::types::{SessionEvent, SessionLifecycleEvent};
38
39/// The subscription fell behind the producer.
40///
41/// Reports the number of events that were dropped from this subscriber's
42/// queue because the consumer didn't keep up. The subscription continues
43/// after this error, starting from the next live event — callers who care
44/// about lag should match on it and decide whether to resync, re-fetch, or
45/// log and continue.
46#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
47#[error("subscription lagged behind by {0} events")]
48pub struct Lagged(u64);
49
50impl Lagged {
51    /// Number of events skipped before this consumer could read them.
52    pub fn skipped(&self) -> u64 {
53        self.0
54    }
55}
56
57/// Error returned by [`EventSubscription::recv`] and
58/// [`LifecycleSubscription::recv`].
59#[derive(Debug, thiserror::Error)]
60#[non_exhaustive]
61pub enum RecvError {
62    /// The producer is gone — the session has shut down or the client has
63    /// stopped. No further events will be delivered.
64    #[error("subscription closed")]
65    Closed,
66
67    /// The subscriber fell behind. See [`Lagged`].
68    #[error(transparent)]
69    Lagged(#[from] Lagged),
70}
71
// Generates a public subscription type named `$name` that delivers `$item`
// values from a `tokio::sync::broadcast` channel.
//
// Each generated type offers two equivalent ways to consume events:
//   * an inherent `recv` method returning `Result<$item, RecvError>`, and
//   * a `Stream` impl yielding `Result<$item, Lagged>` and ending with `None`
//     once the channel is closed.
//
// Any attributes (typically doc comments) written before the name are
// forwarded onto the generated struct.
macro_rules! define_subscription {
    (
        $(#[$meta:meta])*
        $name:ident, $item:ty $(,)?
    ) => {
        $(#[$meta])*
        // `BroadcastStream<T>` implements `Debug` for every `T`, so this
        // derive places no extra bound on `$item`.
        #[derive(Debug)]
        #[must_use = "subscriptions are inert until polled"]
        pub struct $name {
            inner: BroadcastStream<$item>,
        }

        impl $name {
            /// Wraps a broadcast receiver so it can be handed out as a
            /// subscription.
            pub(crate) fn new(rx: Receiver<$item>) -> Self {
                Self {
                    inner: BroadcastStream::new(rx),
                }
            }

            /// Receive the next event.
            ///
            /// Returns:
            ///
            /// - `Ok(event)` for the next delivered event.
            /// - `Err(`[`RecvError::Lagged`]`)` if the subscriber fell behind;
            ///   call `recv` again to continue with the oldest event that is
            ///   still retained for this subscriber.
            /// - `Err(`[`RecvError::Closed`]`)` once the producer is gone.
            ///
            /// # Errors
            ///
            /// See the list above: [`RecvError::Lagged`] is recoverable,
            /// [`RecvError::Closed`] is terminal.
            ///
            /// # Cancel safety
            ///
            /// **Cancel-safe.** Wraps a `tokio::sync::broadcast::Receiver`
            /// via `BroadcastStream`; both are cancel-safe by design.
            /// Dropping the future before completion is harmless — events
            /// already buffered for this subscriber remain available on
            /// the next `recv` call.
            pub async fn recv(&mut self) -> Result<$item, RecvError> {
                match self.inner.next().await {
                    Some(Ok(event)) => Ok(event),
                    Some(Err(BroadcastStreamRecvError::Lagged(skipped))) => {
                        Err(RecvError::Lagged(Lagged(skipped)))
                    }
                    // The wrapped stream ends only when the channel is closed.
                    None => Err(RecvError::Closed),
                }
            }
        }

        impl Stream for $name {
            type Item = Result<$item, Lagged>;

            /// Polls the wrapped broadcast stream, translating its lag error
            /// into the SDK's own [`Lagged`] type. `Pending` and end-of-stream
            /// are passed through unchanged.
            fn poll_next(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                Pin::new(&mut self.inner).poll_next(cx).map(|next| {
                    next.map(|result| {
                        result.map_err(|BroadcastStreamRecvError::Lagged(skipped)| {
                            Lagged(skipped)
                        })
                    })
                })
            }
        }
    };
}
136
137define_subscription! {
138    /// Subscription to runtime events for a single
139    /// [`Session`](crate::session::Session).
140    ///
141    /// Created by [`Session::subscribe`](crate::session::Session::subscribe).
142    /// Implements [`Stream`] yielding `Result<SessionEvent, Lagged>`.
143    /// Drop the value to unsubscribe; there is no separate cancel handle.
144    EventSubscription, SessionEvent
145}
146
147define_subscription! {
148    /// Subscription to lifecycle events on a [`Client`](crate::Client).
149    ///
150    /// Created by
151    /// [`Client::subscribe_lifecycle`](crate::Client::subscribe_lifecycle).
152    /// Implements [`Stream`] yielding `Result<SessionLifecycleEvent, Lagged>`.
153    /// Drop the value to unsubscribe; there is no separate cancel handle.
154    LifecycleSubscription, SessionLifecycleEvent
155}
156
#[cfg(test)]
mod tests {
    use tokio::sync::broadcast;

    use super::*;

    /// Builds a minimal `SessionEvent` whose only distinguishing field is
    /// `id`; every optional field is left unset.
    fn make_event(id: &str) -> SessionEvent {
        SessionEvent {
            id: id.into(),
            timestamp: "2025-01-01T00:00:00Z".into(),
            parent_id: None,
            ephemeral: None,
            agent_id: None,
            debug_cli_received_at_ms: None,
            debug_ws_forwarded_at_ms: None,
            event_type: "noop".into(),
            data: serde_json::json!({}),
        }
    }

    /// Buffered events are still delivered after the sender is dropped, and
    /// only then does `recv` report `Closed`.
    #[tokio::test]
    async fn recv_yields_then_closes_on_drop_sender() {
        let (tx, rx) = broadcast::channel(8);
        let mut sub = EventSubscription::new(rx);
        tx.send(make_event("a")).unwrap();
        tx.send(make_event("b")).unwrap();
        drop(tx);

        assert_eq!(sub.recv().await.unwrap().id, "a");
        assert_eq!(sub.recv().await.unwrap().id, "b");
        assert!(matches!(sub.recv().await, Err(RecvError::Closed)));
    }

    /// Overflowing a capacity-2 channel with four events makes `recv` report
    /// two skipped events before resuming with the retained ones.
    #[tokio::test]
    async fn recv_surfaces_lag() {
        let (tx, rx) = broadcast::channel(2);
        let mut sub = EventSubscription::new(rx);
        for id in ["a", "b", "c", "d"] {
            tx.send(make_event(id)).unwrap();
        }
        match sub.recv().await {
            Err(RecvError::Lagged(l)) => assert_eq!(l.skipped(), 2),
            other => panic!("expected Lagged, got {other:?}"),
        }
        // Subscription continues with the live tail.
        assert_eq!(sub.recv().await.unwrap().id, "c");
        assert_eq!(sub.recv().await.unwrap().id, "d");
    }

    /// The `Stream` impl yields buffered events and then ends with `None`
    /// once the sender is gone.
    #[tokio::test]
    async fn stream_impl_matches_recv_semantics() {
        let (tx, rx) = broadcast::channel(8);
        let mut sub = EventSubscription::new(rx);
        tx.send(make_event("a")).unwrap();
        drop(tx);

        // poll_next path
        let next = sub.next().await;
        assert_eq!(next.unwrap().unwrap().id, "a");
        assert!(sub.next().await.is_none());
    }

    /// Mirror of `recv_surfaces_lag` for the `Stream` impl: the lag is
    /// reported as an `Err(Lagged)` item and the stream keeps going.
    #[tokio::test]
    async fn stream_surfaces_lag() {
        let (tx, rx) = broadcast::channel(2);
        let mut sub = EventSubscription::new(rx);
        for id in ["a", "b", "c", "d"] {
            tx.send(make_event(id)).unwrap();
        }
        match sub.next().await {
            Some(Err(lagged)) => assert_eq!(lagged.skipped(), 2),
            other => panic!("expected Lagged, got {other:?}"),
        }
        // The stream continues with the events still retained.
        assert_eq!(sub.next().await.unwrap().unwrap().id, "c");
        assert_eq!(sub.next().await.unwrap().unwrap().id, "d");
    }
}