Skip to main content

earl_protocol_http/
sse.rs

/// A parsed Server-Sent Event.
///
/// Produced by [`SseParser`] for every event block that contains at least one
/// `data` field. Implements `PartialEq`/`Eq` so events can be compared directly,
/// e.g. in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
    /// Value of the block's `event:` field, or `None` if the block had none.
    pub event_type: Option<String>,
    /// Payload of the event: the block's `data:` values joined with `\n`.
    pub data: String,
    /// Value of the block's `id:` field, or `None` if the block had none.
    pub id: Option<String>,
}
8
/// Stateful Server-Sent Events parser.
///
/// Buffers incomplete events across calls to [`feed`](SseParser::feed) so that
/// events split across HTTP chunk boundaries are handled correctly.
#[derive(Debug)]
pub struct SseParser {
    /// Leftover text that hasn't yet formed a complete event block.
    buffer: String,
}
17
18impl Default for SseParser {
19    fn default() -> Self {
20        Self::new()
21    }
22}
23
24impl SseParser {
25    pub fn new() -> Self {
26        Self {
27            buffer: String::new(),
28        }
29    }
30
31    /// Feed a chunk of text into the parser, returning any complete events.
32    ///
33    /// Incomplete events are buffered internally and will be emitted on
34    /// subsequent `feed()` calls once the closing blank line arrives.
35    pub fn feed(&mut self, input: &str) -> Vec<SseEvent> {
36        self.buffer.push_str(input);
37        let mut events = Vec::new();
38
39        // The SSE spec defines event boundaries as blank lines.
40        // We support both \n\n and \r\n\r\n.
41        loop {
42            // Find the next event boundary (blank line).
43            let boundary = self
44                .buffer
45                .find("\n\n")
46                .map(|pos| (pos, 2))
47                .or_else(|| self.buffer.find("\r\n\r\n").map(|pos| (pos, 4)));
48
49            let Some((pos, sep_len)) = boundary else {
50                break;
51            };
52
53            let block = &self.buffer[..pos];
54            if let Some(event) = Self::parse_block(block) {
55                events.push(event);
56            }
57            // Drain the consumed block + separator.
58            self.buffer.drain(..pos + sep_len);
59        }
60
61        events
62    }
63
64    /// Flush any remaining buffered data as a final event.
65    ///
66    /// Call this when the stream ends to emit any trailing event that
67    /// wasn't followed by a blank line.
68    pub fn flush(&mut self) -> Option<SseEvent> {
69        let block = std::mem::take(&mut self.buffer);
70        let block = block.trim();
71        if block.is_empty() {
72            return None;
73        }
74        Self::parse_block(block)
75    }
76
77    fn parse_block(block: &str) -> Option<SseEvent> {
78        let mut data_lines: Vec<&str> = Vec::new();
79        let mut event_type: Option<String> = None;
80        let mut id: Option<String> = None;
81
82        for line in block.lines() {
83            if line.starts_with(':') {
84                // Comment — skip.
85                continue;
86            }
87
88            if let Some(rest) = line.strip_prefix("data:") {
89                let value = rest.strip_prefix(' ').unwrap_or(rest);
90                data_lines.push(value);
91            } else if let Some(rest) = line.strip_prefix("event:") {
92                let value = rest.strip_prefix(' ').unwrap_or(rest);
93                event_type = Some(value.to_string());
94            } else if let Some(rest) = line.strip_prefix("id:") {
95                let value = rest.strip_prefix(' ').unwrap_or(rest);
96                id = Some(value.to_string());
97            }
98            // Unknown fields are ignored per the SSE spec.
99        }
100
101        // Only emit an event if there was at least one data line.
102        if data_lines.is_empty() {
103            return None;
104        }
105
106        Some(SseEvent {
107            event_type,
108            data: data_lines.join("\n"),
109            id,
110        })
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `input` through a fresh parser in a single `feed` call.
    fn parse_all(input: &str) -> Vec<SseEvent> {
        let mut parser = SseParser::new();
        parser.feed(input)
    }

    /// Like [`parse_all`], but asserts that exactly one event was produced
    /// and hands it back.
    fn parse_single(input: &str) -> SseEvent {
        let mut parsed = parse_all(input);
        assert_eq!(parsed.len(), 1);
        parsed.remove(0)
    }

    #[test]
    fn single_data_line_returned_as_event_data() {
        let event = parse_single("data: hello world\n\n");
        assert_eq!(event.data, "hello world");
    }

    #[test]
    fn multiple_data_lines_joined_with_newline() {
        let event = parse_single("data: line1\ndata: line2\n\n");
        assert_eq!(event.data, "line1\nline2");
    }

    #[test]
    fn event_field_sets_event_type() {
        let event = parse_single("event: update\ndata: {\"key\":\"value\"}\n\n");
        assert_eq!(event.event_type.as_deref(), Some("update"));
    }

    #[test]
    fn comment_lines_excluded_from_event_data() {
        let event = parse_single(": this is a comment\ndata: actual data\n\n");
        assert_eq!(event.data, "actual data");
    }

    #[test]
    fn multiple_complete_events_all_returned() {
        let parsed = parse_all("data: event1\n\ndata: event2\n\n");
        assert_eq!(parsed.len(), 2);
    }

    #[test]
    fn id_field_sets_event_id() {
        let event = parse_single("id: 42\ndata: payload\n\n");
        assert_eq!(event.id.as_deref(), Some("42"));
    }

    #[test]
    fn no_space_after_colon_data_is_parsed() {
        let event = parse_single("data:no-space\n\n");
        assert_eq!(event.data, "no-space");
    }

    #[test]
    fn block_without_data_field_produces_no_event() {
        assert!(parse_all("event: ping\n\n").is_empty());
    }

    #[test]
    fn empty_input_returns_no_events() {
        assert!(parse_all("").is_empty());
    }

    #[test]
    fn event_split_across_chunks_buffered_until_complete() {
        let mut parser = SseParser::new();

        // The opening chunk stops mid-value, with no terminating blank line.
        let first_pass = parser.feed("data: hel");
        assert!(first_pass.is_empty(), "no complete event yet");

        // The follow-up chunk supplies the rest of the value and the blank line.
        let second_pass = parser.feed("lo world\n\n");
        assert_eq!(second_pass.len(), 1);
        assert_eq!(second_pass[0].data, "hello world");
    }

    #[test]
    fn crlf_line_endings_parse_event_type() {
        let event = parse_single("event: update\r\ndata: payload\r\n\r\n");
        assert_eq!(event.event_type.as_deref(), Some("update"));
    }

    #[test]
    fn crlf_line_endings_parse_data() {
        let event = parse_single("event: update\r\ndata: payload\r\n\r\n");
        assert_eq!(event.data, "payload");
    }

    #[test]
    fn trailing_data_without_terminator_emitted_on_flush() {
        let mut parser = SseParser::new();

        // The event is never closed by a blank line, so `feed` yields nothing.
        let fed = parser.feed("data: trailing");
        assert!(fed.is_empty(), "no blank line yet, so nothing emitted");

        // Flushing at end of stream hands back the pending event.
        let flushed = parser.flush().expect("should flush trailing event");
        assert_eq!(flushed.data, "trailing");
    }

    #[test]
    fn complete_event_before_partial_in_same_feed_emitted_immediately() {
        let parsed = parse_all("data: first\n\ndata: sec");
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].data, "first");
    }

    #[test]
    fn subsequent_feed_completes_partial_and_returns_additional_events() {
        let mut parser = SseParser::new();
        // Leave a partial event in the buffer.
        parser.feed("data: first\n\ndata: sec");
        // The next chunk finishes that partial event and carries one more.
        let parsed = parser.feed("ond\n\ndata: third\n\n");
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].data, "second");
        assert_eq!(parsed[1].data, "third");
    }
}