1use core::{convert::Infallible, mem, str};
8
9use alloc::{
10 string::{String, ToString},
11 vec::Vec,
12};
13
14use log::trace;
15
16use crate::coroutine::*;
17
18#[derive(Clone, Debug, Default, PartialEq, Eq)]
21pub struct SseFrame {
22 pub event: Option<String>,
23 pub data: String,
24 pub id: Option<String>,
25 pub retry: Option<u64>,
26}
27
28#[derive(Debug)]
30pub enum SseFrameParserYield {
31 Frame(SseFrame),
32 WantsBytes,
33}
34
35#[derive(Debug, Default)]
37pub struct SseFrameParser {
38 buf: Vec<u8>,
39 bom_stripped: bool,
40 event: Option<String>,
41 data: String,
42 last_event_id: Option<String>,
43 retry: Option<u64>,
44}
45
46impl SseFrameParser {
47 pub fn last_event_id(&self) -> Option<&str> {
50 self.last_event_id.as_deref()
51 }
52}
53
54impl HttpCoroutine for SseFrameParser {
55 type Yield = SseFrameParserYield;
56 type Return = Infallible;
57
58 fn resume(&mut self, arg: Option<&[u8]>) -> HttpCoroutineState<Self::Yield, Self::Return> {
59 if let Some(data) = arg {
60 trace!("resume with {} bytes", data.len());
61 self.buf.extend_from_slice(data);
62 }
63
64 if !self.bom_stripped && self.buf.len() >= 3 {
65 if self.buf.starts_with(&[0xEF, 0xBB, 0xBF]) {
66 self.buf.drain(..3);
67 }
68 self.bom_stripped = true;
69 }
70
71 loop {
72 let Some((line, consumed)) = next_line(&self.buf) else {
73 return HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes);
74 };
75
76 let line_bytes = self.buf[..line].to_vec();
77 self.buf.drain(..consumed);
78
79 if line_bytes.is_empty() {
80 if self.data.is_empty() && self.event.is_none() {
81 continue;
82 }
83
84 if self.data.ends_with('\n') {
85 self.data.pop();
86 }
87
88 let frame = SseFrame {
89 event: self.event.take(),
90 data: mem::take(&mut self.data),
91 id: self.last_event_id.clone(),
92 retry: self.retry.take(),
93 };
94 return HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame));
95 }
96
97 if line_bytes.first() == Some(&b':') {
98 continue;
99 }
100
101 let (name, value) = split_field(&line_bytes);
102 let Ok(name) = str::from_utf8(name) else {
103 trace!("ignore field with non-utf8 name");
104 continue;
105 };
106 let Ok(value) = str::from_utf8(value) else {
107 trace!("ignore field with non-utf8 value");
108 continue;
109 };
110
111 match name {
112 "event" => self.event = Some(value.to_string()),
113 "data" => {
114 self.data.push_str(value);
115 self.data.push('\n');
116 }
117 "id" => {
118 if !value.contains('\0') {
119 self.last_event_id = Some(value.to_string());
120 }
121 }
122 "retry" => {
123 if let Ok(n) = value.parse::<u64>() {
124 self.retry = Some(n);
125 }
126 }
127 _ => trace!("ignore unknown field `{name}`"),
128 }
129 }
130 }
131}
132
133fn next_line(buf: &[u8]) -> Option<(usize, usize)> {
137 let cr = memchr::memchr(b'\r', buf);
138 let lf = memchr::memchr(b'\n', buf);
139
140 match (cr, lf) {
141 (Some(cr), Some(lf)) if cr + 1 == lf => Some((cr, lf + 1)),
142 (Some(cr), Some(lf)) if cr < lf => {
143 if cr + 1 == buf.len() {
144 None
145 } else {
146 Some((cr, cr + 1))
147 }
148 }
149 (Some(cr), None) => {
150 if cr + 1 == buf.len() {
151 None
152 } else {
153 Some((cr, cr + 1))
154 }
155 }
156 (_, Some(lf)) => Some((lf, lf + 1)),
157 (None, None) => None,
158 }
159}
160
161fn split_field(line: &[u8]) -> (&[u8], &[u8]) {
164 match memchr::memchr(b':', line) {
165 None => (line, &[]),
166 Some(colon) => {
167 let name = &line[..colon];
168 let mut value = &line[colon + 1..];
169 if value.first() == Some(&b' ') {
170 value = &value[1..];
171 }
172 (name, value)
173 }
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use alloc::vec;
180
181 use crate::sse::frame::*;
182
183 fn collect(stream: &[u8]) -> Vec<SseFrame> {
184 let mut parser = SseFrameParser::default();
185 let mut arg: Option<&[u8]> = Some(stream);
186 let mut frames = Vec::new();
187
188 loop {
189 match parser.resume(arg.take()) {
190 HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame)) => {
191 frames.push(frame)
192 }
193 HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => break,
194 HttpCoroutineState::Complete(never) => match never {},
195 }
196 }
197
198 frames
199 }
200
201 #[test]
202 fn single_data_event() {
203 let frames = collect(b"data: hello\n\n");
204 assert_eq!(
205 frames,
206 vec![SseFrame {
207 event: None,
208 data: "hello".into(),
209 id: None,
210 retry: None,
211 }]
212 );
213 }
214
215 #[test]
216 fn multi_line_data_joined_by_newline() {
217 let frames = collect(b"data: hello\ndata: world\n\n");
218 assert_eq!(frames[0].data, "hello\nworld");
219 }
220
221 #[test]
222 fn event_and_id_fields() {
223 let frames = collect(b"event: state\ndata: x\nid: 42\n\n");
224 assert_eq!(frames[0].event.as_deref(), Some("state"));
225 assert_eq!(frames[0].data, "x");
226 assert_eq!(frames[0].id.as_deref(), Some("42"));
227 }
228
229 #[test]
230 fn retry_parsed_when_integer() {
231 let frames = collect(b"retry: 5000\ndata: x\n\n");
232 assert_eq!(frames[0].retry, Some(5000));
233 }
234
235 #[test]
236 fn retry_ignored_when_non_integer() {
237 let frames = collect(b"retry: hello\ndata: x\n\n");
238 assert_eq!(frames[0].retry, None);
239 }
240
241 #[test]
242 fn comment_lines_ignored() {
243 let frames = collect(b": keep-alive\ndata: x\n\n");
244 assert_eq!(frames[0].data, "x");
245 }
246
247 #[test]
248 fn empty_event_no_dispatch() {
249 let frames = collect(b"\n\n\n");
250 assert!(frames.is_empty());
251 }
252
253 #[test]
254 fn id_persists_across_events() {
255 let mut parser = SseFrameParser::default();
256 let mut arg: Option<&[u8]> = Some(b"id: 1\ndata: a\n\ndata: b\n\n");
257 let mut frames = Vec::new();
258
259 loop {
260 match parser.resume(arg.take()) {
261 HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame)) => {
262 frames.push(frame)
263 }
264 HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => break,
265 HttpCoroutineState::Complete(never) => match never {},
266 }
267 }
268
269 assert_eq!(frames[0].id.as_deref(), Some("1"));
270 assert_eq!(frames[1].id.as_deref(), Some("1"));
271 assert_eq!(parser.last_event_id(), Some("1"));
272 }
273
274 #[test]
275 fn id_with_null_is_ignored() {
276 let mut parser = SseFrameParser::default();
277 let stream = b"id: bad\0\ndata: x\n\n";
278 let arg: Option<&[u8]> = Some(stream);
279
280 match parser.resume(arg) {
281 HttpCoroutineState::Yielded(SseFrameParserYield::Frame(_)) => {}
282 HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => {
283 unreachable!("wants bytes");
284 }
285 HttpCoroutineState::Complete(never) => match never {},
286 }
287
288 assert_eq!(parser.last_event_id(), None);
289 }
290
291 #[test]
292 fn crlf_line_separator() {
293 let frames = collect(b"data: hello\r\n\r\n");
294 assert_eq!(frames[0].data, "hello");
295 }
296
297 #[test]
298 fn bare_cr_line_separator() {
299 let frames = collect(b"data: hello\r\rTAIL");
300 assert_eq!(frames[0].data, "hello");
301 }
302
303 #[test]
304 fn bom_stripped_at_stream_start() {
305 let frames = collect(b"\xEF\xBB\xBFdata: hello\n\n");
306 assert_eq!(frames[0].data, "hello");
307 }
308
309 #[test]
310 fn field_value_leading_space_stripped() {
311 let frames = collect(b"data: hello\n\n");
312 assert_eq!(frames[0].data, " hello");
313 }
314
315 #[test]
316 fn field_no_value() {
317 let frames = collect(b"data\n\n");
318 assert_eq!(frames[0].data, "");
319 }
320
321 #[test]
322 fn incomplete_then_resumed() {
323 let mut parser = SseFrameParser::default();
324 let mut arg: Option<&[u8]> = Some(b"data: hel");
325 let mut frames = Vec::new();
326
327 loop {
328 match parser.resume(arg.take()) {
329 HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame)) => {
330 frames.push(frame);
331 break;
332 }
333 HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => {
334 if arg.is_none() {
335 arg = Some(b"lo\n\n");
336 } else {
337 break;
338 }
339 }
340 HttpCoroutineState::Complete(never) => match never {},
341 }
342 }
343
344 assert_eq!(frames[0].data, "hello");
345 }
346
347 #[test]
348 fn unknown_field_ignored() {
349 let frames = collect(b"foobar: x\ndata: y\n\n");
350 assert_eq!(frames[0].data, "y");
351 }
352
353 #[test]
354 fn event_resets_after_dispatch() {
355 let frames = collect(b"event: a\ndata: x\n\ndata: y\n\n");
356 assert_eq!(frames[0].event.as_deref(), Some("a"));
357 assert_eq!(frames[1].event, None);
358 }
359}