Skip to main content

io_http/sse/
frame.rs

1//! I/O-free coroutine decoding a W3C [Server-Sent Events] stream.
2//! Line-oriented and infallible: the parser never terminates and the
3//! outer driver stops when the body stream closes.
4//!
5//! [Server-Sent Events]: https://html.spec.whatwg.org/multipage/server-sent-events.html
6
7use core::{convert::Infallible, mem, str};
8
9use alloc::{
10    string::{String, ToString},
11    vec::Vec,
12};
13
14use log::trace;
15
16use crate::coroutine::*;
17
/// A single Server-Sent Event as handed to the caller on dispatch.
///
/// Every field is owned, so a frame stays valid independently of the
/// parser that produced it.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct SseFrame {
    /// Value of the `event:` field, or `None` when the field was absent.
    pub event: Option<String>,
    /// Payload of the `data:` field(s); multiple `data:` lines are joined
    /// with `\n` and the trailing newline is stripped.
    pub data: String,
    /// Last event id known to the parser at the time of dispatch.
    pub id: Option<String>,
    /// Reconnection time from the most recent valid `retry:` field seen
    /// since the previous dispatched frame, if any.
    pub retry: Option<u64>,
}
27
28/// Per-step yield emitted by [`SseFrameParser`].
29#[derive(Debug)]
30pub enum SseFrameParserYield {
31    Frame(SseFrame),
32    WantsBytes,
33}
34
/// Incremental Server-Sent Events frame parser that performs no I/O.
///
/// Bytes are pushed in through `resume`; the fields below hold the
/// unparsed input plus the event currently being assembled.
#[derive(Default, Debug)]
pub struct SseFrameParser {
    /// Bytes received but not yet consumed as complete lines.
    buf: Vec<u8>,
    /// Whether the leading byte-order-mark check has already been done.
    bom_stripped: bool,
    /// `event:` value of the event being assembled, if any.
    event: Option<String>,
    /// Accumulated `data:` lines, each followed by `\n`.
    data: String,
    /// Most recent accepted `id:` value; kept across dispatched frames.
    last_event_id: Option<String>,
    /// Pending `retry:` value, handed out with the next dispatched frame.
    retry: Option<u64>,
}
45
46impl SseFrameParser {
47    /// Last-event-id seen so far; persists across dispatched frames so a
48    /// reconnecting caller can resume via the `Last-Event-ID` header.
49    pub fn last_event_id(&self) -> Option<&str> {
50        self.last_event_id.as_deref()
51    }
52}
53
54impl HttpCoroutine for SseFrameParser {
55    type Yield = SseFrameParserYield;
56    type Return = Infallible;
57
58    fn resume(&mut self, arg: Option<&[u8]>) -> HttpCoroutineState<Self::Yield, Self::Return> {
59        if let Some(data) = arg {
60            trace!("resume with {} bytes", data.len());
61            self.buf.extend_from_slice(data);
62        }
63
64        if !self.bom_stripped && self.buf.len() >= 3 {
65            if self.buf.starts_with(&[0xEF, 0xBB, 0xBF]) {
66                self.buf.drain(..3);
67            }
68            self.bom_stripped = true;
69        }
70
71        loop {
72            let Some((line, consumed)) = next_line(&self.buf) else {
73                return HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes);
74            };
75
76            let line_bytes = self.buf[..line].to_vec();
77            self.buf.drain(..consumed);
78
79            if line_bytes.is_empty() {
80                if self.data.is_empty() && self.event.is_none() {
81                    continue;
82                }
83
84                if self.data.ends_with('\n') {
85                    self.data.pop();
86                }
87
88                let frame = SseFrame {
89                    event: self.event.take(),
90                    data: mem::take(&mut self.data),
91                    id: self.last_event_id.clone(),
92                    retry: self.retry.take(),
93                };
94                return HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame));
95            }
96
97            if line_bytes.first() == Some(&b':') {
98                continue;
99            }
100
101            let (name, value) = split_field(&line_bytes);
102            let Ok(name) = str::from_utf8(name) else {
103                trace!("ignore field with non-utf8 name");
104                continue;
105            };
106            let Ok(value) = str::from_utf8(value) else {
107                trace!("ignore field with non-utf8 value");
108                continue;
109            };
110
111            match name {
112                "event" => self.event = Some(value.to_string()),
113                "data" => {
114                    self.data.push_str(value);
115                    self.data.push('\n');
116                }
117                "id" => {
118                    if !value.contains('\0') {
119                        self.last_event_id = Some(value.to_string());
120                    }
121                }
122                "retry" => {
123                    if let Ok(n) = value.parse::<u64>() {
124                        self.retry = Some(n);
125                    }
126                }
127                _ => trace!("ignore unknown field `{name}`"),
128            }
129        }
130    }
131}
132
133// Returns (line_end_excl_terminator, total_consumed) or None when the
134// buffer doesn't yet contain a complete line. Terminator may be \r\n,
135// \n, or bare \r; a trailing \r is treated as incomplete pending \n.
136fn next_line(buf: &[u8]) -> Option<(usize, usize)> {
137    let cr = memchr::memchr(b'\r', buf);
138    let lf = memchr::memchr(b'\n', buf);
139
140    match (cr, lf) {
141        (Some(cr), Some(lf)) if cr + 1 == lf => Some((cr, lf + 1)),
142        (Some(cr), Some(lf)) if cr < lf => {
143            if cr + 1 == buf.len() {
144                None
145            } else {
146                Some((cr, cr + 1))
147            }
148        }
149        (Some(cr), None) => {
150            if cr + 1 == buf.len() {
151                None
152            } else {
153                Some((cr, cr + 1))
154            }
155        }
156        (_, Some(lf)) => Some((lf, lf + 1)),
157        (None, None) => None,
158    }
159}
160
161// Splits a non-empty SSE line on the first `:`; a single leading SP in
162// the value is stripped per spec.
163fn split_field(line: &[u8]) -> (&[u8], &[u8]) {
164    match memchr::memchr(b':', line) {
165        None => (line, &[]),
166        Some(colon) => {
167            let name = &line[..colon];
168            let mut value = &line[colon + 1..];
169            if value.first() == Some(&b' ') {
170                value = &value[1..];
171            }
172            (name, value)
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use alloc::vec;

    use crate::sse::frame::*;

    /// Feeds `input` to `parser` once, then keeps resuming without new
    /// bytes and gathers every dispatched frame until the parser asks
    /// for more input.
    fn drive(parser: &mut SseFrameParser, input: &[u8]) -> Vec<SseFrame> {
        let mut pending = Some(input);
        let mut dispatched = Vec::new();

        loop {
            match parser.resume(pending.take()) {
                HttpCoroutineState::Yielded(SseFrameParserYield::Frame(frame)) => {
                    dispatched.push(frame);
                }
                HttpCoroutineState::Yielded(SseFrameParserYield::WantsBytes) => {
                    return dispatched;
                }
                HttpCoroutineState::Complete(never) => match never {},
            }
        }
    }

    /// Parses `stream` with a fresh parser and returns all frames.
    fn collect(stream: &[u8]) -> Vec<SseFrame> {
        drive(&mut SseFrameParser::default(), stream)
    }

    #[test]
    fn single_data_event() {
        let expected = SseFrame {
            event: None,
            data: "hello".into(),
            id: None,
            retry: None,
        };

        assert_eq!(collect(b"data: hello\n\n"), vec![expected]);
    }

    #[test]
    fn multi_line_data_joined_by_newline() {
        let frames = collect(b"data: hello\ndata: world\n\n");
        assert_eq!(frames[0].data, "hello\nworld");
    }

    #[test]
    fn event_and_id_fields() {
        let frames = collect(b"event: state\ndata: x\nid: 42\n\n");
        let first = &frames[0];

        assert_eq!(first.event.as_deref(), Some("state"));
        assert_eq!(first.data, "x");
        assert_eq!(first.id.as_deref(), Some("42"));
    }

    #[test]
    fn retry_parsed_when_integer() {
        let frames = collect(b"retry: 5000\ndata: x\n\n");
        assert_eq!(frames[0].retry, Some(5000));
    }

    #[test]
    fn retry_ignored_when_non_integer() {
        let frames = collect(b"retry: hello\ndata: x\n\n");
        assert_eq!(frames[0].retry, None);
    }

    #[test]
    fn comment_lines_ignored() {
        let frames = collect(b": keep-alive\ndata: x\n\n");
        assert_eq!(frames[0].data, "x");
    }

    #[test]
    fn empty_event_no_dispatch() {
        assert!(collect(b"\n\n\n").is_empty());
    }

    #[test]
    fn id_persists_across_events() {
        let mut parser = SseFrameParser::default();
        let frames = drive(&mut parser, b"id: 1\ndata: a\n\ndata: b\n\n");

        assert_eq!(frames[0].id.as_deref(), Some("1"));
        assert_eq!(frames[1].id.as_deref(), Some("1"));
        assert_eq!(parser.last_event_id(), Some("1"));
    }

    #[test]
    fn id_with_null_is_ignored() {
        let mut parser = SseFrameParser::default();
        let stream: &[u8] = b"id: bad\0\ndata: x\n\n";

        // A single resume must already dispatch the frame.
        let HttpCoroutineState::Yielded(SseFrameParserYield::Frame(_)) =
            parser.resume(Some(stream))
        else {
            unreachable!("wants bytes");
        };

        assert_eq!(parser.last_event_id(), None);
    }

    #[test]
    fn crlf_line_separator() {
        let frames = collect(b"data: hello\r\n\r\n");
        assert_eq!(frames[0].data, "hello");
    }

    #[test]
    fn bare_cr_line_separator() {
        let frames = collect(b"data: hello\r\rTAIL");
        assert_eq!(frames[0].data, "hello");
    }

    #[test]
    fn bom_stripped_at_stream_start() {
        let frames = collect(b"\xEF\xBB\xBFdata: hello\n\n");
        assert_eq!(frames[0].data, "hello");
    }

    #[test]
    fn field_value_leading_space_stripped() {
        let frames = collect(b"data:  hello\n\n");
        assert_eq!(frames[0].data, " hello");
    }

    #[test]
    fn field_no_value() {
        let frames = collect(b"data\n\n");
        assert_eq!(frames[0].data, "");
    }

    #[test]
    fn incomplete_then_resumed() {
        let mut parser = SseFrameParser::default();

        // The first chunk ends mid-line, so nothing can be dispatched yet.
        let early = drive(&mut parser, b"data: hel");
        assert!(early.is_empty());

        let frames = drive(&mut parser, b"lo\n\n");
        assert_eq!(frames[0].data, "hello");
    }

    #[test]
    fn unknown_field_ignored() {
        let frames = collect(b"foobar: x\ndata: y\n\n");
        assert_eq!(frames[0].data, "y");
    }

    #[test]
    fn event_resets_after_dispatch() {
        let frames = collect(b"event: a\ndata: x\n\ndata: y\n\n");

        assert_eq!(frames[0].event.as_deref(), Some("a"));
        assert_eq!(frames[1].event, None);
    }
}