earl_protocol_http/
sse.rs1#[derive(Debug, Clone)]
3pub struct SseEvent {
4 pub event_type: Option<String>,
5 pub data: String,
6 pub id: Option<String>,
7}
8
9pub struct SseParser {
14 buffer: String,
16}
17
18impl Default for SseParser {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl SseParser {
25 pub fn new() -> Self {
26 Self {
27 buffer: String::new(),
28 }
29 }
30
31 pub fn feed(&mut self, input: &str) -> Vec<SseEvent> {
36 self.buffer.push_str(input);
37 let mut events = Vec::new();
38
39 loop {
42 let boundary = self
44 .buffer
45 .find("\n\n")
46 .map(|pos| (pos, 2))
47 .or_else(|| self.buffer.find("\r\n\r\n").map(|pos| (pos, 4)));
48
49 let Some((pos, sep_len)) = boundary else {
50 break;
51 };
52
53 let block = &self.buffer[..pos];
54 if let Some(event) = Self::parse_block(block) {
55 events.push(event);
56 }
57 self.buffer.drain(..pos + sep_len);
59 }
60
61 events
62 }
63
64 pub fn flush(&mut self) -> Option<SseEvent> {
69 let block = std::mem::take(&mut self.buffer);
70 let block = block.trim();
71 if block.is_empty() {
72 return None;
73 }
74 Self::parse_block(block)
75 }
76
77 fn parse_block(block: &str) -> Option<SseEvent> {
78 let mut data_lines: Vec<&str> = Vec::new();
79 let mut event_type: Option<String> = None;
80 let mut id: Option<String> = None;
81
82 for line in block.lines() {
83 if line.starts_with(':') {
84 continue;
86 }
87
88 if let Some(rest) = line.strip_prefix("data:") {
89 let value = rest.strip_prefix(' ').unwrap_or(rest);
90 data_lines.push(value);
91 } else if let Some(rest) = line.strip_prefix("event:") {
92 let value = rest.strip_prefix(' ').unwrap_or(rest);
93 event_type = Some(value.to_string());
94 } else if let Some(rest) = line.strip_prefix("id:") {
95 let value = rest.strip_prefix(' ').unwrap_or(rest);
96 id = Some(value.to_string());
97 }
98 }
100
101 if data_lines.is_empty() {
103 return None;
104 }
105
106 Some(SseEvent {
107 event_type,
108 data: data_lines.join("\n"),
109 id,
110 })
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117
118 #[test]
119 fn single_data_line_returned_as_event_data() {
120 let input = "data: hello world\n\n";
121 let events = SseParser::new().feed(input);
122 assert_eq!(events.len(), 1);
123 assert_eq!(events[0].data, "hello world");
124 }
125
126 #[test]
127 fn multiple_data_lines_joined_with_newline() {
128 let input = "data: line1\ndata: line2\n\n";
129 let events = SseParser::new().feed(input);
130 assert_eq!(events.len(), 1);
131 assert_eq!(events[0].data, "line1\nline2");
132 }
133
134 #[test]
135 fn event_field_sets_event_type() {
136 let input = "event: update\ndata: {\"key\":\"value\"}\n\n";
137 let events = SseParser::new().feed(input);
138 assert_eq!(events.len(), 1);
139 assert_eq!(events[0].event_type.as_deref(), Some("update"));
140 }
141
142 #[test]
143 fn comment_lines_excluded_from_event_data() {
144 let input = ": this is a comment\ndata: actual data\n\n";
145 let events = SseParser::new().feed(input);
146 assert_eq!(events.len(), 1);
147 assert_eq!(events[0].data, "actual data");
148 }
149
150 #[test]
151 fn multiple_complete_events_all_returned() {
152 let input = "data: event1\n\ndata: event2\n\n";
153 let events = SseParser::new().feed(input);
154 assert_eq!(events.len(), 2);
155 }
156
157 #[test]
158 fn id_field_sets_event_id() {
159 let input = "id: 42\ndata: payload\n\n";
160 let events = SseParser::new().feed(input);
161 assert_eq!(events.len(), 1);
162 assert_eq!(events[0].id.as_deref(), Some("42"));
163 }
164
165 #[test]
166 fn no_space_after_colon_data_is_parsed() {
167 let input = "data:no-space\n\n";
168 let events = SseParser::new().feed(input);
169 assert_eq!(events.len(), 1);
170 assert_eq!(events[0].data, "no-space");
171 }
172
173 #[test]
174 fn block_without_data_field_produces_no_event() {
175 let input = "event: ping\n\n";
176 let events = SseParser::new().feed(input);
177 assert!(events.is_empty());
178 }
179
180 #[test]
181 fn empty_input_returns_no_events() {
182 let events = SseParser::new().feed("");
183 assert!(events.is_empty());
184 }
185
186 #[test]
187 fn event_split_across_chunks_buffered_until_complete() {
188 let mut parser = SseParser::new();
189
190 let events = parser.feed("data: hel");
192 assert!(events.is_empty(), "no complete event yet");
193
194 let events = parser.feed("lo world\n\n");
196 assert_eq!(events.len(), 1);
197 assert_eq!(events[0].data, "hello world");
198 }
199
200 #[test]
201 fn crlf_line_endings_parse_event_type() {
202 let input = "event: update\r\ndata: payload\r\n\r\n";
203 let events = SseParser::new().feed(input);
204 assert_eq!(events.len(), 1);
205 assert_eq!(events[0].event_type.as_deref(), Some("update"));
206 }
207
208 #[test]
209 fn crlf_line_endings_parse_data() {
210 let input = "event: update\r\ndata: payload\r\n\r\n";
211 let events = SseParser::new().feed(input);
212 assert_eq!(events.len(), 1);
213 assert_eq!(events[0].data, "payload");
214 }
215
216 #[test]
217 fn trailing_data_without_terminator_emitted_on_flush() {
218 let mut parser = SseParser::new();
219
220 let events = parser.feed("data: trailing");
222 assert!(events.is_empty(), "no blank line yet, so nothing emitted");
223
224 let event = parser.flush().expect("should flush trailing event");
226 assert_eq!(event.data, "trailing");
227 }
228
229 #[test]
230 fn complete_event_before_partial_in_same_feed_emitted_immediately() {
231 let mut parser = SseParser::new();
232 let events = parser.feed("data: first\n\ndata: sec");
233 assert_eq!(events.len(), 1);
234 assert_eq!(events[0].data, "first");
235 }
236
237 #[test]
238 fn subsequent_feed_completes_partial_and_returns_additional_events() {
239 let mut parser = SseParser::new();
240 parser.feed("data: first\n\ndata: sec");
242 let events = parser.feed("ond\n\ndata: third\n\n");
244 assert_eq!(events.len(), 2);
245 assert_eq!(events[0].data, "second");
246 assert_eq!(events[1].data, "third");
247 }
248}