Skip to main content

pulse_client/
events.rs

1//! SSE event-stream consumer — `client.events().stream()` (B-098 Phase 7).
2//!
3//! Returns a [`Stream`] of parsed events, consumed via [`StreamExt::next`].
4//! Cancellation is automatic: drop the stream and the underlying connection
5//! closes.
6//!
7//! # Example
8//!
9//! ```no_run
10//! use futures_util::StreamExt;
11//! use pulse_client::PulseClient;
12//!
13//! # async fn run() -> Result<(), pulse_client::PulseError> {
14//! let client = PulseClient::builder()
15//!     .base_url("http://localhost:9090")
16//!     .token("ey...")
17//!     .build()?;
18//!
19//! let mut stream = client.events().stream().await?;
20//! while let Some(event) = stream.next().await {
21//!     let event = event?;
22//!     println!("{}", event["type"]);
23//! }
24//! # Ok(())
25//! # }
26//! ```
27
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use bytes::BytesMut;
32use futures_core::Stream;
33use futures_util::StreamExt;
34use reqwest::header::{ACCEPT, AUTHORIZATION, CACHE_CONTROL};
35use serde_json::Value;
36
37use crate::client::PulseClient;
38use crate::error::PulseError;
39
/// Request path of the SSE endpoint, appended to the client's base URL and
/// echoed in the `path` field of every error produced by this module.
const PATH: &str = "/api/pulse/events/stream";
41
42/// `client.events()` — accessor for the SSE event stream.
43pub struct EventsResource<'c> {
44    pub(crate) client: &'c PulseClient,
45}
46
47impl<'c> EventsResource<'c> {
48    /// Subscribes to `GET /api/pulse/events/stream` and returns a [`Stream`]
49    /// of parsed events.
50    ///
51    /// The future resolves once the HTTP response head is received (so auth
52    /// errors surface immediately rather than on the first poll). After
53    /// that, each call to [`StreamExt::next`] yields the next event as it
54    /// arrives on the wire.
55    pub async fn stream(self) -> Result<EventsStream, PulseError> {
56        let token = self.client.token().ok_or_else(|| PulseError::NoToken {
57            path: PATH.to_string(),
58        })?;
59        if token.is_empty() {
60            return Err(PulseError::NoToken {
61                path: PATH.to_string(),
62            });
63        }
64
65        let url = format!("{}{PATH}", self.client.inner.base_url);
66        // Note: we do NOT set a per-request `.timeout()` here — that would
67        // override the client default. The Client's configured timeout
68        // applies as the upper bound for the WHOLE stream. For long-running
69        // subscriptions, build the client with a generous timeout (or
70        // supply a custom `http_client` without one) via the builder.
71        let response = self
72            .client
73            .inner
74            .http
75            .get(url)
76            .header(AUTHORIZATION, format!("Bearer {token}"))
77            .header(ACCEPT, "text/event-stream")
78            .header(CACHE_CONTROL, "no-cache")
79            .send()
80            .await?;
81
82        let status = response.status();
83        if !status.is_success() {
84            let bytes = response.bytes().await?;
85            let body = if bytes.is_empty() {
86                None
87            } else {
88                match serde_json::from_slice::<Value>(&bytes) {
89                    Ok(v) => Some(v),
90                    Err(_) => {
91                        let raw = String::from_utf8_lossy(&bytes);
92                        Some(serde_json::json!({ "error": raw.to_string() }))
93                    }
94                }
95            };
96            return Err(match status.as_u16() {
97                401 => PulseError::Auth {
98                    path: PATH.to_string(),
99                    body,
100                },
101                other => PulseError::Api {
102                    status: other,
103                    path: PATH.to_string(),
104                    body,
105                },
106            });
107        }
108
109        Ok(EventsStream {
110            inner: Box::pin(response.bytes_stream()),
111            buffer: BytesMut::with_capacity(4096),
112            data_lines: Vec::new(),
113            done: false,
114        })
115    }
116}
117
118/// `Stream<Item = Result<Value, PulseError>>` — yields parsed SSE events.
119///
120/// Created by [`EventsResource::stream`]. Drop to cancel the subscription.
121pub struct EventsStream {
122    inner: Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>> + Send>>,
123    buffer: BytesMut,
124    data_lines: Vec<String>,
125    done: bool,
126}
127
128impl Stream for EventsStream {
129    type Item = Result<Value, PulseError>;
130
131    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
132        if self.done {
133            return Poll::Ready(None);
134        }
135
136        loop {
137            // First, try to extract an event from the existing buffer.
138            if let Some(event) = self.try_parse_buffered_event() {
139                return Poll::Ready(Some(Ok(event)));
140            }
141
142            // Need more bytes from the wire.
143            match self.inner.poll_next_unpin(cx) {
144                Poll::Pending => return Poll::Pending,
145                Poll::Ready(None) => {
146                    self.done = true;
147                    return Poll::Ready(None);
148                }
149                Poll::Ready(Some(Err(e))) => {
150                    self.done = true;
151                    return Poll::Ready(Some(Err(PulseError::Transport(e))));
152                }
153                Poll::Ready(Some(Ok(chunk))) => {
154                    self.buffer.extend_from_slice(&chunk);
155                    // Loop back to try parsing the new bytes.
156                }
157            }
158        }
159    }
160}
161
162impl EventsStream {
163    /// Walks `self.buffer` looking for the next complete event (blank-line
164    /// boundary). Removes consumed bytes; returns `None` if no full event
165    /// is buffered yet.
166    fn try_parse_buffered_event(&mut self) -> Option<Value> {
167        loop {
168            let newline_pos = self.buffer.iter().position(|&b| b == b'\n')?;
169            // Take the line, including the trailing \n, out of the buffer.
170            let line_bytes = self.buffer.split_to(newline_pos + 1);
171            // Strip trailing \n and optional \r.
172            let line_len = if line_bytes.len() >= 2 && line_bytes[line_bytes.len() - 2] == b'\r' {
173                line_bytes.len() - 2
174            } else {
175                line_bytes.len() - 1
176            };
177            let line = std::str::from_utf8(&line_bytes[..line_len]).unwrap_or("");
178
179            if line.is_empty() {
180                // Event boundary
181                if !self.data_lines.is_empty() {
182                    let payload = self.data_lines.join("\n");
183                    self.data_lines.clear();
184                    return Some(match serde_json::from_str::<Value>(&payload) {
185                        Ok(v) => v,
186                        Err(_) => serde_json::json!({ "data": payload }),
187                    });
188                }
189                continue;
190            }
191            if line.starts_with(':') {
192                continue; // SSE comment / keep-alive
193            }
194            if let Some(rest) = line.strip_prefix("data:") {
195                let value = rest.strip_prefix(' ').unwrap_or(rest);
196                self.data_lines.push(value.to_string());
197            }
198            // event:/id:/retry: consumed but not surfaced.
199        }
200    }
201}
202
203impl std::fmt::Debug for EventsResource<'_> {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        f.debug_struct("EventsResource").finish()
206    }
207}
208
209impl std::fmt::Debug for EventsStream {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        f.debug_struct("EventsStream")
212            .field("done", &self.done)
213            .field("buffered_lines", &self.data_lines.len())
214            .finish()
215    }
216}