Skip to main content

sse_frame/
lib.rs

1//! # sse-frame
2//!
3//! Streaming parser for Server-Sent Events as emitted by LLM APIs.
4//!
5//! Push bytes; pull complete [`Event`] records. The parser holds a small
6//! line buffer and a per-event field buffer; it never copies the
7//! transport payload more than once.
8//!
9//! Supports the fields specified in the SSE standard: `event:`,
10//! `data:`, `id:`, and `retry:`. Multiple `data:` lines in a single
11//! event are joined with `\n` per the spec. Lines starting with `:` are
12//! comments.
13//!
14//! ## Example
15//!
16//! ```
17//! use sse_frame::Parser;
18//! let mut p = Parser::new();
19//! let frames = p.push(
20//!     b"event: message\ndata: {\"x\":1}\n\ndata: line1\ndata: line2\n\n",
21//! );
22//! assert_eq!(frames.len(), 2);
23//! assert_eq!(frames[0].event.as_deref(), Some("message"));
24//! assert_eq!(frames[0].data, "{\"x\":1}");
25//! assert_eq!(frames[1].data, "line1\nline2");
26//! ```
27
28#![deny(missing_docs)]
29
30/// One SSE event.
31#[derive(Debug, Clone, PartialEq, Eq, Default)]
32pub struct Event {
33    /// `event:` field (None when absent).
34    pub event: Option<String>,
35    /// Concatenated `data:` lines, joined by `\n`.
36    pub data: String,
37    /// `id:` field (None when absent).
38    pub id: Option<String>,
39    /// `retry:` field in millis.
40    pub retry_ms: Option<u64>,
41}
42
43/// Incremental SSE parser.
44#[derive(Debug, Default)]
45pub struct Parser {
46    /// Bytes accumulated since the last `\n`.
47    line_buf: Vec<u8>,
48    /// Event under construction.
49    pending: Event,
50    /// True once we've seen any field for the pending event.
51    has_pending: bool,
52}
53
54impl Parser {
55    /// Build an empty parser.
56    pub fn new() -> Self {
57        Self::default()
58    }
59
60    /// Push the next bytes. Returns any complete events parsed by this
61    /// call.
62    pub fn push(&mut self, bytes: &[u8]) -> Vec<Event> {
63        let mut out = Vec::new();
64        for &b in bytes {
65            if b == b'\n' {
66                self.consume_line(&mut out);
67            } else if b != b'\r' {
68                self.line_buf.push(b);
69            }
70        }
71        out
72    }
73
74    /// Flush a final event if any data was buffered without trailing
75    /// blank-line terminator.
76    pub fn flush(&mut self) -> Option<Event> {
77        // Standard says terminate on blank line; some servers don't.
78        // Flush gracefully.
79        if self.has_pending {
80            let ev = std::mem::take(&mut self.pending);
81            self.has_pending = false;
82            Some(ev)
83        } else {
84            None
85        }
86    }
87
88    fn consume_line(&mut self, out: &mut Vec<Event>) {
89        if self.line_buf.is_empty() {
90            // Dispatch the pending event.
91            if self.has_pending {
92                out.push(std::mem::take(&mut self.pending));
93                self.has_pending = false;
94            }
95            return;
96        }
97        if self.line_buf[0] == b':' {
98            // Comment; ignore.
99            self.line_buf.clear();
100            return;
101        }
102        // Split on first ':'; everything before is the field name, after
103        // is the value (optionally preceded by a single space).
104        let line = std::mem::take(&mut self.line_buf);
105        let split = line.iter().position(|&c| c == b':');
106        let (field, value) = match split {
107            Some(i) => {
108                let f = String::from_utf8_lossy(&line[..i]).to_string();
109                let mut v = &line[i + 1..];
110                if v.first() == Some(&b' ') {
111                    v = &v[1..];
112                }
113                (f, String::from_utf8_lossy(v).to_string())
114            }
115            None => (String::from_utf8_lossy(&line).to_string(), String::new()),
116        };
117
118        self.has_pending = true;
119        match field.as_str() {
120            "event" => self.pending.event = Some(value),
121            "data" => {
122                if !self.pending.data.is_empty() {
123                    self.pending.data.push('\n');
124                }
125                self.pending.data.push_str(&value);
126            }
127            "id" => self.pending.id = Some(value),
128            "retry" => self.pending.retry_ms = value.parse().ok(),
129            _ => {}
130        }
131    }
132}