Skip to main content

codineer_api/
sse.rs

1use crate::error::ApiError;
2use crate::types::StreamEvent;
3
4#[derive(Debug, Default)]
5pub struct SseParser {
6    buffer: Vec<u8>,
7}
8
9impl SseParser {
10    #[must_use]
11    pub fn new() -> Self {
12        Self::default()
13    }
14
15    const MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
16
17    pub fn push(&mut self, chunk: &[u8]) -> Result<Vec<StreamEvent>, ApiError> {
18        if self.buffer.len() + chunk.len() > Self::MAX_BUFFER_SIZE {
19            self.buffer.clear();
20            return Err(ApiError::ResponsePayloadTooLarge {
21                limit: Self::MAX_BUFFER_SIZE,
22            });
23        }
24        self.buffer.extend_from_slice(chunk);
25        let mut events = Vec::new();
26
27        while let Some(frame) = self.next_frame() {
28            if let Some(event) = parse_frame(&frame)? {
29                events.push(event);
30            }
31        }
32
33        Ok(events)
34    }
35
36    pub fn finish(&mut self) -> Result<Vec<StreamEvent>, ApiError> {
37        if self.buffer.is_empty() {
38            return Ok(Vec::new());
39        }
40
41        let trailing = std::mem::take(&mut self.buffer);
42        match parse_frame(&String::from_utf8_lossy(&trailing))? {
43            Some(event) => Ok(vec![event]),
44            None => Ok(Vec::new()),
45        }
46    }
47
48    fn next_frame(&mut self) -> Option<String> {
49        let separator = self
50            .buffer
51            .windows(2)
52            .position(|window| window == b"\n\n")
53            .map(|position| (position, 2))
54            .or_else(|| {
55                self.buffer
56                    .windows(4)
57                    .position(|window| window == b"\r\n\r\n")
58                    .map(|position| (position, 4))
59            })?;
60
61        let (position, separator_len) = separator;
62        let frame = self
63            .buffer
64            .drain(..position + separator_len)
65            .collect::<Vec<_>>();
66        let frame_len = frame.len().saturating_sub(separator_len);
67        Some(String::from_utf8_lossy(&frame[..frame_len]).into_owned())
68    }
69}
70
71pub fn parse_frame(frame: &str) -> Result<Option<StreamEvent>, ApiError> {
72    let trimmed = frame.trim();
73    if trimmed.is_empty() {
74        return Ok(None);
75    }
76
77    let mut data_lines = Vec::new();
78    let mut event_name: Option<&str> = None;
79
80    for line in trimmed.lines() {
81        if line.starts_with(':') {
82            continue;
83        }
84        if let Some(name) = line.strip_prefix("event:") {
85            event_name = Some(name.trim());
86            continue;
87        }
88        if let Some(data) = line.strip_prefix("data:") {
89            data_lines.push(data.trim_start());
90        }
91    }
92
93    if matches!(event_name, Some("ping")) {
94        return Ok(None);
95    }
96
97    if data_lines.is_empty() {
98        return Ok(None);
99    }
100
101    let payload = data_lines.join("\n");
102    if payload == "[DONE]" {
103        return Ok(None);
104    }
105
106    serde_json::from_str::<StreamEvent>(&payload)
107        .map(Some)
108        .map_err(ApiError::from)
109}
110
#[cfg(test)]
mod tests {
    use super::{parse_frame, SseParser};
    use crate::types::{
        ContentBlockDelta, ContentBlockDeltaEvent, ContentBlockStartEvent, MessageDelta,
        MessageDeltaEvent, MessageStopEvent, OutputContentBlock, StreamEvent, Usage,
    };

    /// Wraps `content_block` in a `content_block_start` event at index 0.
    fn block_start(content_block: OutputContentBlock) -> StreamEvent {
        StreamEvent::ContentBlockStart(ContentBlockStartEvent {
            index: 0,
            content_block,
        })
    }

    /// Wraps `delta` in a `content_block_delta` event at index 0.
    fn block_delta(delta: ContentBlockDelta) -> StreamEvent {
        StreamEvent::ContentBlockDelta(ContentBlockDeltaEvent { index: 0, delta })
    }

    /// A complete frame with an event name and one data line parses directly.
    #[test]
    fn parses_single_frame() {
        let frame = concat!(
            "event: content_block_start\n",
            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"Hi\"}}\n\n"
        );

        let parsed = parse_frame(frame).expect("frame should parse");

        let expected = block_start(OutputContentBlock::Text {
            text: String::from("Hi"),
        });
        assert_eq!(parsed, Some(expected));
    }

    /// A frame split across two chunks is only emitted once it is complete.
    #[test]
    fn parses_chunked_stream() {
        let head = b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hel";
        let tail = b"lo\"}}\n\n";
        let mut parser = SseParser::new();

        let buffered = parser.push(head).expect("first chunk should buffer");
        assert!(buffered.is_empty());

        let events = parser.push(tail).expect("second chunk should parse");
        let expected = vec![block_delta(ContentBlockDelta::TextDelta {
            text: String::from("Hello"),
        })];
        assert_eq!(events, expected);
    }

    /// Comments, ping events and the `[DONE]` sentinel produce no events.
    #[test]
    fn ignores_ping_and_done() {
        let stream = concat!(
            ": keepalive\n",
            "event: ping\n",
            "data: {\"type\":\"ping\"}\n\n",
            "event: message_delta\n",
            "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\",\"stop_sequence\":null},\"usage\":{\"input_tokens\":1,\"output_tokens\":2}}\n\n",
            "event: message_stop\n",
            "data: {\"type\":\"message_stop\"}\n\n",
            "data: [DONE]\n\n"
        );
        let mut parser = SseParser::new();

        let events = parser
            .push(stream.as_bytes())
            .expect("parser should succeed");

        let message_delta = StreamEvent::MessageDelta(MessageDeltaEvent {
            delta: MessageDelta {
                stop_reason: Some(String::from("tool_use")),
                stop_sequence: None,
            },
            usage: Usage {
                input_tokens: 1,
                cache_creation_input_tokens: 0,
                cache_read_input_tokens: 0,
                output_tokens: 2,
            },
        });
        let message_stop = StreamEvent::MessageStop(MessageStopEvent {});
        assert_eq!(events, vec![message_delta, message_stop]);
    }

    /// A frame that names an event but has no data line is dropped.
    #[test]
    fn ignores_data_less_event_frames() {
        let parsed =
            parse_frame("event: ping\n\n").expect("frame without data should be ignored");
        assert_eq!(parsed, None);
    }

    /// Multiple data lines are joined before the JSON is decoded.
    #[test]
    fn parses_split_json_across_data_lines() {
        let frame = concat!(
            "event: content_block_delta\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\n",
            "data: \"delta\":{\"type\":\"text_delta\",\"text\":\"Hello\"}}\n\n"
        );

        let parsed = parse_frame(frame).expect("frame should parse");

        let expected = block_delta(ContentBlockDelta::TextDelta {
            text: String::from("Hello"),
        });
        assert_eq!(parsed, Some(expected));
    }

    /// A thinking block start with a null signature decodes to `None`.
    #[test]
    fn parses_thinking_content_block_start() {
        let frame = concat!(
            "event: content_block_start\n",
            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\",\"signature\":null}}\n\n"
        );

        let parsed = parse_frame(frame).expect("frame should parse");

        let expected = block_start(OutputContentBlock::Thinking {
            thinking: String::new(),
            signature: None,
        });
        assert_eq!(parsed, Some(expected));
    }

    /// Both thinking and signature deltas decode to their own variants.
    #[test]
    fn parses_thinking_related_deltas() {
        let thinking_frame = concat!(
            "event: content_block_delta\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"step 1\"}}\n\n"
        );
        let signature_frame = concat!(
            "event: content_block_delta\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"signature_delta\",\"signature\":\"sig_123\"}}\n\n"
        );

        let thinking_event = parse_frame(thinking_frame).expect("thinking delta should parse");
        let signature_event =
            parse_frame(signature_frame).expect("signature delta should parse");

        assert_eq!(
            thinking_event,
            Some(block_delta(ContentBlockDelta::ThinkingDelta {
                thinking: String::from("step 1"),
            }))
        );
        assert_eq!(
            signature_event,
            Some(block_delta(ContentBlockDelta::SignatureDelta {
                signature: String::from("sig_123"),
            }))
        );
    }

    /// A chunk one byte over the buffer limit is rejected with an error.
    #[test]
    fn rejects_oversized_buffer() {
        let oversized = vec![b'x'; SseParser::MAX_BUFFER_SIZE + 1];
        let mut parser = SseParser::new();

        let err = parser.push(&oversized).unwrap_err();

        assert!(err.to_string().contains("limit"));
    }
}