Skip to main content

stream_rs/
sse.rs

1//! A spec-compliant [Server-Sent Events] stream parser.
2//!
3//! This is a *push* parser: feed it arbitrary byte chunks as they arrive off
4//! the wire with [`SseParser::feed`], and it yields fully-formed [`SseEvent`]s
5//! whenever a blank line completes one. It implements the event-stream parsing
6//! algorithm from the WHATWG HTML specification:
7//!
8//! * Line terminators may be `\r\n`, `\n`, or a lone `\r`, and a terminator
9//!   may be split across two `feed` calls.
//! * A single leading U+FEFF byte order mark (UTF-8 `EF BB BF`) at the very
//!   start of the stream is stripped, even if it is split across `feed` calls.
11//! * Lines beginning with `:` are comments and are ignored.
12//! * A field is `name`, optionally followed by `:` and a value; a single
13//!   leading space after the colon is removed.
14//! * `data` fields accumulate, joined by `\n`; a trailing `\n` is *not* part of
15//!   the dispatched data.
16//! * `event` sets the event type, `id` sets the last event id (unless it
17//!   contains a NUL, in which case it is ignored), and `retry` sets the
18//!   reconnection time when the value is all ASCII digits.
19//! * Per spec, the *last event id* is **persistent**: once set it is reported
20//!   on every subsequent dispatched event until a new `id` field replaces it.
21//!   The `event`, `data` and `retry` buffers, by contrast, are reset after
22//!   each dispatch.
23//! * A blank line dispatches the buffered event. An event whose data buffer is
24//!   empty is not dispatched (per spec).
25//!
26//! [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html
27
28use alloc::string::String;
29use alloc::vec::Vec;
30
31use alloc_helpers::take_string;
32
mod alloc_helpers {
    use alloc::string::String;

    /// Swap an empty `String` into `s` and hand back whatever it held before.
    ///
    /// The allocation (if any) moves out with the returned value; `s` is left
    /// as a fresh, capacity-free `String`.
    #[inline]
    pub fn take_string(s: &mut String) -> String {
        core::mem::replace(s, String::new())
    }
}
42
/// A fully parsed Server-Sent Event ready to be dispatched.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SseEvent {
    /// The event type, from the most recent `event:` field of this event.
    /// `None` (or, per the spec, an empty type) means the default type
    /// (`"message"`).
    pub event: Option<String>,
    /// The event payload: every `data:` field of this event joined with `\n`,
    /// without a trailing newline.
    pub data: String,
    /// The stream's *last event id* at the moment this event was dispatched.
    ///
    /// This is persistent stream state, not per-event state. It comes from the
    /// most recent valid `id:` field seen anywhere earlier in the stream,
    /// possibly in a previous event. It is `None` only if no such field has
    /// been seen since the parser was created or last reset.
    pub id: Option<String>,
    /// The reconnection time in milliseconds from a valid `retry:` field
    /// received while this event was being buffered, if any.
    pub retry: Option<u64>,
}
56
57/// Incremental Server-Sent Events parser.
58///
59/// # Example
60///
61/// ```
62/// use stream_rs::sse::SseParser;
63///
64/// let mut p = SseParser::new();
65/// let mut events = Vec::new();
66/// // Note the chunk boundary splitting the CRLF and a multi-line data field.
67/// p.feed(b"event: greeting\r\ndata: hello\r", &mut events);
68/// p.feed(b"\ndata: world\r\n\r\n", &mut events);
69///
70/// assert_eq!(events.len(), 1);
71/// assert_eq!(events[0].event.as_deref(), Some("greeting"));
72/// assert_eq!(events[0].data, "hello\nworld");
73/// ```
74#[derive(Debug, Default)]
75pub struct SseParser {
76    /// Bytes received but not yet terminated by a newline.
77    line_buf: Vec<u8>,
78    /// True when the previous chunk ended on a `\r`, so a leading `\n` in the
79    /// next chunk must be swallowed (CRLF split across chunks).
80    pending_cr: bool,
81    /// Accumulated `data` for the in-progress event.
82    data: String,
83    /// Most recent `event` type for the in-progress event.
84    event_type: Option<String>,
85    /// The persistent *last event id*. Per the WHATWG spec this survives across
86    /// dispatches and is reported on every event until a new `id` field is seen.
87    last_id: Option<String>,
88    /// Most recent `retry` for the in-progress event.
89    retry: Option<u64>,
90    /// The persistent reconnection time (ms) from the most recent valid `retry`
91    /// field seen on the stream. Per the WHATWG spec the reconnection time is a
92    /// connection-level setting, so unlike `retry` it survives dispatches.
93    reconnection_time: Option<u64>,
94    /// Tracks the optional leading byte-order-mark handling. The BOM may be
95    /// split across `feed` calls, so we buffer up to the first three bytes of
96    /// the stream here until we can tell whether they are `EF BB BF`.
97    bom: BomState,
98}
99
/// Progress of the one-shot "strip a leading UTF-8 BOM" check.
///
/// The check must tolerate the BOM arriving in pieces across several `feed`
/// calls.
#[derive(Default, Debug)]
enum BomState {
    /// No stream bytes have been seen yet (the initial state).
    #[default]
    Start,
    /// The stream so far is a proper prefix of the BOM. Those bytes are held
    /// back until more input shows whether they really were a BOM.
    Partial {
        /// The held-back BOM bytes; only `buf[..len]` is meaningful.
        buf: [u8; 3],
        /// How many entries of `buf` are filled (1 or 2).
        len: usize,
    },
    /// The check is over (BOM dropped or ruled out); bytes flow straight through.
    Done,
}
117
/// UTF-8 encoding of U+FEFF BYTE ORDER MARK, as it appears on the wire.
const BOM: [u8; 3] = *b"\xEF\xBB\xBF";
120
121impl SseParser {
122    /// Create a fresh parser.
123    #[must_use]
124    pub fn new() -> Self {
125        Self::default()
126    }
127
128    /// The persistent *last event id*.
129    ///
130    /// Per the WHATWG spec this survives across dispatches: once an `id` field
131    /// is seen it is reported on every subsequent event until replaced. This
132    /// getter exposes the current value so a caller can send it as the
133    /// `Last-Event-ID` header when reconnecting. Note that [`finish`](Self::finish)
134    /// resets the parser, clearing this — read it *before* calling `finish` if
135    /// you need it for reconnection.
136    #[must_use]
137    pub fn last_id(&self) -> Option<&str> {
138        self.last_id.as_deref()
139    }
140
141    /// The current reconnection time in milliseconds, from the most recent valid
142    /// `retry` field.
143    ///
144    /// Unlike the per-event [`SseEvent::retry`], this is connection-level state
145    /// that persists across dispatched events (and across events that carried no
146    /// data and were therefore never surfaced), matching the WHATWG semantics.
147    #[must_use]
148    pub fn reconnection_time(&self) -> Option<u64> {
149        self.reconnection_time
150    }
151
152    /// Feed a chunk of bytes. Completed events are pushed onto `out`.
153    ///
154    /// Bytes that do not yet form a complete line are buffered internally until
155    /// the next call.
156    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<SseEvent>) {
157        let mut bytes = chunk;
158
159        // Swallow a `\n` that belongs to a CRLF split across the chunk boundary.
160        if self.pending_cr {
161            self.pending_cr = false;
162            if let [b'\n', rest @ ..] = bytes {
163                bytes = rest;
164            }
165        }
166
167        // Strip a single optional leading UTF-8 BOM. Because the BOM may be
168        // split across `feed` calls, this is a small state machine: while still
169        // matching, leading bytes are held back; once the BOM is either fully
170        // matched (and dropped) or disproven, any held-back content bytes are
171        // re-fed through the scanner ahead of the rest of the chunk.
172        if !matches!(self.bom, BomState::Done) {
173            // `recovered` holds content bytes that turned out *not* to be a BOM
174            // and must still be parsed; it lives on the stack (max 2 bytes).
175            let mut recovered: [u8; 3] = [0; 3];
176            let recovered = self.strip_bom(&mut bytes, &mut recovered);
177            self.scan(recovered, out);
178        }
179
180        self.scan(bytes, out);
181    }
182
183    /// Advance the BOM state machine over the front of `bytes`.
184    ///
185    /// On return `*bytes` has had any matched BOM prefix consumed. The returned
186    /// slice (borrowed from `scratch`) is any previously-buffered content bytes
187    /// that have now been proven not to be a BOM and must be parsed first.
188    fn strip_bom<'a>(&mut self, bytes: &mut &[u8], scratch: &'a mut [u8; 3]) -> &'a [u8] {
189        // Reconstruct the bytes matched so far.
190        let (mut matched, mut matched_len) = match self.bom {
191            BomState::Partial { buf, len } => (buf, len),
192            _ => ([0u8; 3], 0),
193        };
194
195        // Consume from the incoming chunk while it keeps matching the BOM.
196        while matched_len < 3 {
197            let Some((&next, rest)) = bytes.split_first() else {
198                break;
199            };
200            if next != BOM[matched_len] {
201                break;
202            }
203            matched[matched_len] = next;
204            matched_len += 1;
205            *bytes = rest;
206        }
207
208        if matched_len == 3 {
209            // Full BOM matched: drop it.
210            self.bom = BomState::Done;
211            return &scratch[..0];
212        }
213        if !bytes.is_empty() {
214            // Mismatch after `matched_len` BOM bytes (or empty input would have
215            // broken the loop). The matched bytes were real content, not a BOM.
216            self.bom = BomState::Done;
217            scratch[..matched_len].copy_from_slice(&matched[..matched_len]);
218            return &scratch[..matched_len];
219        }
220        // Ran out of input still matching a BOM prefix: remember and wait.
221        if matched_len == 0 {
222            self.bom = BomState::Start;
223        } else {
224            self.bom = BomState::Partial {
225                buf: matched,
226                len: matched_len,
227            };
228        }
229        &scratch[..0]
230    }
231
232    /// Scan content bytes, dispatching completed events, handling all line
233    /// terminators (`\n`, `\r`, `\r\n`) including a CRLF split across chunks.
234    fn scan(&mut self, bytes: &[u8], out: &mut Vec<SseEvent>) {
235        let mut i = 0;
236        while i < bytes.len() {
237            match bytes[i] {
238                b'\n' => {
239                    self.end_line(out);
240                    i += 1;
241                }
242                b'\r' => {
243                    self.end_line(out);
244                    // Look ahead for `\n`; if it is the last byte, defer to next chunk.
245                    if i + 1 < bytes.len() {
246                        if bytes[i + 1] == b'\n' {
247                            i += 2;
248                        } else {
249                            i += 1;
250                        }
251                    } else {
252                        self.pending_cr = true;
253                        i += 1;
254                    }
255                }
256                b => {
257                    self.line_buf.push(b);
258                    i += 1;
259                }
260            }
261        }
262    }
263
264    /// Signal end of stream.
265    ///
266    /// Per the WHATWG algorithm, once the stream ends the final *line* (bytes
267    /// received after the last terminator) is processed as a field, but a
268    /// pending event is **only** dispatched by a blank line — so an event that
269    /// was never terminated by a blank line is discarded. This call therefore
270    /// applies any trailing unterminated field, then drops the in-progress
271    /// event buffers without emitting them.
272    ///
273    /// `out` exists for API symmetry with [`feed`](Self::feed); by spec nothing
274    /// is ever pushed onto it here, but the signature lets callers funnel both
275    /// calls through the same sink.
276    ///
277    /// # Reset semantics
278    /// `finish` fully resets the parser so it can be reused for a fresh stream.
279    /// This **clears the persistent state too** — the last event id
280    /// ([`last_id`](Self::last_id)) and the reconnection time
281    /// ([`reconnection_time`](Self::reconnection_time)). If you need either for a
282    /// reconnection, read it before calling `finish`.
283    pub fn finish(&mut self, out: &mut Vec<SseEvent>) {
284        // Process a trailing line that arrived without a terminator. This can
285        // set a field (e.g. `data: x` with no final newline) but never reaches
286        // a blank line, so `process_field` is correct and `dispatch` is not.
287        if !self.line_buf.is_empty() {
288            let line = take_line(&mut self.line_buf);
289            // A trailing comment line is still ignored.
290            if line[0] != b':' {
291                let (name, value) = split_field(&line);
292                self.process_field(name, value);
293            }
294        }
295
296        // The spec dispatches a final event only on a blank line; an
297        // unterminated event is dropped. Reset everything so the parser can be
298        // reused for a fresh stream. `out` is intentionally left untouched.
299        let _ = out;
300        self.line_buf.clear();
301        self.pending_cr = false;
302        self.data.clear();
303        self.event_type = None;
304        self.retry = None;
305        self.last_id = None;
306        self.reconnection_time = None;
307        // Reset BOM handling so a reused parser strips a BOM on the next stream.
308        self.bom = BomState::Start;
309    }
310
311    /// Process one complete line (the newline has been consumed).
312    fn end_line(&mut self, out: &mut Vec<SseEvent>) {
313        let line = take_line(&mut self.line_buf);
314
315        if line.is_empty() {
316            self.dispatch(out);
317            return;
318        }
319        if line[0] == b':' {
320            // Comment line: ignore.
321            return;
322        }
323
324        let (name, value) = split_field(&line);
325        self.process_field(name, value);
326    }
327
328    /// Apply a parsed field to the in-progress event buffers.
329    fn process_field(&mut self, name: &[u8], value: &[u8]) {
330        match name {
331            b"event" => {
332                self.event_type = Some(decode_utf8_lossy(value));
333            }
334            b"data" => {
335                // Per spec: append the value then a newline; the trailing
336                // newline is stripped at dispatch time.
337                self.data.push_str(&decode_utf8_lossy(value));
338                self.data.push('\n');
339            }
340            b"id" => {
341                // Per spec, ignore the field if the value contains a NUL.
342                if !value.contains(&0) {
343                    self.last_id = Some(decode_utf8_lossy(value));
344                }
345            }
346            b"retry" => {
347                if !value.is_empty() && value.iter().all(u8::is_ascii_digit) {
348                    // Safe: all ASCII digits.
349                    if let Ok(s) = core::str::from_utf8(value) {
350                        if let Ok(ms) = s.parse::<u64>() {
351                            self.retry = Some(ms);
352                            // The reconnection time persists across dispatches.
353                            self.reconnection_time = Some(ms);
354                        }
355                    }
356                }
357            }
358            _ => { /* Unknown field: ignored per spec. */ }
359        }
360    }
361
362    /// Dispatch the buffered event on a blank line, then reset buffers.
363    fn dispatch(&mut self, out: &mut Vec<SseEvent>) {
364        // Per spec: if no `data` field was seen, the data buffer is empty;
365        // reset the per-event buffers and dispatch nothing. The last event id
366        // is *persistent*, so it is deliberately left untouched here.
367        if self.data.is_empty() {
368            self.event_type = None;
369            self.retry = None;
370            return;
371        }
372
373        // Strip the single trailing newline added by the last `data` field.
374        let mut data = take_string(&mut self.data);
375        if data.ends_with('\n') {
376            data.pop();
377        }
378
379        let event = SseEvent {
380            event: self.event_type.take(),
381            data,
382            // Clone, not take: the last event id persists across dispatches.
383            id: self.last_id.clone(),
384            retry: self.retry.take(),
385        };
386        out.push(event);
387    }
388}
389
/// Hand back the bytes collected for the current line, leaving `buf` empty.
#[inline]
fn take_line(buf: &mut Vec<u8>) -> Vec<u8> {
    core::mem::replace(buf, Vec::new())
}
395
/// Break a field line into its name and value.
///
/// The line is already known to be non-empty and not a comment. Everything
/// before the first `:` is the name. Everything after it is the value, minus
/// at most one leading space. A line without any `:` is all name, paired with
/// an empty value.
#[inline]
fn split_field(line: &[u8]) -> (&[u8], &[u8]) {
    let Some(colon) = line.iter().position(|&b| b == b':') else {
        return (line, &[][..]);
    };
    let (name, rest) = line.split_at(colon);
    // `rest` still begins with the colon itself; skip past it.
    let value = &rest[1..];
    (name, value.strip_prefix(b" ").unwrap_or(value))
}
414
/// Turn raw field bytes into an owned `String`.
///
/// Any invalid UTF-8 is replaced with U+FFFD, the same leniency browsers
/// apply to event streams.
#[inline]
fn decode_utf8_lossy(bytes: &[u8]) -> String {
    let text = String::from_utf8_lossy(bytes);
    String::from(text)
}