stream_rs/sse.rs
1//! A spec-compliant [Server-Sent Events] stream parser.
2//!
3//! This is a *push* parser: feed it arbitrary byte chunks as they arrive off
4//! the wire with [`SseParser::feed`], and it yields fully-formed [`SseEvent`]s
5//! whenever a blank line completes one. It implements the event-stream parsing
6//! algorithm from the WHATWG HTML specification:
7//!
8//! * Line terminators may be `\r\n`, `\n`, or a lone `\r`, and a terminator
9//! may be split across two `feed` calls.
//! * A single leading U+FEFF byte order mark (`EF BB BF`) at the very start of
//!   the stream is stripped, even if its bytes are split across `feed` calls.
11//! * Lines beginning with `:` are comments and are ignored.
12//! * A field is `name`, optionally followed by `:` and a value; a single
13//! leading space after the colon is removed.
14//! * `data` fields accumulate, joined by `\n`; a trailing `\n` is *not* part of
15//! the dispatched data.
16//! * `event` sets the event type, `id` sets the last event id (unless it
17//! contains a NUL, in which case it is ignored), and `retry` sets the
18//! reconnection time when the value is all ASCII digits.
19//! * Per spec, the *last event id* is **persistent**: once set it is reported
20//! on every subsequent dispatched event until a new `id` field replaces it.
21//! The `event`, `data` and `retry` buffers, by contrast, are reset after
22//! each dispatch.
//! * A blank line dispatches the buffered event. An event whose data buffer is
//!   empty is not dispatched (per spec), and an event still unterminated when
//!   [`SseParser::finish`] is called is discarded.
25//!
26//! [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html
27
28use alloc::string::String;
29use alloc::vec::Vec;
30
31use alloc_helpers::take_string;
32
mod alloc_helpers {
    use alloc::string::String;

    /// Move the contents out of `s`, leaving an empty `String` behind.
    ///
    /// The returned value owns the original allocation; `s` is replaced by
    /// `String::new()`, which holds no heap buffer.
    #[inline]
    pub fn take_string(s: &mut String) -> String {
        core::mem::replace(s, String::new())
    }
}
42
/// A fully parsed Server-Sent Event ready to be dispatched.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SseEvent {
    /// The event type, from the most recent `event:` field of this event.
    /// `None` means no type was given, i.e. the default type (`"message"`).
    pub event: Option<String>,
    /// The event payload: the values of every `data:` field of this event,
    /// joined with `\n` (no trailing newline).
    pub data: String,
    /// The stream's *last event ID* at the moment this event was dispatched.
    ///
    /// This is connection-level state, not per-event: it comes from the most
    /// recent accepted `id:` field seen on the stream so far. That field may
    /// belong to an earlier event. The value is `None` only if no `id:` field
    /// has been accepted since the parser was created or last reset.
    pub id: Option<String>,
    /// The reconnection time in milliseconds from a valid `retry:` field
    /// within this event, if present.
    pub retry: Option<u64>,
}
56
57/// Incremental Server-Sent Events parser.
58///
59/// # Example
60///
61/// ```
62/// use stream_rs::sse::SseParser;
63///
64/// let mut p = SseParser::new();
65/// let mut events = Vec::new();
66/// // Note the chunk boundary splitting the CRLF and a multi-line data field.
67/// p.feed(b"event: greeting\r\ndata: hello\r", &mut events);
68/// p.feed(b"\ndata: world\r\n\r\n", &mut events);
69///
70/// assert_eq!(events.len(), 1);
71/// assert_eq!(events[0].event.as_deref(), Some("greeting"));
72/// assert_eq!(events[0].data, "hello\nworld");
73/// ```
74#[derive(Debug, Default)]
75pub struct SseParser {
76 /// Bytes received but not yet terminated by a newline.
77 line_buf: Vec<u8>,
78 /// True when the previous chunk ended on a `\r`, so a leading `\n` in the
79 /// next chunk must be swallowed (CRLF split across chunks).
80 pending_cr: bool,
81 /// Accumulated `data` for the in-progress event.
82 data: String,
83 /// Most recent `event` type for the in-progress event.
84 event_type: Option<String>,
85 /// The persistent *last event id*. Per the WHATWG spec this survives across
86 /// dispatches and is reported on every event until a new `id` field is seen.
87 last_id: Option<String>,
88 /// Most recent `retry` for the in-progress event.
89 retry: Option<u64>,
90 /// The persistent reconnection time (ms) from the most recent valid `retry`
91 /// field seen on the stream. Per the WHATWG spec the reconnection time is a
92 /// connection-level setting, so unlike `retry` it survives dispatches.
93 reconnection_time: Option<u64>,
94 /// Tracks the optional leading byte-order-mark handling. The BOM may be
95 /// split across `feed` calls, so we buffer up to the first three bytes of
96 /// the stream here until we can tell whether they are `EF BB BF`.
97 bom: BomState,
98}
99
/// Progress of stripping one optional UTF-8 byte order mark from the front of
/// the stream. It is a state machine because the three BOM bytes may be
/// delivered across separate `feed` calls.
#[derive(Default, Debug)]
enum BomState {
    /// Nothing of the stream has been examined yet.
    #[default]
    Start,
    /// The stream so far is a proper prefix of the BOM. These bytes are held
    /// back until later input decides whether they really are a BOM.
    Partial {
        /// Held-back bytes; only `buf[..len]` is meaningful.
        buf: [u8; 3],
        /// How many bytes of `buf` are in use (1 or 2 while partial).
        len: usize,
    },
    /// The BOM was stripped or ruled out; bytes now go straight to the scanner.
    Done,
}
117
/// UTF-8 encoding of U+FEFF, the byte order mark.
const BOM: [u8; 3] = *b"\xEF\xBB\xBF";
120
121impl SseParser {
122 /// Create a fresh parser.
123 #[must_use]
124 pub fn new() -> Self {
125 Self::default()
126 }
127
128 /// The persistent *last event id*.
129 ///
130 /// Per the WHATWG spec this survives across dispatches: once an `id` field
131 /// is seen it is reported on every subsequent event until replaced. This
132 /// getter exposes the current value so a caller can send it as the
133 /// `Last-Event-ID` header when reconnecting. Note that [`finish`](Self::finish)
134 /// resets the parser, clearing this — read it *before* calling `finish` if
135 /// you need it for reconnection.
136 #[must_use]
137 pub fn last_id(&self) -> Option<&str> {
138 self.last_id.as_deref()
139 }
140
141 /// The current reconnection time in milliseconds, from the most recent valid
142 /// `retry` field.
143 ///
144 /// Unlike the per-event [`SseEvent::retry`], this is connection-level state
145 /// that persists across dispatched events (and across events that carried no
146 /// data and were therefore never surfaced), matching the WHATWG semantics.
147 #[must_use]
148 pub fn reconnection_time(&self) -> Option<u64> {
149 self.reconnection_time
150 }
151
152 /// Feed a chunk of bytes. Completed events are pushed onto `out`.
153 ///
154 /// Bytes that do not yet form a complete line are buffered internally until
155 /// the next call.
156 pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<SseEvent>) {
157 let mut bytes = chunk;
158
159 // Swallow a `\n` that belongs to a CRLF split across the chunk boundary.
160 if self.pending_cr {
161 self.pending_cr = false;
162 if let [b'\n', rest @ ..] = bytes {
163 bytes = rest;
164 }
165 }
166
167 // Strip a single optional leading UTF-8 BOM. Because the BOM may be
168 // split across `feed` calls, this is a small state machine: while still
169 // matching, leading bytes are held back; once the BOM is either fully
170 // matched (and dropped) or disproven, any held-back content bytes are
171 // re-fed through the scanner ahead of the rest of the chunk.
172 if !matches!(self.bom, BomState::Done) {
173 // `recovered` holds content bytes that turned out *not* to be a BOM
174 // and must still be parsed; it lives on the stack (max 2 bytes).
175 let mut recovered: [u8; 3] = [0; 3];
176 let recovered = self.strip_bom(&mut bytes, &mut recovered);
177 self.scan(recovered, out);
178 }
179
180 self.scan(bytes, out);
181 }
182
183 /// Advance the BOM state machine over the front of `bytes`.
184 ///
185 /// On return `*bytes` has had any matched BOM prefix consumed. The returned
186 /// slice (borrowed from `scratch`) is any previously-buffered content bytes
187 /// that have now been proven not to be a BOM and must be parsed first.
188 fn strip_bom<'a>(&mut self, bytes: &mut &[u8], scratch: &'a mut [u8; 3]) -> &'a [u8] {
189 // Reconstruct the bytes matched so far.
190 let (mut matched, mut matched_len) = match self.bom {
191 BomState::Partial { buf, len } => (buf, len),
192 _ => ([0u8; 3], 0),
193 };
194
195 // Consume from the incoming chunk while it keeps matching the BOM.
196 while matched_len < 3 {
197 let Some((&next, rest)) = bytes.split_first() else {
198 break;
199 };
200 if next != BOM[matched_len] {
201 break;
202 }
203 matched[matched_len] = next;
204 matched_len += 1;
205 *bytes = rest;
206 }
207
208 if matched_len == 3 {
209 // Full BOM matched: drop it.
210 self.bom = BomState::Done;
211 return &scratch[..0];
212 }
213 if !bytes.is_empty() {
214 // Mismatch after `matched_len` BOM bytes (or empty input would have
215 // broken the loop). The matched bytes were real content, not a BOM.
216 self.bom = BomState::Done;
217 scratch[..matched_len].copy_from_slice(&matched[..matched_len]);
218 return &scratch[..matched_len];
219 }
220 // Ran out of input still matching a BOM prefix: remember and wait.
221 if matched_len == 0 {
222 self.bom = BomState::Start;
223 } else {
224 self.bom = BomState::Partial {
225 buf: matched,
226 len: matched_len,
227 };
228 }
229 &scratch[..0]
230 }
231
232 /// Scan content bytes, dispatching completed events, handling all line
233 /// terminators (`\n`, `\r`, `\r\n`) including a CRLF split across chunks.
234 fn scan(&mut self, bytes: &[u8], out: &mut Vec<SseEvent>) {
235 let mut i = 0;
236 while i < bytes.len() {
237 match bytes[i] {
238 b'\n' => {
239 self.end_line(out);
240 i += 1;
241 }
242 b'\r' => {
243 self.end_line(out);
244 // Look ahead for `\n`; if it is the last byte, defer to next chunk.
245 if i + 1 < bytes.len() {
246 if bytes[i + 1] == b'\n' {
247 i += 2;
248 } else {
249 i += 1;
250 }
251 } else {
252 self.pending_cr = true;
253 i += 1;
254 }
255 }
256 b => {
257 self.line_buf.push(b);
258 i += 1;
259 }
260 }
261 }
262 }
263
264 /// Signal end of stream.
265 ///
266 /// Per the WHATWG algorithm, once the stream ends the final *line* (bytes
267 /// received after the last terminator) is processed as a field, but a
268 /// pending event is **only** dispatched by a blank line — so an event that
269 /// was never terminated by a blank line is discarded. This call therefore
270 /// applies any trailing unterminated field, then drops the in-progress
271 /// event buffers without emitting them.
272 ///
273 /// `out` exists for API symmetry with [`feed`](Self::feed); by spec nothing
274 /// is ever pushed onto it here, but the signature lets callers funnel both
275 /// calls through the same sink.
276 ///
277 /// # Reset semantics
278 /// `finish` fully resets the parser so it can be reused for a fresh stream.
279 /// This **clears the persistent state too** — the last event id
280 /// ([`last_id`](Self::last_id)) and the reconnection time
281 /// ([`reconnection_time`](Self::reconnection_time)). If you need either for a
282 /// reconnection, read it before calling `finish`.
283 pub fn finish(&mut self, out: &mut Vec<SseEvent>) {
284 // Process a trailing line that arrived without a terminator. This can
285 // set a field (e.g. `data: x` with no final newline) but never reaches
286 // a blank line, so `process_field` is correct and `dispatch` is not.
287 if !self.line_buf.is_empty() {
288 let line = take_line(&mut self.line_buf);
289 // A trailing comment line is still ignored.
290 if line[0] != b':' {
291 let (name, value) = split_field(&line);
292 self.process_field(name, value);
293 }
294 }
295
296 // The spec dispatches a final event only on a blank line; an
297 // unterminated event is dropped. Reset everything so the parser can be
298 // reused for a fresh stream. `out` is intentionally left untouched.
299 let _ = out;
300 self.line_buf.clear();
301 self.pending_cr = false;
302 self.data.clear();
303 self.event_type = None;
304 self.retry = None;
305 self.last_id = None;
306 self.reconnection_time = None;
307 // Reset BOM handling so a reused parser strips a BOM on the next stream.
308 self.bom = BomState::Start;
309 }
310
311 /// Process one complete line (the newline has been consumed).
312 fn end_line(&mut self, out: &mut Vec<SseEvent>) {
313 let line = take_line(&mut self.line_buf);
314
315 if line.is_empty() {
316 self.dispatch(out);
317 return;
318 }
319 if line[0] == b':' {
320 // Comment line: ignore.
321 return;
322 }
323
324 let (name, value) = split_field(&line);
325 self.process_field(name, value);
326 }
327
328 /// Apply a parsed field to the in-progress event buffers.
329 fn process_field(&mut self, name: &[u8], value: &[u8]) {
330 match name {
331 b"event" => {
332 self.event_type = Some(decode_utf8_lossy(value));
333 }
334 b"data" => {
335 // Per spec: append the value then a newline; the trailing
336 // newline is stripped at dispatch time.
337 self.data.push_str(&decode_utf8_lossy(value));
338 self.data.push('\n');
339 }
340 b"id" => {
341 // Per spec, ignore the field if the value contains a NUL.
342 if !value.contains(&0) {
343 self.last_id = Some(decode_utf8_lossy(value));
344 }
345 }
346 b"retry" => {
347 if !value.is_empty() && value.iter().all(u8::is_ascii_digit) {
348 // Safe: all ASCII digits.
349 if let Ok(s) = core::str::from_utf8(value) {
350 if let Ok(ms) = s.parse::<u64>() {
351 self.retry = Some(ms);
352 // The reconnection time persists across dispatches.
353 self.reconnection_time = Some(ms);
354 }
355 }
356 }
357 }
358 _ => { /* Unknown field: ignored per spec. */ }
359 }
360 }
361
362 /// Dispatch the buffered event on a blank line, then reset buffers.
363 fn dispatch(&mut self, out: &mut Vec<SseEvent>) {
364 // Per spec: if no `data` field was seen, the data buffer is empty;
365 // reset the per-event buffers and dispatch nothing. The last event id
366 // is *persistent*, so it is deliberately left untouched here.
367 if self.data.is_empty() {
368 self.event_type = None;
369 self.retry = None;
370 return;
371 }
372
373 // Strip the single trailing newline added by the last `data` field.
374 let mut data = take_string(&mut self.data);
375 if data.ends_with('\n') {
376 data.pop();
377 }
378
379 let event = SseEvent {
380 event: self.event_type.take(),
381 data,
382 // Clone, not take: the last event id persists across dispatches.
383 id: self.last_id.clone(),
384 retry: self.retry.take(),
385 };
386 out.push(event);
387 }
388}
389
/// Move the accumulated line bytes out of `buf`, leaving it empty.
///
/// `buf` is replaced by a fresh, unallocated `Vec`, so the returned vector
/// owns the original allocation.
#[inline]
fn take_line(buf: &mut Vec<u8>) -> Vec<u8> {
    core::mem::replace(buf, Vec::new())
}
395
/// Break a (non-empty, non-comment) field line into `(name, value)`.
///
/// The name is everything before the first `:`, and the value is everything
/// after it, minus at most one leading space. A line with no `:` at all is a
/// bare field name whose value is empty.
#[inline]
fn split_field(line: &[u8]) -> (&[u8], &[u8]) {
    let Some(colon) = line.iter().position(|&b| b == b':') else {
        return (line, &[]);
    };
    let (name, colon_and_value) = line.split_at(colon);
    let value = &colon_and_value[1..];
    // Only a single space is dropped; any further spaces belong to the value.
    let value = value.strip_prefix(b" ").unwrap_or(value);
    (name, value)
}
414
/// Decode `bytes` as UTF-8, replacing each invalid sequence with U+FFFD
/// instead of failing (lenient, browser-style decoding).
#[inline]
fn decode_utf8_lossy(bytes: &[u8]) -> String {
    match core::str::from_utf8(bytes) {
        Ok(valid) => String::from(valid),
        Err(_) => String::from_utf8_lossy(bytes).into_owned(),
    }
}