github_copilot_sdk/subscription.rs
1//! Subscription handles for observing session and lifecycle events.
2//!
3//! Returned by [`Session::subscribe`](crate::session::Session::subscribe) and
4//! [`Client::subscribe_lifecycle`](crate::Client::subscribe_lifecycle).
5//!
6//! Each subscription is an opt-in **observer** of events that are also
7//! delivered to the [`SessionHandler`](crate::handler::SessionHandler).
8//! Subscribers receive a clone of every event but cannot influence
9//! permission decisions, tool results, or anything else that requires
10//! returning a [`HandlerResponse`](crate::handler::HandlerResponse).
11//!
12//! # Async iteration
13//!
14//! The subscription types implement [`tokio_stream::Stream`], so consumers
15//! can use adapter combinators from [`tokio_stream::StreamExt`] or
16//! `futures::StreamExt` (filtering, mapping, batching, racing with
17//! `tokio::select!`, etc.) without learning the SDK's internal channel
//! choice. Callers who don't need the [`Stream`](tokio_stream::Stream)
//! surface can instead call `recv` in a loop — but note that a plain
//! `while let Ok(event) = sub.recv().await { ... }` loop exits on the
//! first [`Lagged`] error as well as on close; match on the error if you
//! want to keep receiving after lag.
21//!
22//! # Lag policy
23//!
24//! Each subscriber maintains its own internal queue. If a consumer cannot
25//! keep up, the oldest events are dropped and the next call yields
26//! [`Lagged`] reporting how many events were skipped. Slow subscribers do
27//! not block the producer.
28
29use std::pin::Pin;
30use std::task::{Context, Poll};
31
32use tokio::sync::broadcast::Receiver;
33use tokio_stream::wrappers::BroadcastStream;
34use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
35use tokio_stream::{Stream, StreamExt as _};
36
37use crate::types::{SessionEvent, SessionLifecycleEvent};
38
39/// The subscription fell behind the producer.
40///
41/// Reports the number of events that were dropped from this subscriber's
42/// queue because the consumer didn't keep up. The subscription continues
43/// after this error, starting from the next live event — callers who care
44/// about lag should match on it and decide whether to resync, re-fetch, or
45/// log and continue.
46#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
47#[error("subscription lagged behind by {0} events")]
48pub struct Lagged(u64);
49
50impl Lagged {
51 /// Number of events skipped before this consumer could read them.
52 pub fn skipped(&self) -> u64 {
53 self.0
54 }
55}
56
57/// Error returned by [`EventSubscription::recv`] and
58/// [`LifecycleSubscription::recv`].
59#[derive(Debug, thiserror::Error)]
60#[non_exhaustive]
61pub enum RecvError {
62 /// The producer is gone — the session has shut down or the client has
63 /// stopped. No further events will be delivered.
64 #[error("subscription closed")]
65 Closed,
66
67 /// The subscriber fell behind. See [`Lagged`].
68 #[error(transparent)]
69 Lagged(#[from] Lagged),
70}
71
/// Generates a subscription wrapper type over a `broadcast::Receiver`,
/// with a `recv` convenience method and a [`Stream`] implementation.
macro_rules! define_subscription {
    (
        $(#[$attr:meta])*
        $sub:ident, $event:ty $(,)?
    ) => {
        $(#[$attr])*
        #[must_use = "subscriptions are inert until polled"]
        pub struct $sub {
            inner: BroadcastStream<$event>,
        }

        impl $sub {
            pub(crate) fn new(rx: Receiver<$event>) -> Self {
                let inner = BroadcastStream::new(rx);
                Self { inner }
            }

            /// Receive the next event.
            ///
            /// Returns:
            ///
            /// - `Ok(event)` — the next delivered event.
            /// - `Err(`[`RecvError::Lagged`]`)` — the subscriber fell
            ///   behind; calling `recv` again resumes with the live tail.
            /// - `Err(`[`RecvError::Closed`]`)` — the producer is gone
            ///   and no further events will arrive.
            ///
            /// # Cancel safety
            ///
            /// **Cancel-safe.** Backed by a
            /// `tokio::sync::broadcast::Receiver` wrapped in
            /// `BroadcastStream`; both tolerate dropped futures, so events
            /// already queued for this subscriber remain available on the
            /// next `recv` call.
            pub async fn recv(&mut self) -> Result<$event, RecvError> {
                match self.inner.next().await {
                    // The sender is gone: the stream is exhausted for good.
                    None => Err(RecvError::Closed),
                    // `Lagged` is the wrapper error's only variant (the
                    // pattern is irrefutable); `Ok` values pass through.
                    Some(result) => result.map_err(
                        |BroadcastStreamRecvError::Lagged(skipped)| {
                            RecvError::Lagged(Lagged(skipped))
                        },
                    ),
                }
            }
        }

        impl Stream for $sub {
            type Item = Result<$event, Lagged>;

            fn poll_next(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                // Delegate to the inner stream, translating the wrapper's
                // lag error into this module's `Lagged` type.
                Pin::new(&mut self.inner).poll_next(cx).map(|item| {
                    item.map(|result| {
                        result.map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged(n))
                    })
                })
            }
        }
    };
}
136
137define_subscription! {
138 /// Subscription to runtime events for a single
139 /// [`Session`](crate::session::Session).
140 ///
141 /// Created by [`Session::subscribe`](crate::session::Session::subscribe).
142 /// Implements [`Stream`] yielding `Result<SessionEvent, Lagged>`.
143 /// Drop the value to unsubscribe; there is no separate cancel handle.
144 EventSubscription, SessionEvent
145}
146
147define_subscription! {
148 /// Subscription to lifecycle events on a [`Client`](crate::Client).
149 ///
150 /// Created by
151 /// [`Client::subscribe_lifecycle`](crate::Client::subscribe_lifecycle).
152 /// Implements [`Stream`] yielding `Result<SessionLifecycleEvent, Lagged>`.
153 /// Drop the value to unsubscribe; there is no separate cancel handle.
154 LifecycleSubscription, SessionLifecycleEvent
155}
156
#[cfg(test)]
mod tests {
    use tokio::sync::broadcast;

    use super::*;

    // Minimal event fixture; only `id` varies between test events.
    fn event_with_id(id: &str) -> SessionEvent {
        SessionEvent {
            id: id.into(),
            timestamp: "2025-01-01T00:00:00Z".into(),
            parent_id: None,
            ephemeral: None,
            agent_id: None,
            debug_cli_received_at_ms: None,
            debug_ws_forwarded_at_ms: None,
            event_type: "noop".into(),
            data: serde_json::json!({}),
        }
    }

    #[tokio::test]
    async fn recv_yields_then_closes_on_drop_sender() {
        let (tx, rx) = broadcast::channel(8);
        let mut sub = EventSubscription::new(rx);
        for id in ["a", "b"] {
            tx.send(event_with_id(id)).unwrap();
        }
        drop(tx);

        for expected in ["a", "b"] {
            assert_eq!(sub.recv().await.unwrap().id, expected);
        }
        assert!(matches!(sub.recv().await, Err(RecvError::Closed)));
    }

    #[tokio::test]
    async fn recv_surfaces_lag() {
        let (tx, rx) = broadcast::channel(2);
        let mut sub = EventSubscription::new(rx);
        for id in ["a", "b", "c", "d"] {
            tx.send(event_with_id(id)).unwrap();
        }
        let first = sub.recv().await;
        match first {
            Err(RecvError::Lagged(lag)) => assert_eq!(lag.skipped(), 2),
            other => panic!("expected Lagged, got {other:?}"),
        }
        // After the lag report the live tail is still delivered.
        assert_eq!(sub.recv().await.unwrap().id, "c");
        assert_eq!(sub.recv().await.unwrap().id, "d");
    }

    #[tokio::test]
    async fn stream_impl_matches_recv_semantics() {
        let (tx, rx) = broadcast::channel(8);
        let mut sub = EventSubscription::new(rx);
        tx.send(event_with_id("a")).unwrap();
        drop(tx);

        // Drive via `poll_next` (through `StreamExt::next`).
        assert_eq!(sub.next().await.unwrap().unwrap().id, "a");
        assert!(sub.next().await.is_none());
    }
}
218}