sse-frame 0.1.0

Streaming parser for Server-Sent Events frames as used by LLM APIs (OpenAI, Anthropic, Vertex). Push bytes, get back complete event records. Zero deps.
Documentation
//! # sse-frame
//!
//! Streaming parser for Server-Sent Events as emitted by LLM APIs.
//!
//! Push bytes; pull complete [`Event`] records. The parser keeps a line
//! buffer and a per-event field buffer between calls, so the input may be
//! split at any byte boundary across successive `push` calls.
//!
//! Supports the fields specified in the SSE standard: `event:`,
//! `data:`, `id:`, and `retry:`. Multiple `data:` lines in a single
//! event are joined with `\n` per the spec. Lines starting with `:` are
//! comments.
//!
//! ## Example
//!
//! ```
//! use sse_frame::Parser;
//! let mut p = Parser::new();
//! let frames = p.push(
//!     b"event: message\ndata: {\"x\":1}\n\ndata: line1\ndata: line2\n\n",
//! );
//! assert_eq!(frames.len(), 2);
//! assert_eq!(frames[0].event.as_deref(), Some("message"));
//! assert_eq!(frames[0].data, "{\"x\":1}");
//! assert_eq!(frames[1].data, "line1\nline2");
//! ```

#![deny(missing_docs)]

/// A single parsed Server-Sent Event.
///
/// Each field corresponds to one of the SSE wire fields; optional fields
/// are `None` when the event did not carry them.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Event {
    /// Value of the `event:` field, or `None` if the event had none.
    pub event: Option<String>,
    /// Payload built from the event's `data:` lines; multiple lines are
    /// separated by `\n`.
    pub data: String,
    /// Value of the `id:` field, or `None` if the event had none.
    pub id: Option<String>,
    /// Value of the `retry:` field in milliseconds, or `None` if no usable
    /// value was present.
    pub retry_ms: Option<u64>,
}

/// Incremental SSE parser.
///
/// Feed transport bytes to [`Parser::push`] in chunks of any size and
/// collect the events it returns. When the stream ends, call
/// [`Parser::flush`] to recover an event that was not terminated by a
/// blank line.
///
/// Lines may end in `\n`, `\r\n` or a lone `\r`. Unknown fields and
/// comment lines (starting with `:`) are ignored.
#[derive(Debug, Default)]
pub struct Parser {
    /// Bytes of the current line that has not been terminated yet.
    line_buf: Vec<u8>,
    /// Event under construction. While it is being built, `data` holds a
    /// `\n` after every `data:` line; the last one is trimmed on dispatch.
    pending: Event,
    /// True once at least one recognized field was stored in `pending`.
    has_pending: bool,
    /// True when the previous byte was `\r`, so that a directly following
    /// `\n` is recognized as the second half of a CRLF pair even when it
    /// arrives in a later `push` call.
    after_cr: bool,
}

impl Parser {
    /// Build an empty parser.
    pub fn new() -> Self {
        Self::default()
    }

    /// Push the next bytes. Returns any complete events parsed by this
    /// call.
    ///
    /// `bytes` may start and end anywhere, including in the middle of a
    /// line or between the `\r` and `\n` of a CRLF pair; unfinished input
    /// is kept until a later call completes it.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<Event> {
        let mut out = Vec::new();
        for &b in bytes {
            // Remember whether this byte is a CR and learn whether the
            // previous one was, in a single step.
            let follows_cr = std::mem::replace(&mut self.after_cr, b == b'\r');
            match b {
                // LF right after CR: the line was already ended by the CR.
                b'\n' if follows_cr => {}
                b'\n' | b'\r' => self.consume_line(&mut out),
                _ => self.line_buf.push(b),
            }
        }
        out
    }

    /// Flush a final event if any data was buffered without trailing
    /// blank-line terminator.
    ///
    /// A last line that was not terminated by a line ending is processed
    /// as well, so nothing is left behind in the parser. Returns `None`
    /// when no event is pending.
    pub fn flush(&mut self) -> Option<Event> {
        // Standard says terminate on blank line; some servers don't.
        // Flush gracefully.
        self.after_cr = false;
        if !self.line_buf.is_empty() {
            self.apply_buffered_line();
        }
        self.take_pending()
    }

    /// Handle the end of a line: a blank line dispatches the pending
    /// event into `out`, any other line is applied to the pending event.
    fn consume_line(&mut self, out: &mut Vec<Event>) {
        if self.line_buf.is_empty() {
            out.extend(self.take_pending());
        } else {
            self.apply_buffered_line();
        }
    }

    /// Apply the buffered line to the pending event and empty the buffer,
    /// keeping its allocation for the next line.
    fn apply_buffered_line(&mut self) {
        let mut line = std::mem::take(&mut self.line_buf);
        self.apply_field(&line);
        line.clear();
        self.line_buf = line;
    }

    /// Take the pending event, if any, and reset the parser for the next
    /// one.
    fn take_pending(&mut self) -> Option<Event> {
        if !self.has_pending {
            return None;
        }
        self.has_pending = false;
        let mut event = std::mem::take(&mut self.pending);
        // Every `data:` line appended a '\n'; drop the final one so the
        // lines end up joined by, not terminated with, '\n'.
        if event.data.ends_with('\n') {
            event.data.pop();
        }
        Some(event)
    }

    /// Interpret one non-empty line and store its field in the pending
    /// event. Comments, unknown fields and unusable values are ignored.
    fn apply_field(&mut self, line: &[u8]) {
        if line.first() == Some(&b':') {
            // Comment; ignore.
            return;
        }
        // Split on the first ':'; everything before is the field name,
        // after is the value (optionally preceded by a single space). A
        // line without ':' is a field name with an empty value.
        let (name, value) = match line.iter().position(|&c| c == b':') {
            Some(i) => {
                let rest = &line[i + 1..];
                (&line[..i], rest.strip_prefix(b" ").unwrap_or(rest))
            }
            None => (line, &b""[..]),
        };

        let stored = match name {
            b"event" => {
                self.pending.event = Some(String::from_utf8_lossy(value).into_owned());
                true
            }
            b"data" => {
                self.pending.data.push_str(&String::from_utf8_lossy(value));
                self.pending.data.push('\n');
                true
            }
            // An id containing NUL is ignored, as the SSE standard requires.
            b"id" if !value.contains(&0) => {
                self.pending.id = Some(String::from_utf8_lossy(value).into_owned());
                true
            }
            b"retry" => {
                // Only plain ASCII digits are valid; anything else leaves
                // a previously stored value untouched.
                let digits_only = value.iter().all(u8::is_ascii_digit);
                match String::from_utf8_lossy(value).parse::<u64>() {
                    Ok(ms) if digits_only => {
                        self.pending.retry_ms = Some(ms);
                        true
                    }
                    _ => false,
                }
            }
            _ => false,
        };
        // Only fields that were actually stored make the event pending,
        // so ignored lines never produce an empty event.
        self.has_pending |= stored;
    }
}