Skip to main content

pulse_client/
events.rs

1//! SSE event-stream consumer — `client.events().stream()` (B-098 Phase 7).
2//!
3//! Returns a [`Stream`] of parsed events, consumed via [`StreamExt::next`].
4//! Cancellation is automatic: drop the stream and the underlying connection
5//! closes.
6//!
7//! # Example
8//!
9//! ```no_run
10//! use futures_util::StreamExt;
11//! use pulse_client::PulseClient;
12//!
13//! # async fn run() -> Result<(), pulse_client::PulseError> {
14//! let client = PulseClient::builder()
15//!     .base_url("http://localhost:9090")
16//!     .token("ey...")
17//!     .build()?;
18//!
19//! let mut stream = client.events().stream().await?;
20//! while let Some(event) = stream.next().await {
21//!     let event = event?;
22//!     println!("{}", event["type"]);
23//! }
24//! # Ok(())
25//! # }
26//! ```
27
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use bytes::BytesMut;
32use futures_core::Stream;
33use futures_util::StreamExt;
34use reqwest::header::{ACCEPT, AUTHORIZATION, CACHE_CONTROL};
35use reqwest::Method;
36use serde_json::Value;
37
38use crate::client::PulseClient;
39use crate::error::PulseError;
40
/// Server-relative path of the SSE endpoint. It is appended to the client's
/// base URL to build the subscription request, and it is the `path` value
/// carried by every error that [`EventsResource::stream`] constructs.
const PATH: &str = "/api/pulse/events/stream";
42
/// `client.events()` — accessor for the SSE event stream.
///
/// A thin handle that only borrows the [`PulseClient`]; its methods take
/// `self` by value and issue their requests through that client.
pub struct EventsResource<'c> {
    /// Borrowed client that supplies the token, the base URL, and the HTTP
    /// transport used by the methods of this resource.
    pub(crate) client: &'c PulseClient,
}
47
48impl<'c> EventsResource<'c> {
49    /// Subscribes to `GET /api/pulse/events/stream` and returns a [`Stream`]
50    /// of parsed events.
51    ///
52    /// The future resolves once the HTTP response head is received (so auth
53    /// errors surface immediately rather than on the first poll). After
54    /// that, each call to [`StreamExt::next`] yields the next event as it
55    /// arrives on the wire.
56    pub async fn stream(self) -> Result<EventsStream, PulseError> {
57        let token = self.client.token().ok_or_else(|| PulseError::NoToken {
58            path: PATH.to_string(),
59        })?;
60        if token.is_empty() {
61            return Err(PulseError::NoToken {
62                path: PATH.to_string(),
63            });
64        }
65
66        let url = format!("{}{PATH}", self.client.inner.base_url);
67        // Note: we do NOT set a per-request `.timeout()` here — that would
68        // override the client default. The Client's configured timeout
69        // applies as the upper bound for the WHOLE stream. For long-running
70        // subscriptions, build the client with a generous timeout (or
71        // supply a custom `http_client` without one) via the builder.
72        let response = self
73            .client
74            .inner
75            .http
76            .get(url)
77            .header(AUTHORIZATION, format!("Bearer {token}"))
78            .header(ACCEPT, "text/event-stream")
79            .header(CACHE_CONTROL, "no-cache")
80            .send()
81            .await?;
82
83        let status = response.status();
84        if !status.is_success() {
85            let bytes = response.bytes().await?;
86            let body = if bytes.is_empty() {
87                None
88            } else {
89                match serde_json::from_slice::<Value>(&bytes) {
90                    Ok(v) => Some(v),
91                    Err(_) => {
92                        let raw = String::from_utf8_lossy(&bytes);
93                        Some(serde_json::json!({ "error": raw.to_string() }))
94                    }
95                }
96            };
97            return Err(match status.as_u16() {
98                401 => PulseError::Auth {
99                    path: PATH.to_string(),
100                    body,
101                },
102                other => PulseError::Api {
103                    status: other,
104                    path: PATH.to_string(),
105                    body,
106                },
107            });
108        }
109
110        Ok(EventsStream {
111            inner: Box::pin(response.bytes_stream()),
112            buffer: BytesMut::with_capacity(4096),
113            data_lines: Vec::new(),
114            done: false,
115        })
116    }
117
118    /// `GET /api/pulse/iq/agents/{affecting_state}/state/replay/{key}?from=&to=&limit=`
119    /// — B-113 state-change replay.
120    ///
121    /// Returns the ordered list of changes that touched a state key between
122    /// two instants. `affecting_state` is the agent whose state store to
123    /// inspect; `key` is the state key. `from` / `to` accept the same specs
124    /// as `iq().get_as_of(...)` (`now`, `-1h`, ISO-8601, epoch millis);
125    /// `limit` caps the number of changes (server default 100). Each change
126    /// carries `timestamp`, `changeType` (`PUT` / `DELETE`), the resulting
127    /// `value`, and `eventId` when known. The server's enclosing
128    /// `{..., changes:[...]}` envelope is unwrapped — only the `changes`
129    /// array is returned (empty when the response omits it).
130    ///
131    /// ```no_run
132    /// # use pulse_client::PulseClient;
133    /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
134    /// let changes = client
135    ///     .events()
136    ///     .replay("user-sessions", "u42", "-1h", "now", 100)
137    ///     .await?;
138    /// # Ok(())
139    /// # }
140    /// ```
141    pub async fn replay(
142        self,
143        affecting_state: &str,
144        key: &str,
145        from: &str,
146        to: &str,
147        limit: u32,
148    ) -> Result<Vec<Value>, PulseError> {
149        let path = format!(
150            "/api/pulse/iq/agents/{}/state/replay/{}?from={}&to={}&limit={}",
151            encode_segment(affecting_state),
152            encode_segment(key),
153            encode_segment(from),
154            encode_segment(to),
155            limit,
156        );
157        let result = self
158            .client
159            .request(Method::GET, &path, None::<&()>, true)
160            .await?;
161        Ok(result
162            .get("changes")
163            .and_then(Value::as_array)
164            .cloned()
165            .unwrap_or_default())
166    }
167}
168
/// Percent-encodes a single path or query component.
///
/// ASCII letters, digits, and `-`, `_`, `.`, `~` are copied through
/// unchanged; every other byte of the UTF-8 input is emitted as `%XX` with
/// upper-case hex digits. The semantics are intentionally as aggressive as
/// the IQ resource's, so a key like `"user:123/orders"` produces identical
/// URL bytes across the Pulse SDKs.
fn encode_segment(s: &str) -> String {
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            let passes_through =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
            if passes_through {
                encoded.push(char::from(byte));
            } else {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
            encoded
        })
}

/// Upper-case hex digits, indexed by nibble value.
const HEX: &[u8; 16] = b"0123456789ABCDEF";
190
/// `Stream<Item = Result<Value, PulseError>>` — yields parsed SSE events.
///
/// Created by [`EventsResource::stream`]. Drop to cancel the subscription.
pub struct EventsStream {
    /// Boxed byte stream of the HTTP response body (built from
    /// `response.bytes_stream()`).
    inner: Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>> + Send>>,
    /// Bytes received from `inner` that have not yet been split into lines.
    buffer: BytesMut,
    /// Values of the `data:` lines collected for the event currently being
    /// assembled; joined with `\n` and cleared at the next blank line.
    data_lines: Vec<String>,
    /// Set once `inner` has ended or returned an error; every later poll
    /// returns `None`.
    done: bool,
}
200
201impl Stream for EventsStream {
202    type Item = Result<Value, PulseError>;
203
204    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205        if self.done {
206            return Poll::Ready(None);
207        }
208
209        loop {
210            // First, try to extract an event from the existing buffer.
211            if let Some(event) = self.try_parse_buffered_event() {
212                return Poll::Ready(Some(Ok(event)));
213            }
214
215            // Need more bytes from the wire.
216            match self.inner.poll_next_unpin(cx) {
217                Poll::Pending => return Poll::Pending,
218                Poll::Ready(None) => {
219                    self.done = true;
220                    return Poll::Ready(None);
221                }
222                Poll::Ready(Some(Err(e))) => {
223                    self.done = true;
224                    return Poll::Ready(Some(Err(PulseError::Transport(e))));
225                }
226                Poll::Ready(Some(Ok(chunk))) => {
227                    self.buffer.extend_from_slice(&chunk);
228                    // Loop back to try parsing the new bytes.
229                }
230            }
231        }
232    }
233}
234
235impl EventsStream {
236    /// Walks `self.buffer` looking for the next complete event (blank-line
237    /// boundary). Removes consumed bytes; returns `None` if no full event
238    /// is buffered yet.
239    fn try_parse_buffered_event(&mut self) -> Option<Value> {
240        loop {
241            let newline_pos = self.buffer.iter().position(|&b| b == b'\n')?;
242            // Take the line, including the trailing \n, out of the buffer.
243            let line_bytes = self.buffer.split_to(newline_pos + 1);
244            // Strip trailing \n and optional \r.
245            let line_len = if line_bytes.len() >= 2 && line_bytes[line_bytes.len() - 2] == b'\r' {
246                line_bytes.len() - 2
247            } else {
248                line_bytes.len() - 1
249            };
250            let line = std::str::from_utf8(&line_bytes[..line_len]).unwrap_or("");
251
252            if line.is_empty() {
253                // Event boundary
254                if !self.data_lines.is_empty() {
255                    let payload = self.data_lines.join("\n");
256                    self.data_lines.clear();
257                    return Some(match serde_json::from_str::<Value>(&payload) {
258                        Ok(v) => v,
259                        Err(_) => serde_json::json!({ "data": payload }),
260                    });
261                }
262                continue;
263            }
264            if line.starts_with(':') {
265                continue; // SSE comment / keep-alive
266            }
267            if let Some(rest) = line.strip_prefix("data:") {
268                let value = rest.strip_prefix(' ').unwrap_or(rest);
269                self.data_lines.push(value.to_string());
270            }
271            // event:/id:/retry: consumed but not surfaced.
272        }
273    }
274}
275
276impl std::fmt::Debug for EventsResource<'_> {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        f.debug_struct("EventsResource").finish()
279    }
280}
281
282impl std::fmt::Debug for EventsStream {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        f.debug_struct("EventsStream")
285            .field("done", &self.done)
286            .field("buffered_lines", &self.data_lines.len())
287            .finish()
288    }
289}