Skip to main content

earl_protocol_http/
sse.rs

1/// A parsed Server-Sent Event.
2#[derive(Debug, Clone)]
3pub struct SseEvent {
4    pub event_type: Option<String>,
5    pub data: String,
6    pub id: Option<String>,
7}
8
9/// Stateful Server-Sent Events parser.
10///
11/// Buffers incomplete events across calls to [`feed`] so that events split
12/// across HTTP chunk boundaries are handled correctly.
13pub struct SseParser {
14    /// Leftover text that hasn't yet formed a complete event block.
15    buffer: String,
16}
17
18impl Default for SseParser {
19    fn default() -> Self {
20        Self::new()
21    }
22}
23
24impl SseParser {
25    pub fn new() -> Self {
26        Self {
27            buffer: String::new(),
28        }
29    }
30
31    /// Feed a chunk of text into the parser, returning any complete events.
32    ///
33    /// Incomplete events are buffered internally and will be emitted on
34    /// subsequent `feed()` calls once the closing blank line arrives.
35    pub fn feed(&mut self, input: &str) -> Vec<SseEvent> {
36        self.buffer.push_str(input);
37        let mut events = Vec::new();
38
39        // The SSE spec defines event boundaries as blank lines.
40        // We support both \n\n and \r\n\r\n.
41        loop {
42            // Find the next event boundary (blank line).
43            let boundary = self
44                .buffer
45                .find("\n\n")
46                .map(|pos| (pos, 2))
47                .or_else(|| self.buffer.find("\r\n\r\n").map(|pos| (pos, 4)));
48
49            let Some((pos, sep_len)) = boundary else {
50                break;
51            };
52
53            let block = &self.buffer[..pos];
54            if let Some(event) = Self::parse_block(block) {
55                events.push(event);
56            }
57            // Drain the consumed block + separator.
58            self.buffer.drain(..pos + sep_len);
59        }
60
61        events
62    }
63
64    /// Flush any remaining buffered data as a final event.
65    ///
66    /// Call this when the stream ends to emit any trailing event that
67    /// wasn't followed by a blank line.
68    pub fn flush(&mut self) -> Option<SseEvent> {
69        let block = std::mem::take(&mut self.buffer);
70        let block = block.trim();
71        if block.is_empty() {
72            return None;
73        }
74        Self::parse_block(block)
75    }
76
77    fn parse_block(block: &str) -> Option<SseEvent> {
78        let mut data_lines: Vec<&str> = Vec::new();
79        let mut event_type: Option<String> = None;
80        let mut id: Option<String> = None;
81
82        for line in block.lines() {
83            if line.starts_with(':') {
84                // Comment — skip.
85                continue;
86            }
87
88            if let Some(rest) = line.strip_prefix("data:") {
89                let value = rest.strip_prefix(' ').unwrap_or(rest);
90                data_lines.push(value);
91            } else if let Some(rest) = line.strip_prefix("event:") {
92                let value = rest.strip_prefix(' ').unwrap_or(rest);
93                event_type = Some(value.to_string());
94            } else if let Some(rest) = line.strip_prefix("id:") {
95                let value = rest.strip_prefix(' ').unwrap_or(rest);
96                id = Some(value.to_string());
97            }
98            // Unknown fields are ignored per the SSE spec.
99        }
100
101        // Only emit an event if there was at least one data line.
102        if data_lines.is_empty() {
103            return None;
104        }
105
106        Some(SseEvent {
107            event_type,
108            data: data_lines.join("\n"),
109            id,
110        })
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117
118    #[test]
119    fn parses_simple_data_event() {
120        let input = "data: hello world\n\n";
121        let events = SseParser::new().feed(input);
122        assert_eq!(events.len(), 1);
123        assert_eq!(events[0].data, "hello world");
124    }
125
126    #[test]
127    fn parses_multiline_data_event() {
128        let input = "data: line1\ndata: line2\n\n";
129        let events = SseParser::new().feed(input);
130        assert_eq!(events.len(), 1);
131        assert_eq!(events[0].data, "line1\nline2");
132    }
133
134    #[test]
135    fn parses_event_with_type() {
136        let input = "event: update\ndata: {\"key\":\"value\"}\n\n";
137        let events = SseParser::new().feed(input);
138        assert_eq!(events.len(), 1);
139        assert_eq!(events[0].event_type.as_deref(), Some("update"));
140        assert_eq!(events[0].data, "{\"key\":\"value\"}");
141    }
142
143    #[test]
144    fn skips_comments() {
145        let input = ": this is a comment\ndata: actual data\n\n";
146        let events = SseParser::new().feed(input);
147        assert_eq!(events.len(), 1);
148        assert_eq!(events[0].data, "actual data");
149    }
150
151    #[test]
152    fn handles_multiple_events() {
153        let input = "data: event1\n\ndata: event2\n\n";
154        let events = SseParser::new().feed(input);
155        assert_eq!(events.len(), 2);
156    }
157
158    #[test]
159    fn parses_event_with_id() {
160        let input = "id: 42\ndata: payload\n\n";
161        let events = SseParser::new().feed(input);
162        assert_eq!(events.len(), 1);
163        assert_eq!(events[0].id.as_deref(), Some("42"));
164        assert_eq!(events[0].data, "payload");
165    }
166
167    #[test]
168    fn handles_no_space_after_colon() {
169        let input = "data:no-space\n\n";
170        let events = SseParser::new().feed(input);
171        assert_eq!(events.len(), 1);
172        assert_eq!(events[0].data, "no-space");
173    }
174
175    #[test]
176    fn ignores_block_without_data() {
177        let input = "event: ping\n\n";
178        let events = SseParser::new().feed(input);
179        assert!(events.is_empty());
180    }
181
182    #[test]
183    fn handles_empty_input() {
184        let events = SseParser::new().feed("");
185        assert!(events.is_empty());
186    }
187
188    #[test]
189    fn event_split_across_chunks() {
190        let mut parser = SseParser::new();
191
192        // First chunk contains the beginning of the event but no blank-line terminator.
193        let events = parser.feed("data: hel");
194        assert!(events.is_empty(), "no complete event yet");
195
196        // Second chunk completes the event with the rest of the data + blank line.
197        let events = parser.feed("lo world\n\n");
198        assert_eq!(events.len(), 1);
199        assert_eq!(events[0].data, "hello world");
200    }
201
202    #[test]
203    fn handles_crlf_line_endings() {
204        let input = "event: update\r\ndata: payload\r\n\r\n";
205        let events = SseParser::new().feed(input);
206        assert_eq!(events.len(), 1);
207        assert_eq!(events[0].event_type.as_deref(), Some("update"));
208        assert_eq!(events[0].data, "payload");
209    }
210
211    #[test]
212    fn flush_trailing_event() {
213        let mut parser = SseParser::new();
214
215        // Feed an event that is NOT terminated by a blank line.
216        let events = parser.feed("data: trailing");
217        assert!(events.is_empty(), "no blank line yet, so nothing emitted");
218
219        // Flush should emit the trailing event.
220        let event = parser.flush().expect("should flush trailing event");
221        assert_eq!(event.data, "trailing");
222    }
223
224    #[test]
225    fn multiple_feed_calls() {
226        let mut parser = SseParser::new();
227
228        // First feed: one complete event and start of another.
229        let events = parser.feed("data: first\n\ndata: sec");
230        assert_eq!(events.len(), 1);
231        assert_eq!(events[0].data, "first");
232
233        // Second feed: finish the second event and deliver a third.
234        let events = parser.feed("ond\n\ndata: third\n\n");
235        assert_eq!(events.len(), 2);
236        assert_eq!(events[0].data, "second");
237        assert_eq!(events[1].data, "third");
238    }
239}