Skip to main content

a2a_protocol_client/streaming/
sse_parser.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Tom F.
3
4//! Server-Sent Events (SSE) frame parser.
5//!
6//! Implements a state machine that processes raw bytes and emits [`SseFrame`]s.
7//! The parser handles:
8//!
9//! - Fragmented TCP delivery (bytes arrive in arbitrary-sized chunks).
10//! - `data:` lines concatenated with `\n` when a single event spans multiple
11//!   `data:` lines.
12//! - Keep-alive comment lines (`: keep-alive`), silently ignored.
13//! - `event:`, `id:`, and `retry:` fields.
14//! - Double-newline event dispatch (`\n\n` terminates each frame).
15//! - Configurable maximum event size to prevent unbounded memory growth.
16
17// ── SseFrame ──────────────────────────────────────────────────────────────────
18
/// One complete server-sent event, as handed out by the parser.
///
/// Produced when a blank line closes an event that carried at least one
/// `data:` line; events that end without any data never become frames.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SseFrame {
    /// Payload: the values of the event's `data:` lines, separated by `\n`.
    pub data: String,

    /// Event type taken from the `event:` field, if the event had one.
    pub event_type: Option<String>,

    /// Last event ID in effect when this frame was dispatched (`id:` field).
    pub id: Option<String>,

    /// Reconnection delay hint in milliseconds, from a `retry:` field.
    pub retry: Option<u64>,
}
37
38// ── SseParseError ──────────────────────────────────────────────────────────────
39
/// Failure modes reported by [`SseParser`] while decoding an event stream.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SseParseError {
    /// An event grew past the parser's configured size cap and was dropped.
    EventTooLarge {
        /// The configured limit in bytes.
        limit: usize,
        /// The approximate size that was reached.
        actual: usize,
    },
}

impl std::fmt::Display for SseParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // There is only one variant, so this destructuring is irrefutable;
        // adding a variant turns it into a compile error that forces an update.
        let Self::EventTooLarge { limit, actual } = self;
        write!(f, "SSE event too large: {actual} bytes exceeds {limit} byte limit")
    }
}

impl std::error::Error for SseParseError {}
66
/// Event-size cap used when no explicit limit is configured: 4 MiB (2^22 bytes).
const DEFAULT_MAX_EVENT_SIZE: usize = 4 << 20;
69
70// ── SseParser ─────────────────────────────────────────────────────────────────
71
72/// Stateful SSE byte-stream parser.
73///
74/// Feed bytes with [`SseParser::feed`] and poll complete frames with
75/// [`SseParser::next_frame`].
76///
77/// The parser buffers bytes internally until a complete line is available,
78/// then processes each line according to the SSE spec.
79///
80/// # Memory limits
81///
82/// The parser enforces a configurable maximum event size (default 4 MiB) to
83/// prevent unbounded memory growth from malicious or malformed streams. When
84/// the limit is exceeded, the current event is discarded and an error is
85/// queued. Use [`SseParser::with_max_event_size`] to configure the limit.
86#[derive(Debug)]
87pub struct SseParser {
88    /// Bytes accumulated since the last newline.
89    line_buf: Vec<u8>,
90    /// Data lines accumulated since the last blank line.
91    data_lines: Vec<String>,
92    /// Approximate accumulated size of the current event in bytes.
93    current_event_size: usize,
94    /// Maximum allowed event size in bytes.
95    max_event_size: usize,
96    /// Current `event:` field value.
97    event_type: Option<String>,
98    /// Current `id:` field value.
99    id: Option<String>,
100    /// Current `retry:` field value.
101    retry: Option<u64>,
102    /// Complete frames ready for consumption.
103    ready: Vec<Result<SseFrame, SseParseError>>,
104    /// Whether the UTF-8 BOM has already been checked/stripped.
105    bom_checked: bool,
106}
107
108impl Default for SseParser {
109    fn default() -> Self {
110        Self {
111            line_buf: Vec::new(),
112            data_lines: Vec::new(),
113            current_event_size: 0,
114            max_event_size: DEFAULT_MAX_EVENT_SIZE,
115            event_type: None,
116            id: None,
117            retry: None,
118            ready: Vec::new(),
119            bom_checked: false,
120        }
121    }
122}
123
124impl SseParser {
125    /// Creates a new, empty [`SseParser`] with default limits (4 MiB max event size).
126    #[must_use]
127    pub fn new() -> Self {
128        Self::default()
129    }
130
131    /// Creates a new [`SseParser`] with a custom maximum event size.
132    ///
133    /// Events exceeding this limit will be discarded and an error queued.
134    #[must_use]
135    pub fn with_max_event_size(max_event_size: usize) -> Self {
136        Self {
137            max_event_size,
138            ..Self::default()
139        }
140    }
141
142    /// Returns the number of complete frames waiting to be consumed.
143    #[must_use]
144    pub const fn pending_count(&self) -> usize {
145        self.ready.len()
146    }
147
148    /// Feeds raw bytes from the SSE stream into the parser.
149    ///
150    /// After calling `feed`, call [`SseParser::next_frame`] repeatedly until
151    /// it returns `None` to consume all complete frames.
152    pub fn feed(&mut self, bytes: &[u8]) {
153        let mut input = bytes;
154        // Strip UTF-8 BOM (\xEF\xBB\xBF) if it appears at the very start.
155        if !self.bom_checked && self.line_buf.is_empty() {
156            if input.starts_with(b"\xEF\xBB\xBF") {
157                input = &input[3..];
158            }
159            // Only check once per stream; after the first feed, BOM position
160            // has passed regardless.
161            if !input.is_empty() || bytes.len() >= 3 {
162                self.bom_checked = true;
163            }
164        }
165        for &byte in input {
166            if byte == b'\n' {
167                self.process_line();
168                self.line_buf.clear();
169            } else if byte != b'\r' {
170                // Ignore bare \r (Windows-style \r\n handled by ignoring \r).
171                self.line_buf.push(byte);
172            }
173        }
174    }
175
176    /// Returns the next complete [`SseFrame`], or `None` if none are ready.
177    ///
178    /// Returns `Err` if an event exceeded the maximum size limit.
179    pub fn next_frame(&mut self) -> Option<Result<SseFrame, SseParseError>> {
180        if self.ready.is_empty() {
181            None
182        } else {
183            Some(self.ready.remove(0))
184        }
185    }
186
187    // ── internals ─────────────────────────────────────────────────────────────
188
189    fn process_line(&mut self) {
190        // Strip BOM if present at start of first line (handles fragmented BOM).
191        if !self.bom_checked {
192            if self.line_buf.starts_with(b"\xEF\xBB\xBF") {
193                self.line_buf.drain(..3);
194            }
195            self.bom_checked = true;
196        }
197        let line = match std::str::from_utf8(&self.line_buf) {
198            Ok(s) => s.to_owned(),
199            Err(_) => return, // skip malformed UTF-8 lines
200        };
201
202        if line.is_empty() {
203            // Blank line → dispatch frame if we have data.
204            self.dispatch_frame();
205            return;
206        }
207
208        if line.starts_with(':') {
209            // Comment line (e.g. `: keep-alive`) — silently ignore.
210            return;
211        }
212
213        // Split on the first `:` to get field name and value.
214        let (field, value) = line.find(':').map_or_else(
215            || (line.as_str(), String::new()),
216            |pos| {
217                let field = &line[..pos];
218                let value = line[pos + 1..].trim_start_matches(' ');
219                (field, value.to_owned())
220            },
221        );
222
223        // Track event size for memory protection.
224        self.current_event_size += value.len();
225        if self.current_event_size > self.max_event_size {
226            // Discard the current event and queue an error.
227            let error = SseParseError::EventTooLarge {
228                limit: self.max_event_size,
229                actual: self.current_event_size,
230            };
231            self.data_lines.clear();
232            self.event_type = None;
233            self.current_event_size = 0;
234            self.ready.push(Err(error));
235            return;
236        }
237
238        match field {
239            "data" => self.data_lines.push(value),
240            "event" => self.event_type = Some(value),
241            "id" => {
242                if value.contains('\0') {
243                    // Spec: id with null byte clears the last event ID.
244                    self.id = None;
245                } else {
246                    self.id = Some(value);
247                }
248            }
249            "retry" => {
250                if let Ok(ms) = value.parse::<u64>() {
251                    self.retry = Some(ms);
252                }
253            }
254            _ => {
255                // Unknown field — ignore per spec.
256            }
257        }
258    }
259
260    fn dispatch_frame(&mut self) {
261        if self.data_lines.is_empty() {
262            // No data lines → not a real event; reset event-type only.
263            self.event_type = None;
264            self.current_event_size = 0;
265            return;
266        }
267
268        // Join data lines with `\n`; remove trailing `\n` if present.
269        let mut data = self.data_lines.join("\n");
270        if data.ends_with('\n') {
271            data.pop();
272        }
273
274        let frame = SseFrame {
275            data,
276            event_type: self.event_type.take(),
277            id: self.id.clone(), // id persists across events per spec
278            retry: self.retry,
279        };
280
281        self.data_lines.clear();
282        self.current_event_size = 0;
283        self.ready.push(Ok(frame));
284    }
285}
286
287// ── Tests ─────────────────────────────────────────────────────────────────────
288
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `input` in a single call and collects every queued frame,
    /// panicking if the parser reported an error instead.
    fn parse_all(input: &str) -> Vec<SseFrame> {
        let mut parser = SseParser::new();
        parser.feed(input.as_bytes());
        std::iter::from_fn(|| parser.next_frame())
            .map(|outcome| outcome.expect("unexpected error"))
            .collect()
    }

    /// Projects the `data` payloads out of `frames` for compact assertions.
    fn payloads(frames: &[SseFrame]) -> Vec<&str> {
        frames.iter().map(|frame| frame.data.as_str()).collect()
    }

    #[test]
    fn parse_single_data_event() {
        assert_eq!(payloads(&parse_all("data: hello world\n\n")), ["hello world"]);
    }

    #[test]
    fn parse_multiline_data() {
        assert_eq!(
            payloads(&parse_all("data: line1\ndata: line2\n\n")),
            ["line1\nline2"]
        );
    }

    #[test]
    fn parse_two_events() {
        assert_eq!(
            payloads(&parse_all("data: first\n\ndata: second\n\n")),
            ["first", "second"]
        );
    }

    #[test]
    fn ignore_keepalive_comment() {
        assert_eq!(payloads(&parse_all(": keep-alive\n\ndata: real\n\n")), ["real"]);
    }

    #[test]
    fn parse_event_type() {
        let frames = parse_all("event: status-update\ndata: {}\n\n");
        let [frame] = frames.as_slice() else {
            panic!("expected exactly one frame, got {frames:?}");
        };
        assert_eq!(frame.event_type.as_deref(), Some("status-update"));
    }

    #[test]
    fn parse_id_field() {
        let frames = parse_all("id: 42\ndata: hello\n\n");
        let [frame] = frames.as_slice() else {
            panic!("expected exactly one frame, got {frames:?}");
        };
        assert_eq!(frame.id.as_deref(), Some("42"));
    }

    #[test]
    fn parse_retry_field() {
        let frames = parse_all("retry: 5000\ndata: hello\n\n");
        let [frame] = frames.as_slice() else {
            panic!("expected exactly one frame, got {frames:?}");
        };
        assert_eq!(frame.retry, Some(5000));
    }

    #[test]
    fn fragmented_delivery() {
        let mut parser = SseParser::new();
        // One byte per call simulates maximally fragmented TCP delivery.
        b"data: fragmented\n\n"
            .chunks(1)
            .for_each(|chunk| parser.feed(chunk));
        let frame = parser.next_frame().expect("expected frame").expect("no error");
        assert_eq!(frame.data, "fragmented");
    }

    #[test]
    fn blank_line_without_data_is_ignored() {
        // The first blank line closes an event with no data: no frame for it.
        assert_eq!(payloads(&parse_all("event: ping\n\ndata: real\n\n")), ["real"]);
    }

    #[test]
    fn json_data_roundtrip() {
        let json = r#"{"jsonrpc":"2.0","id":"1","result":{"kind":"task"}}"#;
        let frames = parse_all(&format!("data: {json}\n\n"));
        assert_eq!(payloads(&frames), [json]);
    }

    #[test]
    fn event_too_large_returns_error() {
        let mut parser = SseParser::with_max_event_size(32);
        // 64 bytes of payload against a 32-byte cap.
        parser.feed(format!("data: {}\n\n", "x".repeat(64)).as_bytes());
        let outcome = parser.next_frame().expect("expected result");
        let Err(SseParseError::EventTooLarge { limit, .. }) = &outcome else {
            panic!("expected EventTooLarge, got {outcome:?}");
        };
        assert_eq!(*limit, 32);
    }

    #[test]
    fn events_after_oversized_event_still_parse() {
        let mut parser = SseParser::with_max_event_size(16);
        // An oversized event followed by one that fits.
        parser.feed(format!("data: {}\n\n", "x".repeat(32)).as_bytes());
        parser.feed(b"data: ok\n\n");

        assert!(parser.next_frame().expect("expected result").is_err());

        let recovered = parser
            .next_frame()
            .expect("expected result")
            .expect("no error");
        assert_eq!(recovered.data, "ok");
    }
}