// sse_agent/parser.rs

1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use memchr::{memchr, memchr2};
3use std::{error::Error as StdError, fmt, str};
4
/// Carriage return byte; ends a line on its own or as the first half of CRLF.
const CR: u8 = 0x0D;
/// Line feed byte; ends a line on its own or as the second half of CRLF.
const LF: u8 = 0x0A;
/// Separates a field name from its value within a line.
const COLON: u8 = 0x3A;
/// U+0000 NULL; an `id` value containing this character is ignored.
const NULL: char = '\0';
9
10/// Inner Error kind that contains possible errors occuring during parsing.
11#[derive(Clone, Copy, Debug)]
12pub enum Error {
13    Utf8(std::str::Utf8Error),
14}
15
16impl fmt::Display for Error {
17    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
18        match self {
19            Self::Utf8(err) => write!(f, "Invalid UTF8: {}", err),
20        }
21    }
22}
23
24impl StdError for Error {
25    fn source(&self) -> Option<&(dyn StdError + 'static)> {
26        match self {
27            Self::Utf8(ref err) => Some(err),
28        }
29    }
30}
31
/// Accumulates the fields of the event block currently being parsed.
///
/// Every field stays `None` until a line carrying it is seen, so "absent"
/// can be told apart from "present but empty".
#[derive(Default)]
struct EventBuilder {
    /// All `data` values of the current block, joined with `\n`.
    data: Option<String>,
    /// Value of the latest `event` field.
    event_type: Option<String>,
    /// Value of the latest accepted `id` field.
    last_event_id: Option<String>,
}
38
39impl EventBuilder {
40    pub fn add_field(&mut self, name: &[u8], value_bs: &[u8]) -> Result<(), Error> {
41        let value = str::from_utf8(value_bs).map_err(Error::Utf8)?;
42
43        if name == &b"event"[..] {
44            // Set event type buffer to value. After parsing as utf8.
45            self.event_type.replace(String::from(value));
46        } else if name == &b"data"[..] {
47            // According to the spec
48            // (https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation)
49            // Whenever data is pushed, a single LF should be appended
50            // and then removed whenever an entire event is created.
51            // However this is stupid, better just add a LF before
52            // appending data to an already existing data buffer.
53            // So we push a LF before we add MORE data.
54            match &mut self.data {
55                Some(ref mut data) => {
56                    data.reserve(value.len() + 1);
57                    data.push('\n');
58                    data.push_str(value);
59                }
60
61                None => {
62                    self.data = Some(String::from(value));
63                }
64            }
65        } else if name == &b"id"[..] && !value.contains(NULL) {
66            // Set the latest_event_id buffer field.
67            // value must not contain any nulls.
68            self.last_event_id = Some(String::from(value));
69        } else if name == &b"retry"[..] && value.chars().all(|c| c.is_digit(10)) {
70            // If the field name is retry and the value is all base 10 digits.
71            // use the value as the amount of time to wait before reconnects.
72            // @TODO: Implement reconnection time.
73        }
74
75        Ok(())
76    }
77
78    fn ready(&self) -> bool {
79        self.event_type.is_some() || self.data.is_some() || self.last_event_id.is_some()
80    }
81
82    fn build_and_clear(&mut self) -> Result<crate::Event, Error> {
83        Ok(crate::Event {
84            event: self.event_type.take().unwrap_or_else(String::new),
85            data: self.data.take().unwrap_or_else(String::new),
86            last_event_id: self.last_event_id.take(),
87        })
88    }
89}
90
91#[derive(Default)]
92pub struct Parser {
93    buf: BytesMut,
94    builder: EventBuilder,
95}
96
97impl Parser {
98    pub fn put(&mut self, bs: impl Buf) {
99        self.buf.put(bs)
100    }
101
102    /// Parses a line and attemps to add it to the current Builder.
103    ///
104    pub fn next(&mut self) -> Option<Result<crate::Event, Error>> {
105        // Parse while there are lines.
106
107        while let Some(line) = self.parse_line() {
108            if line.is_empty() && self.builder.ready() {
109                return Some(self.builder.build_and_clear());
110            }
111
112            // Check if there's a colon in the line
113            match memchr(COLON, &line) {
114                // Lines beginning with colon are just skipped
115                Some(0) => {
116                    continue;
117                }
118
119                Some(i) => {
120                    // name is all the characters to the left of the colon.
121                    let name = &line[0..i];
122
123                    // Let value be all the chars AFTER the colon.
124                    // Drop any SPACE immeadately after the colon.
125                    let value = if i + 1 < line.len() && line[i + 1] == b' ' {
126                        &line[i + 2..]
127                    } else {
128                        &line[i + 1..]
129                    };
130
131                    // TODO:
132                    // 1. Remove potential white space after colon
133                    // 2. Verify that lines ending in colon works.
134                    if let Err(err) = self.builder.add_field(name, value) {
135                        return Some(Err(err));
136                    }
137                }
138
139                None => {
140                    if let Err(err) = self.builder.add_field(&line[..], &[][..]) {
141                        return Some(Err(err));
142                    }
143                }
144            }
145        }
146
147        None
148    }
149
150    fn parse_line(&mut self) -> Option<Bytes> {
151        // Ways a line can end:
152        //
153        // a U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair,
154        // CRLF
155        //
156        // a single U+000A LINE FEED (LF) character not preceded by a
157        // U+000D CARRIAGE RETURN (CR) character,
158        // LF
159        //
160        // a single U+000D CARRIAGE RETURN (CR) character
161        // not followed by a U+000A LINE FEED (LF) character
162        // CR
163        //
164        // being the ways in which a line can end.
165
166        match memchr2(CR, LF, &self.buf) {
167            Some(i) => {
168                let line = self.buf.split_to(i);
169
170                if !self.buf.is_empty() {
171                    if 2 < self.buf.len() && self.buf[0..2] == [CR, LF] {
172                        self.buf.advance(2);
173                    } else {
174                        self.buf.advance(1);
175                    }
176                }
177
178                Some(line.freeze())
179            }
180
181            None => None,
182        }
183    }
184
185    #[cfg(test)]
186    /// Helper fn for tests.
187    fn bytes(&self) -> &[u8] {
188        &self.buf
189    }
190}
191
192impl From<&[u8]> for Parser {
193    fn from(b: &[u8]) -> Self {
194        Self {
195            buf: BytesMut::from(b),
196            builder: EventBuilder::default(),
197        }
198    }
199}
200
201impl From<&str> for Parser {
202    fn from(s: &str) -> Self {
203        Self {
204            buf: BytesMut::from(s),
205            builder: EventBuilder::default(),
206        }
207    }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `input` through one `next()` call and checks nothing is left over.
    fn assert_drains(input: &str) {
        let mut parser = Parser::from(input);
        parser.next();
        assert_eq!(parser.bytes(), &[]);
    }

    // Line splitting -------------------------------------------------------

    #[test]
    fn single_lf_should_be_empty_line() {
        let mut parser = Parser::from("\n");
        let line = parser.parse_line().expect("parsing line");
        assert_eq!(line, &b""[..]);
    }

    #[test]
    fn buf_cleared_line_ending_with_crlf() {
        assert_drains("\r\n");
    }

    #[test]
    fn buf_cleared_line_ending_with_cr() {
        assert_drains("\r");
    }

    #[test]
    fn buf_cleared_line_ending_with_lf() {
        assert_drains("\n");
    }

    #[test]
    fn test_memchr_order() {
        // Needle order must not matter: the earliest match wins.
        let haystack: &[u8] = b"abcd\r\n";
        for &(first, second) in &[(CR, LF), (LF, CR)] {
            assert_eq!(memchr2(first, second, haystack), Some(4));
        }
    }

    #[test]
    fn buf_fiddle() {
        let mut tail = BytesMut::from("1234");
        let head = tail.split_to(1);
        assert_eq!(head, &b"1"[..]);
        assert_eq!(tail, &b"234"[..]);
    }

    // Field handling -------------------------------------------------------

    #[test]
    fn lines_starting_with_colon_are_ignored() {
        let mut parser = Parser::from(":ok");
        assert!(parser.next().is_none());
    }

    #[test]
    fn colon_as_last_char_in_row() {
        let mut parser = Parser::from("data:\n\n");
        let event = parser
            .next()
            .expect("Expected an event")
            .expect("Should parse");
        assert_eq!(event.event, "");
        assert_eq!(event.data, "");
    }

    // Examples from the WHATWG specification -------------------------------

    #[test]
    fn parse_example_2_events() {
        // The first block fires an event whose data is the empty string, as
        // would the last block if it were followed by a blank line.
        // The middle block fires an event whose data is a single newline.
        // The last block is discarded because no blank line follows it.
        let mut parser = Parser::from(concat!(
            "\n",
            "data\n",
            "\n",
            "data\n",
            "data\n",
            "\n",
            "data:",
        ));

        for expected in &["", "\n"] {
            let event = parser.next().expect("Event").expect("Parsed");
            assert_eq!(event.event, "");
            assert_eq!(event.data, *expected);
        }

        assert!(parser.next().is_none());
    }

    #[test]
    fn parse_two_identical_events() {
        // Both blocks yield the same event: one space after the colon is
        // dropped when present.
        let mut parser = Parser::from(concat!(
            "\n",
            "data:test\n",
            "\n",
            "data: test\n",
            "\n",
        ));

        for _ in 0..2 {
            let event = parser
                .next()
                .expect("Expected first event")
                .expect("Should parse");
            assert_eq!(event.event, "");
            assert_eq!(event.data, "test");
        }
    }

    #[test]
    fn parse_biggest_example_from_spec_page() {
        // Four blocks:
        // 1. A lone comment, which fires nothing.
        // 2. `data` + `id`: fires "first event" and sets the last event id to "1".
        // 3. `data` + a value-less `id`: fires "second event" and resets the
        //    last event id to the empty string.
        // 4. Fires " third event" (exactly one leading space survives). It
        //    still needs a trailing blank line; end of input alone does not
        //    dispatch.
        let mut parser = Parser::from(concat!(
            "\n",
            ": test stream\n",
            "\n",
            "data: first event\n",
            "id: 1\n",
            "\n",
            "data:second event\n",
            "id\n",
            "\n",
            "data:  third event\n",
            "\n",
        ));

        let expected: [(&str, Option<&str>); 3] = [
            ("first event", Some("1")),
            ("second event", Some("")),
            (" third event", None),
        ];

        for &(data, id) in &expected {
            let event = parser.next().expect("Event").expect("Parses");
            assert_eq!(event.data, data);
            assert_eq!(event.last_event_id.as_deref(), id);
        }
    }
}