claude_stream/parser.rs
1use crate::event::{Event, ParseError};
2
3/// Incremental SSE parser for Anthropic's `messages` stream.
4///
5/// Stateful: feed bytes as they arrive, drain ready events with
6/// [`next_event`](Self::next_event). Safe to call `feed` after every
7/// network read of any size, including mid-event.
8pub struct EventParser {
9 /// Bytes received but not yet split into complete events.
10 buf: String,
11 /// Events parsed but not yet handed to the caller.
12 pending: std::collections::VecDeque<Event>,
13}
14
15impl EventParser {
16 /// Construct an empty parser.
17 pub fn new() -> Self {
18 Self {
19 buf: String::new(),
20 pending: std::collections::VecDeque::new(),
21 }
22 }
23
24 /// Append `bytes` to the parser's buffer and process any complete events.
25 ///
26 /// Non-UTF-8 bytes are dropped (Anthropic's SSE is always UTF-8 in
27 /// practice). Returns the number of new events parsed and queued.
28 pub fn feed(&mut self, bytes: &[u8]) -> usize {
29 self.buf.push_str(&String::from_utf8_lossy(bytes));
30 let mut new_events = 0;
31 // Each event is terminated by a blank line ("\n\n").
32 while let Some(idx) = self.buf.find("\n\n") {
33 let raw = self.buf[..idx].to_string();
34 self.buf.drain(..idx + 2);
35 if let Some(event) = parse_event_block(&raw) {
36 self.pending.push_back(event);
37 new_events += 1;
38 }
39 }
40 new_events
41 }
42
43 /// Pull the next ready event, or `Ok(None)` if the buffer is empty.
44 pub fn next_event(&mut self) -> Result<Option<Event>, ParseError> {
45 Ok(self.pending.pop_front())
46 }
47
48 /// Drain all currently ready events.
49 pub fn drain(&mut self) -> Vec<Event> {
50 std::mem::take(&mut self.pending).into_iter().collect()
51 }
52
53 /// Convenience: parse a complete SSE response body all at once.
54 pub fn parse_all(text: &str) -> Vec<Event> {
55 let mut p = Self::new();
56 p.feed(text.as_bytes());
57 p.drain()
58 }
59}
60
61impl Default for EventParser {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67/// Parse one event block: a sequence of `event:` / `data:` lines, terminated
68/// by a blank line. We only care about the `data:` payload (it carries the
69/// JSON whose `type` field tells us the event).
70fn parse_event_block(raw: &str) -> Option<Event> {
71 let mut data = String::new();
72 for line in raw.split('\n') {
73 let line = line.trim_end_matches('\r');
74 if let Some(rest) = line.strip_prefix("data:") {
75 // SSE allows multiple `data:` lines for one event; concatenate with newline.
76 if !data.is_empty() {
77 data.push('\n');
78 }
79 data.push_str(rest.strip_prefix(' ').unwrap_or(rest));
80 }
81 // Ignore "event:", "id:", "retry:" lines — the JSON `type` field is
82 // the source of truth for our discriminant.
83 }
84 if data.is_empty() {
85 return None;
86 }
87 serde_json::from_str(&data).ok()
88}