pulse_client/events.rs
1//! SSE event-stream consumer — `client.events().stream()` (B-098 Phase 7).
2//!
3//! Returns a [`Stream`] of parsed events, consumed via [`StreamExt::next`].
4//! Cancellation is automatic: drop the stream and the underlying connection
5//! closes.
6//!
7//! # Example
8//!
9//! ```no_run
10//! use futures_util::StreamExt;
11//! use pulse_client::PulseClient;
12//!
13//! # async fn run() -> Result<(), pulse_client::PulseError> {
14//! let client = PulseClient::builder()
15//! .base_url("http://localhost:9090")
16//! .token("ey...")
17//! .build()?;
18//!
19//! let mut stream = client.events().stream().await?;
20//! while let Some(event) = stream.next().await {
21//! let event = event?;
22//! println!("{}", event["type"]);
23//! }
24//! # Ok(())
25//! # }
26//! ```
27
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use bytes::BytesMut;
32use futures_core::Stream;
33use futures_util::StreamExt;
34use reqwest::header::{ACCEPT, AUTHORIZATION, CACHE_CONTROL};
35use reqwest::Method;
36use serde_json::Value;
37
38use crate::client::PulseClient;
39use crate::error::PulseError;
40
/// Server path of the SSE event-stream endpoint. `EventsResource::stream`
/// appends it to the client's base URL and reports it as the `path` of the
/// errors it constructs.
const PATH: &str = "/api/pulse/events/stream";
42
43/// `client.events()` — accessor for the SSE event stream.
44pub struct EventsResource<'c> {
45 pub(crate) client: &'c PulseClient,
46}
47
48impl<'c> EventsResource<'c> {
49 /// Subscribes to `GET /api/pulse/events/stream` and returns a [`Stream`]
50 /// of parsed events.
51 ///
52 /// The future resolves once the HTTP response head is received (so auth
53 /// errors surface immediately rather than on the first poll). After
54 /// that, each call to [`StreamExt::next`] yields the next event as it
55 /// arrives on the wire.
56 pub async fn stream(self) -> Result<EventsStream, PulseError> {
57 let token = self.client.token().ok_or_else(|| PulseError::NoToken {
58 path: PATH.to_string(),
59 })?;
60 if token.is_empty() {
61 return Err(PulseError::NoToken {
62 path: PATH.to_string(),
63 });
64 }
65
66 let url = format!("{}{PATH}", self.client.inner.base_url);
67 // Note: we do NOT set a per-request `.timeout()` here — that would
68 // override the client default. The Client's configured timeout
69 // applies as the upper bound for the WHOLE stream. For long-running
70 // subscriptions, build the client with a generous timeout (or
71 // supply a custom `http_client` without one) via the builder.
72 let response = self
73 .client
74 .inner
75 .http
76 .get(url)
77 .header(AUTHORIZATION, format!("Bearer {token}"))
78 .header(ACCEPT, "text/event-stream")
79 .header(CACHE_CONTROL, "no-cache")
80 .send()
81 .await?;
82
83 let status = response.status();
84 if !status.is_success() {
85 let bytes = response.bytes().await?;
86 let body = if bytes.is_empty() {
87 None
88 } else {
89 match serde_json::from_slice::<Value>(&bytes) {
90 Ok(v) => Some(v),
91 Err(_) => {
92 let raw = String::from_utf8_lossy(&bytes);
93 Some(serde_json::json!({ "error": raw.to_string() }))
94 }
95 }
96 };
97 return Err(match status.as_u16() {
98 401 => PulseError::Auth {
99 path: PATH.to_string(),
100 body,
101 },
102 other => PulseError::Api {
103 status: other,
104 path: PATH.to_string(),
105 body,
106 },
107 });
108 }
109
110 Ok(EventsStream {
111 inner: Box::pin(response.bytes_stream()),
112 buffer: BytesMut::with_capacity(4096),
113 data_lines: Vec::new(),
114 done: false,
115 })
116 }
117
118 /// `GET /api/pulse/iq/agents/{affecting_state}/state/replay/{key}?from=&to=&limit=`
119 /// — B-113 state-change replay.
120 ///
121 /// Returns the ordered list of changes that touched a state key between
122 /// two instants. `affecting_state` is the agent whose state store to
123 /// inspect; `key` is the state key. `from` / `to` accept the same specs
124 /// as `iq().get_as_of(...)` (`now`, `-1h`, ISO-8601, epoch millis);
125 /// `limit` caps the number of changes (server default 100). Each change
126 /// carries `timestamp`, `changeType` (`PUT` / `DELETE`), the resulting
127 /// `value`, and `eventId` when known. The server's enclosing
128 /// `{..., changes:[...]}` envelope is unwrapped — only the `changes`
129 /// array is returned (empty when the response omits it).
130 ///
131 /// ```no_run
132 /// # use pulse_client::PulseClient;
133 /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
134 /// let changes = client
135 /// .events()
136 /// .replay("user-sessions", "u42", "-1h", "now", 100)
137 /// .await?;
138 /// # Ok(())
139 /// # }
140 /// ```
141 pub async fn replay(
142 self,
143 affecting_state: &str,
144 key: &str,
145 from: &str,
146 to: &str,
147 limit: u32,
148 ) -> Result<Vec<Value>, PulseError> {
149 let path = format!(
150 "/api/pulse/iq/agents/{}/state/replay/{}?from={}&to={}&limit={}",
151 encode_segment(affecting_state),
152 encode_segment(key),
153 encode_segment(from),
154 encode_segment(to),
155 limit,
156 );
157 let result = self
158 .client
159 .request(Method::GET, &path, None::<&()>, true)
160 .await?;
161 Ok(result
162 .get("changes")
163 .and_then(Value::as_array)
164 .cloned()
165 .unwrap_or_default())
166 }
167}
168
/// Percent-encodes a path/query segment.
///
/// Only the unreserved bytes `A-Z a-z 0-9 - _ . ~` pass through unchanged;
/// every other byte of the UTF-8 input is written as `%XX` with upper-case
/// hex digits. This is the same aggressive encoding as the IQ resource, so a
/// key like `"user:123/orders"` produces identical URL bytes across the
/// Pulse SDKs.
fn encode_segment(s: &str) -> String {
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            let unreserved =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
            if unreserved {
                encoded.push(char::from(byte));
            } else {
                // High nibble first, then low nibble.
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
            encoded
        })
}

/// Upper-case hex digits, indexed by nibble value.
const HEX: &[u8; 16] = b"0123456789ABCDEF";
190
191/// `Stream<Item = Result<Value, PulseError>>` — yields parsed SSE events.
192///
193/// Created by [`EventsResource::stream`]. Drop to cancel the subscription.
194pub struct EventsStream {
195 inner: Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>> + Send>>,
196 buffer: BytesMut,
197 data_lines: Vec<String>,
198 done: bool,
199}
200
201impl Stream for EventsStream {
202 type Item = Result<Value, PulseError>;
203
204 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205 if self.done {
206 return Poll::Ready(None);
207 }
208
209 loop {
210 // First, try to extract an event from the existing buffer.
211 if let Some(event) = self.try_parse_buffered_event() {
212 return Poll::Ready(Some(Ok(event)));
213 }
214
215 // Need more bytes from the wire.
216 match self.inner.poll_next_unpin(cx) {
217 Poll::Pending => return Poll::Pending,
218 Poll::Ready(None) => {
219 self.done = true;
220 return Poll::Ready(None);
221 }
222 Poll::Ready(Some(Err(e))) => {
223 self.done = true;
224 return Poll::Ready(Some(Err(PulseError::Transport(e))));
225 }
226 Poll::Ready(Some(Ok(chunk))) => {
227 self.buffer.extend_from_slice(&chunk);
228 // Loop back to try parsing the new bytes.
229 }
230 }
231 }
232 }
233}
234
235impl EventsStream {
236 /// Walks `self.buffer` looking for the next complete event (blank-line
237 /// boundary). Removes consumed bytes; returns `None` if no full event
238 /// is buffered yet.
239 fn try_parse_buffered_event(&mut self) -> Option<Value> {
240 loop {
241 let newline_pos = self.buffer.iter().position(|&b| b == b'\n')?;
242 // Take the line, including the trailing \n, out of the buffer.
243 let line_bytes = self.buffer.split_to(newline_pos + 1);
244 // Strip trailing \n and optional \r.
245 let line_len = if line_bytes.len() >= 2 && line_bytes[line_bytes.len() - 2] == b'\r' {
246 line_bytes.len() - 2
247 } else {
248 line_bytes.len() - 1
249 };
250 let line = std::str::from_utf8(&line_bytes[..line_len]).unwrap_or("");
251
252 if line.is_empty() {
253 // Event boundary
254 if !self.data_lines.is_empty() {
255 let payload = self.data_lines.join("\n");
256 self.data_lines.clear();
257 return Some(match serde_json::from_str::<Value>(&payload) {
258 Ok(v) => v,
259 Err(_) => serde_json::json!({ "data": payload }),
260 });
261 }
262 continue;
263 }
264 if line.starts_with(':') {
265 continue; // SSE comment / keep-alive
266 }
267 if let Some(rest) = line.strip_prefix("data:") {
268 let value = rest.strip_prefix(' ').unwrap_or(rest);
269 self.data_lines.push(value.to_string());
270 }
271 // event:/id:/retry: consumed but not surfaced.
272 }
273 }
274}
275
276impl std::fmt::Debug for EventsResource<'_> {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 f.debug_struct("EventsResource").finish()
279 }
280}
281
282impl std::fmt::Debug for EventsStream {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 f.debug_struct("EventsStream")
285 .field("done", &self.done)
286 .field("buffered_lines", &self.data_lines.len())
287 .finish()
288 }
289}