Skip to main content

claude_stream/
parser.rs

1use crate::event::{Event, ParseError};
2
3/// Incremental SSE parser for Anthropic's `messages` stream.
4///
5/// Stateful: feed bytes as they arrive, drain ready events with
6/// [`next_event`](Self::next_event). Safe to call `feed` after every
7/// network read of any size, including mid-event.
8pub struct EventParser {
9    /// Bytes received but not yet split into complete events.
10    buf: String,
11    /// Events parsed but not yet handed to the caller.
12    pending: std::collections::VecDeque<Event>,
13}
14
15impl EventParser {
16    /// Construct an empty parser.
17    pub fn new() -> Self {
18        Self {
19            buf: String::new(),
20            pending: std::collections::VecDeque::new(),
21        }
22    }
23
24    /// Append `bytes` to the parser's buffer and process any complete events.
25    ///
26    /// Non-UTF-8 bytes are dropped (Anthropic's SSE is always UTF-8 in
27    /// practice). Returns the number of new events parsed and queued.
28    pub fn feed(&mut self, bytes: &[u8]) -> usize {
29        self.buf.push_str(&String::from_utf8_lossy(bytes));
30        let mut new_events = 0;
31        // Each event is terminated by a blank line ("\n\n").
32        while let Some(idx) = self.buf.find("\n\n") {
33            let raw = self.buf[..idx].to_string();
34            self.buf.drain(..idx + 2);
35            if let Some(event) = parse_event_block(&raw) {
36                self.pending.push_back(event);
37                new_events += 1;
38            }
39        }
40        new_events
41    }
42
43    /// Pull the next ready event, or `Ok(None)` if the buffer is empty.
44    pub fn next_event(&mut self) -> Result<Option<Event>, ParseError> {
45        Ok(self.pending.pop_front())
46    }
47
48    /// Drain all currently ready events.
49    pub fn drain(&mut self) -> Vec<Event> {
50        std::mem::take(&mut self.pending).into_iter().collect()
51    }
52
53    /// Convenience: parse a complete SSE response body all at once.
54    pub fn parse_all(text: &str) -> Vec<Event> {
55        let mut p = Self::new();
56        p.feed(text.as_bytes());
57        p.drain()
58    }
59}
60
61impl Default for EventParser {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67/// Parse one event block: a sequence of `event:` / `data:` lines, terminated
68/// by a blank line. We only care about the `data:` payload (it carries the
69/// JSON whose `type` field tells us the event).
70fn parse_event_block(raw: &str) -> Option<Event> {
71    let mut data = String::new();
72    for line in raw.split('\n') {
73        let line = line.trim_end_matches('\r');
74        if let Some(rest) = line.strip_prefix("data:") {
75            // SSE allows multiple `data:` lines for one event; concatenate with newline.
76            if !data.is_empty() {
77                data.push('\n');
78            }
79            data.push_str(rest.strip_prefix(' ').unwrap_or(rest));
80        }
81        // Ignore "event:", "id:", "retry:" lines — the JSON `type` field is
82        // the source of truth for our discriminant.
83    }
84    if data.is_empty() {
85        return None;
86    }
87    serde_json::from_str(&data).ok()
88}