sse_frame/lib.rs
1//! # sse-frame
2//!
3//! Streaming parser for Server-Sent Events as emitted by LLM APIs.
4//!
//! Push bytes; pull complete [`Event`] records. The parser holds a small
//! line buffer and a per-event field buffer; returned events own their
//! data, so the caller's input buffer can be reused after each `push`.
8//!
9//! Supports the fields specified in the SSE standard: `event:`,
10//! `data:`, `id:`, and `retry:`. Multiple `data:` lines in a single
11//! event are joined with `\n` per the spec. Lines starting with `:` are
12//! comments.
13//!
14//! ## Example
15//!
16//! ```
17//! use sse_frame::Parser;
18//! let mut p = Parser::new();
19//! let frames = p.push(
20//! b"event: message\ndata: {\"x\":1}\n\ndata: line1\ndata: line2\n\n",
21//! );
22//! assert_eq!(frames.len(), 2);
23//! assert_eq!(frames[0].event.as_deref(), Some("message"));
24//! assert_eq!(frames[0].data, "{\"x\":1}");
25//! assert_eq!(frames[1].data, "line1\nline2");
26//! ```
27
28#![deny(missing_docs)]
29
/// One SSE event.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Event {
    /// Value of the last `event:` field (`None` when absent).
    pub event: Option<String>,
    /// Values of all `data:` lines, joined by `\n`.
    pub data: String,
    /// Value of the last `id:` field (`None` when absent).
    pub id: Option<String>,
    /// Value of the last valid `retry:` field, in milliseconds.
    pub retry_ms: Option<u64>,
}

/// Incremental SSE parser.
///
/// Feed arbitrary byte chunks with [`Parser::push`]; chunk boundaries may
/// fall anywhere, including in the middle of a line or between the `\r`
/// and `\n` of a CRLF pair. Lines may be terminated by `\n`, `\r\n`, or a
/// lone `\r`, as the SSE standard requires.
///
/// An event is emitted when a blank line is seen and at least one
/// recognised field (`event`, `data`, `id`, or a valid `retry`) was set
/// since the previous event. Comments and unknown fields never produce an
/// event on their own.
#[derive(Debug, Default)]
pub struct Parser {
    /// Bytes accumulated since the last line terminator.
    line_buf: Vec<u8>,
    /// Event under construction.
    pending: Event,
    /// True once a recognised field has been stored in `pending`.
    has_pending: bool,
    /// True once a `data:` line has been seen for `pending`. Tracked
    /// separately from `pending.data.is_empty()` so that an empty first
    /// `data:` line still contributes its `\n` separator.
    has_data: bool,
    /// True when the previous byte was `\r`, so that a following `\n`
    /// (possibly in the next `push` call) is treated as part of the same
    /// CRLF terminator rather than as a second, blank line.
    last_was_cr: bool,
}

impl Parser {
    /// Build an empty parser.
    pub fn new() -> Self {
        Self::default()
    }

    /// Push the next bytes. Returns any complete events parsed by this
    /// call.
    ///
    /// Bytes belonging to an unfinished line or event are retained and
    /// completed by later calls (or by [`Parser::flush`]).
    pub fn push(&mut self, bytes: &[u8]) -> Vec<Event> {
        let mut out = Vec::new();
        for &b in bytes {
            // Record whether *this* byte is CR and learn whether the
            // previous one was, in a single step.
            let after_cr = std::mem::replace(&mut self.last_was_cr, b == b'\r');
            match b {
                // Second half of a CRLF pair: the CR already ended the line.
                b'\n' if after_cr => {}
                b'\n' | b'\r' => self.consume_line(&mut out),
                _ => self.line_buf.push(b),
            }
        }
        out
    }

    /// Flush a final event if any data was buffered without trailing
    /// blank-line terminator.
    ///
    /// A final line that was not terminated by a newline is parsed as if
    /// it had been, so its field is included in the returned event.
    /// Returns `None` when nothing is pending. The parser is left empty
    /// and can be reused afterwards.
    pub fn flush(&mut self) -> Option<Event> {
        // Standard says terminate on blank line; some servers don't.
        // Flush gracefully, including a last line with no newline at all.
        if !self.line_buf.is_empty() {
            self.apply_line();
        }
        // The stream is over; a `\n` at the start of a later stream must
        // not be mistaken for the tail of a CRLF pair.
        self.last_was_cr = false;
        self.take_pending()
    }

    /// Handle one complete line: a blank line dispatches the pending
    /// event, anything else is parsed as a field or comment.
    fn consume_line(&mut self, out: &mut Vec<Event>) {
        if self.line_buf.is_empty() {
            out.extend(self.take_pending());
        } else {
            self.apply_line();
        }
    }

    /// Remove and return the pending event, if any, resetting the
    /// per-event state.
    fn take_pending(&mut self) -> Option<Event> {
        self.has_data = false;
        if std::mem::take(&mut self.has_pending) {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    /// Parse the buffered line into the pending event and empty the
    /// buffer, keeping its allocation for the next line.
    fn apply_line(&mut self) {
        // Move the buffer out so the line can be borrowed while `self`
        // is mutated, then hand the (cleared) allocation back.
        let mut line = std::mem::take(&mut self.line_buf);
        self.apply_field(&line);
        line.clear();
        self.line_buf = line;
    }

    /// Apply a single non-empty line (without its terminator) to the
    /// pending event.
    fn apply_field(&mut self, line: &[u8]) {
        if line.first() == Some(&b':') {
            // Comment; ignore.
            return;
        }
        // Split on the first ':'; everything before is the field name,
        // everything after is the value, minus one optional leading
        // space. A line without ':' is a field name with an empty value.
        let (name, value) = match line.iter().position(|&c| c == b':') {
            Some(i) => {
                let rest = &line[i + 1..];
                let value = match rest.split_first() {
                    Some((&b' ', tail)) => tail,
                    _ => rest,
                };
                (&line[..i], value)
            }
            None => (line, &line[..0]),
        };

        match name {
            b"event" => {
                self.pending.event = Some(String::from_utf8_lossy(value).into_owned());
                self.has_pending = true;
            }
            b"data" => {
                // Separate from the previous data line even when that
                // line was empty, so "data:\ndata: x" yields "\nx".
                if self.has_data {
                    self.pending.data.push('\n');
                }
                self.pending.data.push_str(&String::from_utf8_lossy(value));
                self.has_data = true;
                self.has_pending = true;
            }
            b"id" => {
                // The standard ignores ids containing NUL.
                if !value.contains(&0) {
                    self.pending.id = Some(String::from_utf8_lossy(value).into_owned());
                    self.has_pending = true;
                }
            }
            b"retry" => {
                // Only a run of ASCII digits is valid; anything else
                // (including a value too large for u64) is ignored and
                // leaves an earlier valid value untouched.
                if value.iter().all(u8::is_ascii_digit) {
                    let parsed = std::str::from_utf8(value)
                        .ok()
                        .and_then(|s| s.parse::<u64>().ok());
                    if let Some(ms) = parsed {
                        self.pending.retry_ms = Some(ms);
                        self.has_pending = true;
                    }
                }
            }
            // Unknown fields are ignored and do not start an event.
            _ => {}
        }
    }
}