stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
//! A spec-compliant [Server-Sent Events] stream parser.
//!
//! This is a *push* parser: feed it arbitrary byte chunks as they arrive off
//! the wire with [`SseParser::feed`], and it yields fully-formed [`SseEvent`]s
//! whenever a blank line completes one. It implements the event-stream parsing
//! algorithm from the WHATWG HTML specification:
//!
//! * Line terminators may be `\r\n`, `\n`, or a lone `\r`, and a terminator
//!   may be split across two `feed` calls.
//! * A single leading U+FEFF byte order mark (`EF BB BF`) at the very start of
//!   the stream is stripped, even when its bytes are split across `feed` calls.
//! * Lines beginning with `:` are comments and are ignored.
//! * A field is `name`, optionally followed by `:` and a value; a single
//!   leading space after the colon is removed.
//! * `data` fields accumulate, joined by `\n`; a trailing `\n` is *not* part of
//!   the dispatched data.
//! * `event` sets the event type, `id` sets the last event id (unless it
//!   contains a NUL, in which case it is ignored), and `retry` sets the
//!   reconnection time when the value is all ASCII digits.
//! * Per spec, the *last event id* is **persistent**: once set it is reported
//!   on every subsequent dispatched event until a new `id` field replaces it.
//!   The `event`, `data` and `retry` buffers, by contrast, are reset after
//!   each dispatch.
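//! * A blank line dispatches the buffered event. An event whose data buffer is
//!   empty is not dispatched (per spec), and an event still pending when the
//!   stream ends (see [`SseParser::finish`]) is discarded rather than dispatched.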
//!
//! [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html

use alloc::string::String;
use alloc::vec::Vec;

use alloc_helpers::take_string;

mod alloc_helpers {
    use alloc::string::String;

    /// Move the contents out of `s` and return them, leaving `s` empty.
    ///
    /// The returned `String` owns the original allocation; `s` is left as a
    /// fresh, unallocated `String`.
    #[inline]
    pub fn take_string(s: &mut String) -> String {
        let emptied = String::new();
        core::mem::replace(s, emptied)
    }
}

/// A fully parsed Server-Sent Event ready to be dispatched.
///
/// Produced by the parser when a blank line terminates an event whose data
/// buffer is non-empty.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SseEvent {
    /// The event type, from the most recent `event:` field of this event.
    /// `None` means the default type (`"message"`).
    pub event: Option<String>,
    /// The event payload: the values of every `data:` field of this event,
    /// joined with `\n` (without a trailing newline).
    pub data: String,
    /// The stream's *last event id* at the moment this event was dispatched.
    ///
    /// Per the WHATWG spec the last event id is persistent: this is the value
    /// of the most recent `id:` field seen anywhere earlier on the stream, not
    /// necessarily one that appeared inside this event. `None` means no `id:`
    /// field has been seen since the parser was created or last reset.
    pub id: Option<String>,
    /// The reconnection time in milliseconds from a `retry:` field inside this
    /// event, if present.
    pub retry: Option<u64>,
}

/// Incremental Server-Sent Events parser.
///
/// Feed it byte chunks with [`SseParser::feed`]; every event completed by a
/// blank line is appended to the caller's output vector. Call
/// [`SseParser::finish`] at end of stream to reset it for reuse.
///
/// # Example
///
/// ```
/// use stream_rs::sse::SseParser;
///
/// let mut p = SseParser::new();
/// let mut events = Vec::new();
/// // Note the chunk boundary splitting the CRLF and a multi-line data field.
/// p.feed(b"event: greeting\r\ndata: hello\r", &mut events);
/// p.feed(b"\ndata: world\r\n\r\n", &mut events);
///
/// assert_eq!(events.len(), 1);
/// assert_eq!(events[0].event.as_deref(), Some("greeting"));
/// assert_eq!(events[0].data, "hello\nworld");
/// ```
#[derive(Debug, Default)]
pub struct SseParser {
    /// Bytes of the current line received so far, not yet terminated by a
    /// line terminator (`\n`, `\r`, or `\r\n`).
    ///
    /// NOTE(review): there is no length cap, so a peer that never sends a
    /// terminator makes this grow without bound.
    line_buf: Vec<u8>,
    /// True when the previous chunk ended on a `\r`, so a leading `\n` in the
    /// next chunk must be swallowed (CRLF split across chunks).
    pending_cr: bool,
    /// Accumulated `data` for the in-progress event: each `data` field appends
    /// its value followed by `\n`.
    data: String,
    /// Most recent `event` type for the in-progress event.
    event_type: Option<String>,
    /// The persistent *last event id*. Per the WHATWG spec this survives across
    /// dispatches and is reported on every event until a new `id` field is seen.
    last_id: Option<String>,
    /// Most recent `retry` for the in-progress event.
    retry: Option<u64>,
    /// The persistent reconnection time (ms) from the most recent valid `retry`
    /// field seen on the stream. Per the WHATWG spec the reconnection time is a
    /// connection-level setting, so unlike `retry` it survives dispatches.
    reconnection_time: Option<u64>,
    /// Tracks the optional leading byte-order-mark handling. The BOM may be
    /// split across `feed` calls, so a matched BOM prefix (at most two bytes; a
    /// complete three-byte BOM is dropped immediately) is held back here until
    /// it is known whether the stream really starts with `EF BB BF`.
    bom: BomState,
}

/// Progress of the optional UTF-8 byte-order-mark check at the start of a
/// stream.
///
/// The BOM can arrive split over several `feed` calls, so the parser remembers
/// how much of it has matched so far instead of deciding on a single chunk.
#[derive(Debug, Default)]
enum BomState {
    /// Nothing from the stream has been examined yet.
    #[default]
    Start,
    /// The stream so far is a proper, non-empty prefix of the BOM.
    Partial {
        /// Bytes matched so far; only `buf[..len]` is meaningful.
        buf: [u8; 3],
        /// How many leading entries of `buf` are valid (1 or 2).
        len: usize,
    },
    /// The BOM was either consumed or ruled out; bytes now pass straight
    /// through to the line scanner.
    Done,
}

/// UTF-8 encoding of U+FEFF, the byte order mark (`EF BB BF`).
const BOM: [u8; 3] = *b"\xEF\xBB\xBF";

impl SseParser {
    /// Create a fresh parser with empty buffers and no persistent state.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// The persistent *last event id*.
    ///
    /// Per the WHATWG spec this survives across dispatches: once an `id` field
    /// is seen it is reported on every subsequent event until replaced. This
    /// getter exposes the current value so a caller can send it as the
    /// `Last-Event-ID` header when reconnecting. Note that [`finish`](Self::finish)
    /// resets the parser, clearing this — read it *before* calling `finish` if
    /// you need it for reconnection.
    #[must_use]
    pub fn last_id(&self) -> Option<&str> {
        self.last_id.as_deref()
    }

    /// The current reconnection time in milliseconds, from the most recent valid
    /// `retry` field.
    ///
    /// Unlike the per-event [`SseEvent::retry`], this is connection-level state
    /// that persists across dispatched events (and across events that carried no
    /// data and were therefore never surfaced), matching the WHATWG semantics.
    #[must_use]
    pub fn reconnection_time(&self) -> Option<u64> {
        self.reconnection_time
    }

    /// Feed a chunk of bytes. Completed events are pushed onto `out`.
    ///
    /// Chunks may be split at any byte boundary — inside a line, between the
    /// `\r` and `\n` of a CRLF, or inside the leading BOM. Bytes that do not yet
    /// form a complete line are buffered internally until the next call. An
    /// empty `chunk` is a no-op.
    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<SseEvent>) {
        // An empty chunk carries no bytes, so it must not disturb any state. In
        // particular it must not clear `pending_cr`: otherwise a CRLF delivered
        // as `"\r"`, `""`, `"\n"` would be read as two terminators, and the
        // stray `\n` would look like a blank line and dispatch prematurely.
        if chunk.is_empty() {
            return;
        }
        let mut bytes = chunk;

        // Swallow a `\n` that completes a CRLF split across the chunk boundary.
        if self.pending_cr {
            self.pending_cr = false;
            if let [b'\n', rest @ ..] = bytes {
                bytes = rest;
            }
        }

        // Strip a single optional leading UTF-8 BOM. While the stream still
        // looks like a BOM prefix its bytes are held back; once the BOM is
        // either fully matched (and dropped) or disproven, any held-back
        // content bytes are scanned ahead of the rest of the chunk.
        if !matches!(self.bom, BomState::Done) {
            // Stack scratch space for held-back bytes proven not to be a BOM
            // (at most two bytes are ever recovered).
            let mut recovered: [u8; 3] = [0; 3];
            let recovered = self.strip_bom(&mut bytes, &mut recovered);
            self.scan(recovered, out);
        }

        self.scan(bytes, out);
    }

    /// Advance the BOM state machine over the front of `bytes`.
    ///
    /// On return `*bytes` has had any matched BOM prefix consumed. The returned
    /// slice (borrowed from `scratch`) holds previously held-back bytes that have
    /// now been proven not to be a BOM and must be scanned first.
    fn strip_bom<'a>(&mut self, bytes: &mut &[u8], scratch: &'a mut [u8; 3]) -> &'a [u8] {
        // Resume from whatever BOM prefix earlier chunks already matched.
        let (mut matched, mut matched_len) = match self.bom {
            BomState::Partial { buf, len } => (buf, len),
            BomState::Start | BomState::Done => ([0u8; 3], 0),
        };

        // Consume from the incoming chunk while it keeps matching the BOM.
        while matched_len < BOM.len() {
            let Some((&next, rest)) = bytes.split_first() else {
                break;
            };
            if next != BOM[matched_len] {
                break;
            }
            matched[matched_len] = next;
            matched_len += 1;
            *bytes = rest;
        }

        if matched_len == BOM.len() {
            // Full BOM matched: drop it.
            self.bom = BomState::Done;
            return &scratch[..0];
        }
        if !bytes.is_empty() {
            // The loop stopped on a mismatching byte, so the stream does not
            // start with a BOM and the matched prefix was real content.
            self.bom = BomState::Done;
            scratch[..matched_len].copy_from_slice(&matched[..matched_len]);
            return &scratch[..matched_len];
        }
        // Input ran out while still matching a BOM prefix: remember and wait.
        self.bom = if matched_len == 0 {
            BomState::Start
        } else {
            BomState::Partial {
                buf: matched,
                len: matched_len,
            }
        };
        &scratch[..0]
    }

    /// Scan content bytes, dispatching completed events and handling every line
    /// terminator (`\n`, `\r`, `\r\n`), including a CRLF split across chunks.
    fn scan(&mut self, mut bytes: &[u8], out: &mut Vec<SseEvent>) {
        while let Some(pos) = bytes.iter().position(|&b| matches!(b, b'\n' | b'\r')) {
            // Copy the whole run of line content at once rather than per byte.
            self.line_buf.extend_from_slice(&bytes[..pos]);
            self.end_line(out);

            let terminator = bytes[pos];
            bytes = &bytes[pos + 1..];
            if terminator == b'\r' {
                match bytes.first() {
                    // CRLF within this slice: consume the `\n` as well.
                    Some(b'\n') => bytes = &bytes[1..],
                    // Lone CR: the next byte starts a new line.
                    Some(_) => {}
                    // CR is the last byte: a `\n` may open the next chunk.
                    None => self.pending_cr = true,
                }
            }
        }
        // Unterminated tail: keep it for the next call.
        self.line_buf.extend_from_slice(bytes);
    }

    /// Signal end of stream and reset the parser for reuse.
    ///
    /// Per the WHATWG algorithm, once the stream ends any pending data is
    /// discarded: an event is **only** dispatched by a blank line, so an event
    /// that was never terminated by one is dropped, as is any trailing line that
    /// arrived without a line terminator.
    ///
    /// `out` exists for API symmetry with [`feed`](Self::feed); by spec nothing
    /// is ever pushed onto it here, but the signature lets callers funnel both
    /// calls through the same sink.
    ///
    /// # Reset semantics
    /// `finish` fully resets the parser so it can be reused for a fresh stream,
    /// including BOM stripping. This **clears the persistent state too** — the
    /// last event id ([`last_id`](Self::last_id)) and the reconnection time
    /// ([`reconnection_time`](Self::reconnection_time)). If you need either for
    /// a reconnection, read it before calling `finish`.
    pub fn finish(&mut self, out: &mut Vec<SseEvent>) {
        // Nothing is dispatched at end of stream; `out` is intentionally unused.
        let _ = out;
        // Resetting via `Default` guarantees every field (including any added
        // later) returns to its initial state. A trailing unterminated line
        // needs no processing: everything it could set is reset here anyway.
        *self = Self::default();
    }

    /// Process one complete line whose terminator has already been consumed.
    ///
    /// A blank line dispatches the buffered event, a line starting with `:` is a
    /// comment, and anything else is a field.
    fn end_line(&mut self, out: &mut Vec<SseEvent>) {
        // Move the line out so the field slices can borrow it while
        // `process_field` mutates `self`; the allocation is handed back below so
        // the next line reuses its capacity instead of reallocating.
        let mut line = take_line(&mut self.line_buf);
        match line.first().copied() {
            None => self.dispatch(out),
            Some(b':') => { /* Comment line: ignored per spec. */ }
            Some(_) => {
                let (name, value) = split_field(&line);
                self.process_field(name, value);
            }
        }
        line.clear();
        self.line_buf = line;
    }

    /// Apply a parsed field to the in-progress event buffers.
    fn process_field(&mut self, name: &[u8], value: &[u8]) {
        match name {
            b"event" => {
                self.event_type = Some(decode_utf8_lossy(value));
            }
            b"data" => {
                // Per spec: append the value then a newline; the trailing
                // newline is stripped at dispatch time.
                self.data.push_str(&decode_utf8_lossy(value));
                self.data.push('\n');
            }
            b"id" => {
                // Per spec, ignore the field if the value contains a NUL.
                if !value.contains(&0) {
                    self.last_id = Some(decode_utf8_lossy(value));
                }
            }
            b"retry" => {
                // Per spec only a non-empty, all-ASCII-digit value is honoured;
                // a value too large for `u64` is ignored as well.
                if !value.is_empty() && value.iter().all(u8::is_ascii_digit) {
                    let parsed = core::str::from_utf8(value)
                        .ok()
                        .and_then(|s| s.parse::<u64>().ok());
                    if let Some(ms) = parsed {
                        self.retry = Some(ms);
                        // The reconnection time persists across dispatches.
                        self.reconnection_time = Some(ms);
                    }
                }
            }
            _ => { /* Unknown field: ignored per spec. */ }
        }
    }

    /// Dispatch the buffered event on a blank line, then reset per-event buffers.
    fn dispatch(&mut self, out: &mut Vec<SseEvent>) {
        // Per spec: if no `data` field was seen, the data buffer is empty;
        // reset the per-event buffers and dispatch nothing. The last event id
        // is *persistent*, so it is deliberately left untouched here.
        if self.data.is_empty() {
            self.event_type = None;
            self.retry = None;
            return;
        }

        // Strip the single trailing newline added by the last `data` field.
        let mut data = take_string(&mut self.data);
        if data.ends_with('\n') {
            data.pop();
        }

        out.push(SseEvent {
            // Per spec an empty event type buffer means the default type, which
            // `SseEvent::event` represents as `None`.
            event: self.event_type.take().filter(|t| !t.is_empty()),
            data,
            // Clone, not take: the last event id persists across dispatches.
            id: self.last_id.clone(),
            retry: self.retry.take(),
        });
    }
}

/// Move the accumulated line bytes out of `buf` and return them.
///
/// `buf` is left empty and unallocated: its heap allocation travels with the
/// returned `Vec`, so a caller that wants to keep reusing that capacity must
/// hand the vector back.
#[inline]
fn take_line(buf: &mut Vec<u8>) -> Vec<u8> {
    let fresh = Vec::new();
    core::mem::replace(buf, fresh)
}

/// Break a (non-empty, non-comment) line into `(name, value)`.
///
/// The split happens at the first `:`; at most one space directly after that
/// colon is dropped from the value. A line without any colon is treated as a
/// bare field name whose value is empty.
#[inline]
fn split_field(line: &[u8]) -> (&[u8], &[u8]) {
    let Some(colon) = line.iter().position(|&b| b == b':') else {
        return (line, &[]);
    };
    let (name, colon_and_rest) = line.split_at(colon);
    let after_colon = &colon_and_rest[1..];
    let value = after_colon.strip_prefix(b" ").unwrap_or(after_colon);
    (name, value)
}

/// Turn raw field bytes into an owned `String`, substituting U+FFFD for any
/// invalid UTF-8 sequence instead of failing (the lenient decoding browsers use).
#[inline]
fn decode_utf8_lossy(bytes: &[u8]) -> String {
    let decoded = String::from_utf8_lossy(bytes);
    String::from(decoded)
}