// earl_protocol_http/sse.rs

/// A single event decoded from a Server-Sent Events (`text/event-stream`) body.
///
/// Events compare by value, so callers and tests can use `==` / `assert_eq!` on them directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SseEvent {
    /// Value of the block's `event:` field, or `None` when the block had no such field.
    pub event_type: Option<String>,
    /// Payload of the event: the values of all `data:` lines of the block joined with `\n`.
    pub data: String,
    /// Value of the block's `id:` field, or `None` when the block had no such field.
    pub id: Option<String>,
}
8
9pub struct SseParser {
14 buffer: String,
16}
17
18impl Default for SseParser {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl SseParser {
25 pub fn new() -> Self {
26 Self {
27 buffer: String::new(),
28 }
29 }
30
31 pub fn feed(&mut self, input: &str) -> Vec<SseEvent> {
36 self.buffer.push_str(input);
37 let mut events = Vec::new();
38
39 loop {
42 let boundary = self
44 .buffer
45 .find("\n\n")
46 .map(|pos| (pos, 2))
47 .or_else(|| self.buffer.find("\r\n\r\n").map(|pos| (pos, 4)));
48
49 let Some((pos, sep_len)) = boundary else {
50 break;
51 };
52
53 let block = &self.buffer[..pos];
54 if let Some(event) = Self::parse_block(block) {
55 events.push(event);
56 }
57 self.buffer.drain(..pos + sep_len);
59 }
60
61 events
62 }
63
64 pub fn flush(&mut self) -> Option<SseEvent> {
69 let block = std::mem::take(&mut self.buffer);
70 let block = block.trim();
71 if block.is_empty() {
72 return None;
73 }
74 Self::parse_block(block)
75 }
76
77 fn parse_block(block: &str) -> Option<SseEvent> {
78 let mut data_lines: Vec<&str> = Vec::new();
79 let mut event_type: Option<String> = None;
80 let mut id: Option<String> = None;
81
82 for line in block.lines() {
83 if line.starts_with(':') {
84 continue;
86 }
87
88 if let Some(rest) = line.strip_prefix("data:") {
89 let value = rest.strip_prefix(' ').unwrap_or(rest);
90 data_lines.push(value);
91 } else if let Some(rest) = line.strip_prefix("event:") {
92 let value = rest.strip_prefix(' ').unwrap_or(rest);
93 event_type = Some(value.to_string());
94 } else if let Some(rest) = line.strip_prefix("id:") {
95 let value = rest.strip_prefix(' ').unwrap_or(rest);
96 id = Some(value.to_string());
97 }
98 }
100
101 if data_lines.is_empty() {
103 return None;
104 }
105
106 Some(SseEvent {
107 event_type,
108 data: data_lines.join("\n"),
109 id,
110 })
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `input` through a fresh parser in a single `feed` call.
    fn parse_all(input: &str) -> Vec<SseEvent> {
        SseParser::new().feed(input)
    }

    /// Collects the `data` payload of every event, in order.
    fn payloads(events: &[SseEvent]) -> Vec<&str> {
        events.iter().map(|event| event.data.as_str()).collect()
    }

    #[test]
    fn parses_simple_data_event() {
        let events = parse_all("data: hello world\n\n");
        assert_eq!(payloads(&events), ["hello world"]);
    }

    #[test]
    fn parses_multiline_data_event() {
        // Consecutive `data:` lines of one block are joined with a newline.
        let events = parse_all("data: line1\ndata: line2\n\n");
        assert_eq!(payloads(&events), ["line1\nline2"]);
    }

    #[test]
    fn parses_event_with_type() {
        let events = parse_all("event: update\ndata: {\"key\":\"value\"}\n\n");
        assert_eq!(payloads(&events), ["{\"key\":\"value\"}"]);
        assert_eq!(events[0].event_type.as_deref(), Some("update"));
    }

    #[test]
    fn skips_comments() {
        let events = parse_all(": this is a comment\ndata: actual data\n\n");
        assert_eq!(payloads(&events), ["actual data"]);
    }

    #[test]
    fn handles_multiple_events() {
        let count = parse_all("data: event1\n\ndata: event2\n\n").len();
        assert_eq!(count, 2);
    }

    #[test]
    fn parses_event_with_id() {
        let events = parse_all("id: 42\ndata: payload\n\n");
        assert_eq!(payloads(&events), ["payload"]);
        assert_eq!(events[0].id.as_deref(), Some("42"));
    }

    #[test]
    fn handles_no_space_after_colon() {
        let events = parse_all("data:no-space\n\n");
        assert_eq!(payloads(&events), ["no-space"]);
    }

    #[test]
    fn ignores_block_without_data() {
        assert!(parse_all("event: ping\n\n").is_empty());
    }

    #[test]
    fn handles_empty_input() {
        assert!(parse_all("").is_empty());
    }

    #[test]
    fn event_split_across_chunks() {
        let mut parser = SseParser::new();

        let before = parser.feed("data: hel");
        assert!(before.is_empty(), "no complete event yet");

        let after = parser.feed("lo world\n\n");
        assert_eq!(payloads(&after), ["hello world"]);
    }

    #[test]
    fn handles_crlf_line_endings() {
        let events = parse_all("event: update\r\ndata: payload\r\n\r\n");
        assert_eq!(payloads(&events), ["payload"]);
        assert_eq!(events[0].event_type.as_deref(), Some("update"));
    }

    #[test]
    fn flush_trailing_event() {
        let mut parser = SseParser::new();

        let emitted = parser.feed("data: trailing");
        assert!(emitted.is_empty(), "no blank line yet, so nothing emitted");

        let trailing = parser.flush().expect("should flush trailing event");
        assert_eq!(trailing.data, "trailing");
    }

    #[test]
    fn multiple_feed_calls() {
        let mut parser = SseParser::new();

        // The first chunk completes one event and leaves a partial one buffered.
        let first_batch = parser.feed("data: first\n\ndata: sec");
        assert_eq!(payloads(&first_batch), ["first"]);

        // The second chunk finishes the buffered event and adds another.
        let second_batch = parser.feed("ond\n\ndata: third\n\n");
        assert_eq!(payloads(&second_batch), ["second", "third"]);
    }
}