1use crate::error::ApiError;
2use crate::types::StreamEvent;
3
/// Incremental parser for a server-sent events (SSE) byte stream.
///
/// Incoming bytes are accumulated in an internal buffer; complete frames
/// (terminated by a blank line) are cut off the front of that buffer and
/// decoded into stream events.
#[derive(Default, Debug)]
pub struct SseParser {
    /// Raw bytes received so far that have not yet been consumed as a frame.
    buffer: Vec<u8>,
}
8
9impl SseParser {
10 #[must_use]
11 pub fn new() -> Self {
12 Self::default()
13 }
14
15 const MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
16
17 pub fn push(&mut self, chunk: &[u8]) -> Result<Vec<StreamEvent>, ApiError> {
18 if self.buffer.len() + chunk.len() > Self::MAX_BUFFER_SIZE {
19 self.buffer.clear();
20 return Err(ApiError::ResponsePayloadTooLarge {
21 limit: Self::MAX_BUFFER_SIZE,
22 });
23 }
24 self.buffer.extend_from_slice(chunk);
25 let mut events = Vec::new();
26
27 while let Some(frame) = self.next_frame() {
28 if let Some(event) = parse_frame(&frame)? {
29 events.push(event);
30 }
31 }
32
33 Ok(events)
34 }
35
36 pub fn finish(&mut self) -> Result<Vec<StreamEvent>, ApiError> {
37 if self.buffer.is_empty() {
38 return Ok(Vec::new());
39 }
40
41 let trailing = std::mem::take(&mut self.buffer);
42 match parse_frame(&String::from_utf8_lossy(&trailing))? {
43 Some(event) => Ok(vec![event]),
44 None => Ok(Vec::new()),
45 }
46 }
47
48 fn next_frame(&mut self) -> Option<String> {
49 let separator = self
50 .buffer
51 .windows(2)
52 .position(|window| window == b"\n\n")
53 .map(|position| (position, 2))
54 .or_else(|| {
55 self.buffer
56 .windows(4)
57 .position(|window| window == b"\r\n\r\n")
58 .map(|position| (position, 4))
59 })?;
60
61 let (position, separator_len) = separator;
62 let frame = self
63 .buffer
64 .drain(..position + separator_len)
65 .collect::<Vec<_>>();
66 let frame_len = frame.len().saturating_sub(separator_len);
67 Some(String::from_utf8_lossy(&frame[..frame_len]).into_owned())
68 }
69}
70
71pub fn parse_frame(frame: &str) -> Result<Option<StreamEvent>, ApiError> {
72 let trimmed = frame.trim();
73 if trimmed.is_empty() {
74 return Ok(None);
75 }
76
77 let mut data_lines = Vec::new();
78 let mut event_name: Option<&str> = None;
79
80 for line in trimmed.lines() {
81 if line.starts_with(':') {
82 continue;
83 }
84 if let Some(name) = line.strip_prefix("event:") {
85 event_name = Some(name.trim());
86 continue;
87 }
88 if let Some(data) = line.strip_prefix("data:") {
89 data_lines.push(data.trim_start());
90 }
91 }
92
93 if matches!(event_name, Some("ping")) {
94 return Ok(None);
95 }
96
97 if data_lines.is_empty() {
98 return Ok(None);
99 }
100
101 let payload = data_lines.join("\n");
102 if payload == "[DONE]" {
103 return Ok(None);
104 }
105
106 serde_json::from_str::<StreamEvent>(&payload)
107 .map(Some)
108 .map_err(ApiError::from)
109}
110
#[cfg(test)]
mod tests {
    use super::{parse_frame, SseParser};
    use crate::types::{
        ContentBlockDelta, ContentBlockDeltaEvent, ContentBlockStartEvent, MessageDelta,
        MessageDeltaEvent, MessageStopEvent, OutputContentBlock, StreamEvent, Usage,
    };

    /// Expected `content_block_start` event for the block at index 0.
    fn block_start(content_block: OutputContentBlock) -> StreamEvent {
        StreamEvent::ContentBlockStart(ContentBlockStartEvent {
            index: 0,
            content_block,
        })
    }

    /// Expected `content_block_delta` event for the block at index 0.
    fn block_delta(delta: ContentBlockDelta) -> StreamEvent {
        StreamEvent::ContentBlockDelta(ContentBlockDeltaEvent { index: 0, delta })
    }

    #[test]
    fn parses_single_frame() {
        let frame = concat!(
            "event: content_block_start\n",
            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"Hi\"}}\n\n"
        );

        let parsed = parse_frame(frame).expect("frame should parse");
        let expected = block_start(OutputContentBlock::Text {
            text: "Hi".to_string(),
        });
        assert_eq!(parsed, Some(expected));
    }

    #[test]
    fn parses_chunked_stream() {
        let mut parser = SseParser::new();
        let first = b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hel";
        let second = b"lo\"}}\n\n";

        // The first chunk ends mid-frame, so nothing can be emitted yet.
        let buffered = parser.push(first).expect("first chunk should buffer");
        assert!(buffered.is_empty());

        let events = parser.push(second).expect("second chunk should parse");
        let expected = vec![block_delta(ContentBlockDelta::TextDelta {
            text: "Hello".to_string(),
        })];
        assert_eq!(events, expected);
    }

    #[test]
    fn ignores_ping_and_done() {
        let mut parser = SseParser::new();
        let payload = concat!(
            ": keepalive\n",
            "event: ping\n",
            "data: {\"type\":\"ping\"}\n\n",
            "event: message_delta\n",
            "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\",\"stop_sequence\":null},\"usage\":{\"input_tokens\":1,\"output_tokens\":2}}\n\n",
            "event: message_stop\n",
            "data: {\"type\":\"message_stop\"}\n\n",
            "data: [DONE]\n\n"
        );

        let events = parser
            .push(payload.as_bytes())
            .expect("parser should succeed");

        let message_delta = StreamEvent::MessageDelta(MessageDeltaEvent {
            delta: MessageDelta {
                stop_reason: Some("tool_use".to_string()),
                stop_sequence: None,
            },
            usage: Usage {
                input_tokens: 1,
                cache_creation_input_tokens: 0,
                cache_read_input_tokens: 0,
                output_tokens: 2,
            },
        });
        let message_stop = StreamEvent::MessageStop(MessageStopEvent {});
        assert_eq!(events, vec![message_delta, message_stop]);
    }

    #[test]
    fn ignores_data_less_event_frames() {
        let outcome = parse_frame("event: ping\n\n");
        assert_eq!(
            outcome.expect("frame without data should be ignored"),
            None
        );
    }

    #[test]
    fn parses_split_json_across_data_lines() {
        let frame = concat!(
            "event: content_block_delta\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\n",
            "data: \"delta\":{\"type\":\"text_delta\",\"text\":\"Hello\"}}\n\n"
        );

        let parsed = parse_frame(frame).expect("frame should parse");
        let expected = block_delta(ContentBlockDelta::TextDelta {
            text: "Hello".to_string(),
        });
        assert_eq!(parsed, Some(expected));
    }

    #[test]
    fn parses_thinking_content_block_start() {
        let frame = concat!(
            "event: content_block_start\n",
            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\",\"signature\":null}}\n\n"
        );

        let parsed = parse_frame(frame).expect("frame should parse");
        let expected = block_start(OutputContentBlock::Thinking {
            thinking: String::new(),
            signature: None,
        });
        assert_eq!(parsed, Some(expected));
    }

    #[test]
    fn parses_thinking_related_deltas() {
        let thinking_frame = concat!(
            "event: content_block_delta\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"step 1\"}}\n\n"
        );
        let signature_frame = concat!(
            "event: content_block_delta\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"signature_delta\",\"signature\":\"sig_123\"}}\n\n"
        );

        let thinking = parse_frame(thinking_frame).expect("thinking delta should parse");
        let signature = parse_frame(signature_frame).expect("signature delta should parse");

        let expected_thinking = block_delta(ContentBlockDelta::ThinkingDelta {
            thinking: "step 1".to_string(),
        });
        let expected_signature = block_delta(ContentBlockDelta::SignatureDelta {
            signature: "sig_123".to_string(),
        });
        assert_eq!(thinking, Some(expected_thinking));
        assert_eq!(signature, Some(expected_signature));
    }

    #[test]
    fn rejects_oversized_buffer() {
        let mut parser = SseParser::new();
        let oversized = vec![b'x'; SseParser::MAX_BUFFER_SIZE + 1];

        let error = parser.push(&oversized).unwrap_err();
        assert!(error.to_string().contains("limit"));
    }
}