Skip to main content

eventsource_client/
event_parser.rs

1use std::{collections::VecDeque, convert::TryFrom, str::from_utf8};
2
3use hyper::body::Bytes;
4use log::{debug, log_enabled, trace};
5use pin_project::pin_project;
6
7use crate::response::Response;
8
9use super::error::{Error, Result};
10
/// Accumulator for the fields of the event that is currently being decoded.
///
/// `data` collects every `data` value seen so far, each one followed by a line feed.
#[derive(Default, PartialEq)]
struct EventData {
    pub event_type: String,
    pub data: String,
    pub id: Option<String>,
    pub retry: Option<u64>,
}

impl EventData {
    /// Creates an accumulator with every field at its default value.
    fn new() -> Self {
        Default::default()
    }

    /// Appends `value` to the buffered data, followed by a single `\n`.
    pub fn append_data(&mut self, value: &str) {
        self.data.extend([value, "\n"]);
    }

    /// Consumes the accumulator and returns it with `id` replaced by `value`.
    pub fn with_id(self, value: Option<String>) -> Self {
        Self { id: value, ..self }
    }
}
34
35#[derive(Debug, Eq, PartialEq)]
36pub enum SSE {
37    Connected(ConnectionDetails),
38    Event(Event),
39    Comment(String),
40}
41
42impl TryFrom<EventData> for Option<SSE> {
43    type Error = Error;
44
45    fn try_from(event_data: EventData) -> std::result::Result<Self, Self::Error> {
46        if event_data == EventData::default() {
47            return Err(Error::InvalidEvent);
48        }
49
50        if event_data.data.is_empty() {
51            return Ok(None);
52        }
53
54        let event_type = if event_data.event_type.is_empty() {
55            String::from("message")
56        } else {
57            event_data.event_type
58        };
59
60        let mut data = event_data.data.clone();
61        data.truncate(data.len() - 1);
62
63        let id = event_data.id.clone();
64
65        let retry = event_data.retry;
66
67        Ok(Some(SSE::Event(Event {
68            event_type,
69            data,
70            id,
71            retry,
72        })))
73    }
74}
75
76#[derive(Clone, Debug, Eq, PartialEq)]
77pub struct ConnectionDetails {
78    response: Response,
79}
80
81impl ConnectionDetails {
82    pub(crate) fn new(response: Response) -> Self {
83        Self { response }
84    }
85
86    /// Returns information describing the response at the time of connection.
87    pub fn response(&self) -> &Response {
88        &self.response
89    }
90}
91
/// A decoded server-sent event.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Event {
    /// The event type; the parser substitutes `"message"` when the stream did not name one.
    pub event_type: String,
    /// The event payload: the `data` values joined by `\n`, without a trailing line feed.
    pub data: String,
    /// The event ID in effect for this event, if any.
    pub id: Option<String>,
    /// The value of the `retry` field, if one was successfully parsed.
    /// NOTE(review): presumably milliseconds, as in the SSE specification - confirm.
    pub retry: Option<u64>,
}
99
/// Maximum number of characters of a line that are included in log output.
const LOGIFY_MAX_CHARS: usize = 100;

/// Renders `bytes` for logging: at most `LOGIFY_MAX_CHARS` characters of the decoded text,
/// or the placeholder `<bad utf8>` when the bytes are not valid UTF-8.
fn logify(bytes: &[u8]) -> String {
    let text = match from_utf8(bytes) {
        Ok(valid) => valid,
        Err(_) => "<bad utf8>",
    };

    // Cut on a character boundary: find the byte offset of the first character beyond the
    // limit, if the text is that long.
    match text.char_indices().nth(LOGIFY_MAX_CHARS) {
        Some((cut, _)) => text[..cut].to_owned(),
        None => text.to_owned(),
    }
}
105
106fn parse_field(line: &[u8]) -> Result<Option<(&str, &str)>> {
107    if line.is_empty() {
108        return Err(Error::InvalidLine(
109            "should never try to parse an empty line (probably a bug)".into(),
110        ));
111    }
112
113    match line.iter().position(|&b| b':' == b) {
114        Some(0) => {
115            let value = &line[1..];
116            debug!("comment: {}", logify(value));
117            Ok(Some(("comment", parse_value(value)?)))
118        }
119        Some(colon_pos) => {
120            let key = &line[0..colon_pos];
121            let key = parse_key(key)?;
122
123            let mut value = &line[colon_pos + 1..];
124            // remove the first initial space character if any (but remove no other whitespace)
125            if value.starts_with(b" ") {
126                value = &value[1..];
127            }
128
129            debug!("key: {}, value: {}", key, logify(value));
130
131            Ok(Some((key, parse_value(value)?)))
132        }
133        None => Ok(Some((parse_key(line)?, ""))),
134    }
135}
136
137fn parse_key(key: &[u8]) -> Result<&str> {
138    from_utf8(key).map_err(|e| Error::InvalidLine(format!("malformed key: {e:?}")))
139}
140
141fn parse_value(value: &[u8]) -> Result<&str> {
142    from_utf8(value).map_err(|e| Error::InvalidLine(format!("malformed value: {e:?}")))
143}
144
/// Tracks whether the start of the stream has been checked for a byte order mark yet.
#[derive(Debug)]
enum BomHeaderState {
    /// Still undecided; holds the bytes received so far.
    Parsing(Vec<u8>),
    /// The start of the stream has been dealt with; no further BOM handling takes place.
    Consumed,
}

/// The UTF-8 encoding of the byte order mark.
const BOM_HEADER: &[u8] = b"\xEF\xBB\xBF";

/// Attempts to strip a leading BOM from `buf`.
///
/// * `Some(rest)` - `buf` starts with the BOM; `rest` is what follows it.
/// * `Some(buf)` - `buf` cannot start with the BOM; it is returned unchanged.
/// * `None` - `buf` is shorter than the BOM but matches it so far (this includes an empty
///   `buf`), so more bytes are needed to decide.
fn try_consume_bom_header(buf: &[u8]) -> Option<&[u8]> {
    match buf.strip_prefix(BOM_HEADER) {
        Some(rest) => Some(rest),
        None if buf.len() < BOM_HEADER.len() && BOM_HEADER.starts_with(buf) => None,
        None => Some(buf),
    }
}
170
171#[pin_project]
172#[must_use = "streams do nothing unless polled"]
173pub struct EventParser {
174    /// buffer for lines we know are complete (terminated) but not yet parsed into event fields, in
175    /// the order received
176    complete_lines: VecDeque<Vec<u8>>,
177    /// buffer for the most-recently received line, pending completion (by a newline terminator) or
178    /// extension (by more non-newline bytes)
179    incomplete_line: Option<Vec<u8>>,
180    /// flagged if the last character processed as a carriage return; used to help process CRLF
181    /// pairs
182    last_char_was_cr: bool,
183    /// the event data currently being decoded
184    event_data: Option<EventData>,
185    /// the last-seen event ID; events without an ID will take on this value until it is updated.
186    last_event_id: Option<String>,
187    sse: VecDeque<SSE>,
188    /// state machine for handling the BOM header
189    bom_header_state: BomHeaderState,
190}
191
192impl EventParser {
193    pub fn new() -> Self {
194        Self {
195            complete_lines: VecDeque::with_capacity(10),
196            incomplete_line: None,
197            last_char_was_cr: false,
198            event_data: None,
199            last_event_id: None,
200            sse: VecDeque::with_capacity(3),
201            bom_header_state: BomHeaderState::Parsing(Vec::new()),
202        }
203    }
204
205    pub fn was_processing(&self) -> bool {
206        if self.incomplete_line.is_some() || !self.complete_lines.is_empty() {
207            true
208        } else {
209            !self.sse.is_empty()
210        }
211    }
212
213    pub fn get_event(&mut self) -> Option<SSE> {
214        self.sse.pop_front()
215    }
216
217    pub fn process_bytes(&mut self, bytes: Bytes) -> Result<()> {
218        trace!("Parsing bytes {bytes:?}");
219
220        // According to the SSE spec, a BOM header may be present at the beginning of the stream,
221        // which must be stripped before the message processing.
222        let bytes_to_process =
223            if let BomHeaderState::Parsing(header_buf) = &mut self.bom_header_state {
224                header_buf.extend_from_slice(&bytes);
225                if let Some(rest) = try_consume_bom_header(header_buf) {
226                    let owned_rest = rest.to_vec();
227                    self.bom_header_state = BomHeaderState::Consumed;
228                    // Once the BOM header is consumed, we can process the rest of the bytes.
229                    Bytes::from_owner(owned_rest)
230                } else {
231                    return Ok(());
232                }
233            } else {
234                bytes
235            };
236
237        // We get bytes from the underlying stream in chunks.  Decoding a chunk has two phases:
238        // decode the chunk into lines, and decode the lines into events.
239        //
240        // We counterintuitively do these two phases in reverse order. Because both lines and
241        // events may be split across chunks, we need to ensure we have a complete
242        // (newline-terminated) line before parsing it, and a complete event
243        // (empty-line-terminated) before returning it. So we buffer lines between poll()
244        // invocations, and begin by processing any incomplete events from previous invocations,
245        // before requesting new input from the underlying stream and processing that.
246        self.decode_and_buffer_lines(bytes_to_process);
247        self.parse_complete_lines_into_event()?;
248
249        Ok(())
250    }
251
252    // Populate the event fields from the complete lines already seen, until we either encounter an
253    // empty line - indicating we've decoded a complete event - or we run out of complete lines to
254    // process.
255    //
256    // Returns the event for dispatch if it is complete.
257    fn parse_complete_lines_into_event(&mut self) -> Result<()> {
258        loop {
259            let mut seen_empty_line = false;
260
261            while let Some(line) = self.complete_lines.pop_front() {
262                if line.is_empty() && self.event_data.is_some() {
263                    seen_empty_line = true;
264                    break;
265                } else if line.is_empty() {
266                    continue;
267                }
268
269                if let Some((key, value)) = parse_field(&line)? {
270                    if key == "comment" {
271                        self.sse.push_back(SSE::Comment(value.to_string()));
272                        continue;
273                    }
274
275                    let id = &self.last_event_id;
276                    let event_data = self
277                        .event_data
278                        .get_or_insert_with(|| EventData::new().with_id(id.clone()));
279
280                    if key == "event" {
281                        event_data.event_type = value.to_string()
282                    } else if key == "data" {
283                        event_data.append_data(value);
284                    } else if key == "id" {
285                        // If id contains a null byte, it is a non-fatal error and the rest of
286                        // the event should be parsed if possible.
287                        if value.chars().any(|c| c == '\0') {
288                            debug!("Ignoring event ID containing null byte");
289                            continue;
290                        }
291
292                        if value.is_empty() {
293                            self.last_event_id = Some("".to_string());
294                        } else {
295                            self.last_event_id = Some(value.to_string());
296                        }
297
298                        event_data.id.clone_from(&self.last_event_id)
299                    } else if key == "retry" {
300                        match value.parse::<u64>() {
301                            Ok(retry) => {
302                                event_data.retry = Some(retry);
303                            }
304                            _ => debug!("Failed to parse {value:?} into retry value"),
305                        };
306                    }
307                }
308            }
309
310            if seen_empty_line {
311                let event_data = self.event_data.take();
312
313                trace!(
314                    "seen empty line, event_data is {:?})",
315                    event_data.as_ref().map(|event_data| &event_data.event_type)
316                );
317
318                if let Some(event_data) = event_data {
319                    match Option::<SSE>::try_from(event_data) {
320                        Err(e) => return Err(e),
321                        Ok(None) => (),
322                        Ok(Some(event)) => self.sse.push_back(event),
323                    };
324                }
325
326                continue;
327            } else {
328                trace!("processed all complete lines but event_data not yet complete");
329            }
330
331            break;
332        }
333
334        Ok(())
335    }
336
337    // Decode a chunk into lines and buffer them for subsequent parsing, taking account of
338    // incomplete lines from previous chunks.
339    fn decode_and_buffer_lines(&mut self, chunk: Bytes) {
340        let mut lines = chunk.split_inclusive(|&b| b == b'\n' || b == b'\r');
341        // The first and last elements in this split are special. The spec requires lines to be
342        // terminated. But lines may span chunks, so:
343        //  * the last line, if non-empty (i.e. if chunk didn't end with a line terminator),
344        //    should be buffered as an incomplete line
345        //  * the first line should be appended to the incomplete line, if any
346
347        if let Some(incomplete_line) = self.incomplete_line.as_mut() {
348            if let Some(line) = lines.next() {
349                trace!(
350                    "extending line from previous chunk: {:?}+{:?}",
351                    logify(incomplete_line),
352                    logify(line)
353                );
354
355                self.last_char_was_cr = false;
356                if !line.is_empty() {
357                    // Checking the last character handles lines where the last character is a
358                    // terminator, but also where the entire line is a terminator.
359                    match line.last().unwrap() {
360                        b'\r' => {
361                            incomplete_line.extend_from_slice(&line[..line.len() - 1]);
362                            let il = self.incomplete_line.take();
363                            self.complete_lines.push_back(il.unwrap());
364                            self.last_char_was_cr = true;
365                        }
366                        b'\n' => {
367                            incomplete_line.extend_from_slice(&line[..line.len() - 1]);
368                            let il = self.incomplete_line.take();
369                            self.complete_lines.push_back(il.unwrap());
370                        }
371                        _ => incomplete_line.extend_from_slice(line),
372                    };
373                }
374            }
375        }
376
377        let mut lines = lines.peekable();
378        while let Some(line) = lines.next() {
379            if let Some(actually_complete_line) = self.incomplete_line.take() {
380                // we saw the next line, so the previous one must have been complete after all
381                trace!(
382                    "previous line was complete: {:?}",
383                    logify(&actually_complete_line)
384                );
385                self.complete_lines.push_back(actually_complete_line);
386            }
387
388            if self.last_char_was_cr && line == [b'\n'] {
389                // This is a continuation of a \r\n pair, so we can ignore this line. We do need to
390                // reset our flag though.
391                self.last_char_was_cr = false;
392                continue;
393            }
394
395            self.last_char_was_cr = false;
396            if line.ends_with(b"\r") {
397                self.complete_lines
398                    .push_back(line[..line.len() - 1].to_vec());
399                self.last_char_was_cr = true;
400            } else if line.ends_with(b"\n") {
401                // self isn't a continuation, but rather a line ending with a LF terminator.
402                self.complete_lines
403                    .push_back(line[..line.len() - 1].to_vec());
404            } else if line.is_empty() {
405                // this is the last line and it's empty, no need to buffer it
406                trace!("chunk ended with a line terminator");
407            } else if lines.peek().is_some() {
408                // this line isn't the last and we know from previous checks it doesn't end in a
409                // terminator, so we can consider it complete
410                self.complete_lines.push_back(line.to_vec());
411            } else {
412                // last line needs to be buffered as it may be incomplete
413                trace!("buffering incomplete line: {:?}", logify(line));
414                self.incomplete_line = Some(line.to_vec());
415            }
416        }
417
418        if log_enabled!(log::Level::Trace) {
419            for line in &self.complete_lines {
420                trace!("complete line: {:?}", logify(line));
421            }
422            if let Some(line) = &self.incomplete_line {
423                trace!("incomplete line: {:?}", logify(line));
424            }
425        }
426    }
427}
428
/// Unit tests for field parsing, line buffering, event assembly and BOM handling.
#[cfg(test)]
mod tests {
    use super::{Error::*, *};
    use proptest::proptest;
    use test_case::test_case;

    /// Builds the value `parse_field` is expected to return for a successfully parsed line.
    fn field<'a>(key: &'a str, value: &'a str) -> Result<Option<(&'a str, &'a str)>> {
        Ok(Some((key, value)))
    }

    /// Requires an event to be popped from the given parser.
    /// Event properties can be asserted using a closure.
    fn require_pop_event<F>(parser: &mut EventParser, f: F)
    where
        F: FnOnce(Event),
    {
        if let Some(SSE::Event(event)) = parser.get_event() {
            f(event)
        } else {
            panic!("Event should have been received")
        }
    }

    #[test]
    fn test_logify_handles_code_point_boundaries() {
        // Multi-byte characters: truncation must happen on a character boundary.
        let phase = "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。";

        let input: &[u8] = phase.as_bytes();
        let result = logify(input);

        assert_eq!(result, "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如");
    }

    #[test]
    fn test_parse_field_invalid() {
        assert!(parse_field(b"").is_err());

        match parse_field(b"\x80: invalid UTF-8") {
            Err(InvalidLine(msg)) => assert!(msg.contains("Utf8Error")),
            res => panic!("expected InvalidLine error, got {res:?}"),
        }
    }

    #[test]
    fn test_event_id_error_if_invalid_utf8() {
        let mut bytes = Vec::from("id: ");
        let mut invalid = vec![b'\xf0', b'\x28', b'\x8c', b'\xbc'];
        bytes.append(&mut invalid);
        bytes.push(b'\n');
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(bytes)).is_err());
    }

    #[test]
    fn test_parse_field_comments() {
        assert_eq!(parse_field(b":"), field("comment", ""));
        assert_eq!(
            parse_field(b":hello \0 world"),
            field("comment", "hello \0 world")
        );
        assert_eq!(parse_field(b":event: foo"), field("comment", "event: foo"));
    }

    #[test]
    fn test_parse_field_valid() {
        assert_eq!(parse_field(b"event:foo"), field("event", "foo"));
        assert_eq!(parse_field(b"event: foo"), field("event", "foo"));
        assert_eq!(parse_field(b"event:  foo"), field("event", " foo"));
        assert_eq!(parse_field(b"event:\tfoo"), field("event", "\tfoo"));
        assert_eq!(parse_field(b"event: foo "), field("event", "foo "));

        assert_eq!(parse_field(b"disconnect:"), field("disconnect", ""));
        assert_eq!(parse_field(b"disconnect: "), field("disconnect", ""));
        assert_eq!(parse_field(b"disconnect:  "), field("disconnect", " "));
        assert_eq!(parse_field(b"disconnect:\t"), field("disconnect", "\t"));

        assert_eq!(parse_field(b"disconnect"), field("disconnect", ""));

        assert_eq!(parse_field(b" : foo"), field(" ", "foo"));
        assert_eq!(parse_field(b"\xe2\x98\x83: foo"), field("☃", "foo"));
    }

    /// Builds the expected `SSE::Event` for an event without ID or retry value.
    fn event(typ: &str, data: &str) -> SSE {
        SSE::Event(Event {
            data: data.to_string(),
            id: None,
            event_type: typ.to_string(),
            retry: None,
        })
    }

    /// Builds the expected `SSE::Event` for an event with an ID but no retry value.
    fn event_with_id(typ: &str, data: &str, id: &str) -> SSE {
        SSE::Event(Event {
            data: data.to_string(),
            id: Some(id.to_string()),
            event_type: typ.to_string(),
            retry: None,
        })
    }

    #[test]
    fn test_event_without_data_yields_no_event() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("id: abc\n\n")).is_ok());
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_ignore_id_containing_null() {
        let mut parser = EventParser::new();
        assert!(parser
            .process_bytes(Bytes::from("id: a\x00bc\nevent: add\ndata: abc\n\n"))
            .is_ok());

        require_pop_event(&mut parser, |e| assert!(e.id.is_none()));
    }

    #[test_case("event: add\ndata: hello\n\n", "add".into())]
    #[test_case("data: hello\n\n", "message".into())]
    fn test_event_can_parse_type_correctly(chunk: &'static str, event_type: String) {
        let mut parser = EventParser::new();

        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());

        require_pop_event(&mut parser, |e| assert_eq!(event_type, e.event_type));
    }

    #[test_case("data: hello\n\n", event("message", "hello"); "parses event body with LF")]
    #[test_case("data: hello\n\r", event("message", "hello"); "parses event body with LF and trailing CR")]
    #[test_case("data: hello\r\n\n", event("message", "hello"); "parses event body with CRLF")]
    #[test_case("data: hello\r\n\r", event("message", "hello"); "parses event body with CRLF and trailing CR")]
    #[test_case("data: hello\r\r", event("message", "hello"); "parses event body with CR")]
    #[test_case("data: hello\r\r\n", event("message", "hello"); "parses event body with CR and trailing CRLF")]
    #[test_case("id: 1\ndata: hello\n\n", event_with_id("message", "hello", "1"))]
    #[test_case("id: 😀\ndata: hello\n\n", event_with_id("message", "hello", "😀"))]
    fn test_decode_chunks_simple(chunk: &'static str, event: SSE) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
        assert_eq!(parser.get_event().unwrap(), event);
        assert!(parser.get_event().is_none());
    }

    #[test_case("persistent-event-id.sse"; "persistent-event-id.sse")]
    fn test_last_id_persists_if_not_overridden(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
    }

    #[test_case(b":hello\n"; "with LF")]
    #[test_case(b":hello\r"; "with CR")]
    #[test_case(b":hello\r\n"; "with CRLF")]
    fn test_decode_chunks_comments_are_generated(chunk: &'static [u8]) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
        assert!(parser.get_event().is_some());
    }

    #[test]
    fn test_comment_is_separate_from_event() {
        let mut parser = EventParser::new();
        let result = parser.process_bytes(Bytes::from(":comment\ndata:hello\n\n"));
        assert!(result.is_ok());

        let comment = parser.get_event();
        assert!(matches!(comment, Some(SSE::Comment(_))));

        let event = parser.get_event();
        assert!(matches!(event, Some(SSE::Event(_))));

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_comment_with_trailing_blank_line() {
        let mut parser = EventParser::new();
        let result = parser.process_bytes(Bytes::from(":comment\n\r\n\r"));
        assert!(result.is_ok());

        let comment = parser.get_event();
        assert!(matches!(comment, Some(SSE::Comment(_))));

        assert!(parser.get_event().is_none());
    }

    #[test_case(&["data:", "hello\n\n"], event("message", "hello"); "data split")]
    #[test_case(&["data:hell", "o\n\n"], event("message", "hello"); "data truncated")]
    fn test_decode_message_split_across_chunks(chunks: &[&'static str], event: SSE) {
        let mut parser = EventParser::new();

        if let Some((last, chunks)) = chunks.split_last() {
            // No event may be emitted before the final chunk completes it.
            for chunk in chunks {
                assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
                assert!(parser.get_event().is_none());
            }

            assert!(parser.process_bytes(Bytes::from(*last)).is_ok());
            assert_eq!(parser.get_event(), Some(event));
            assert!(parser.get_event().is_none());
        } else {
            panic!("Failed to split last");
        }
    }

    #[test_case(&["data:hell", "o\n\ndata:", "world\n\n"], &[event("message", "hello"), event("message", "world")]; "with lf")]
    #[test_case(&["data:hell", "o\r\rdata:", "world\r\r"], &[event("message", "hello"), event("message", "world")]; "with cr")]
    #[test_case(&["data:hell", "o\r\n\ndata:", "world\r\n\n"], &[event("message", "hello"), event("message", "world")]; "with crlf")]
    fn test_decode_multiple_messages_split_across_chunks(chunks: &[&'static str], events: &[SSE]) {
        let mut parser = EventParser::new();

        for chunk in chunks {
            assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
        }

        for event in events {
            assert_eq!(parser.get_event().unwrap(), *event);
        }

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_line_split_across_chunks() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
        assert!(parser.process_bytes(Bytes::from("")).is_ok());
        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "foobaz")));
        assert!(parser.get_event().is_none());

        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
        assert!(parser.process_bytes(Bytes::from("bar")).is_ok());
        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "foobarbaz")));
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_concatenates_multiple_values_for_same_field() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("data:hello\n")).is_ok());
        assert!(parser.process_bytes(Bytes::from("data:world\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "hello\nworld")));
        assert!(parser.get_event().is_none());
    }

    #[test_case("\n\n\n\n" ; "all LFs")]
    #[test_case("\r\r\r\r" ; "all CRs")]
    #[test_case("\r\n\r\n\r\n\r\n" ; "all CRLFs")]
    fn test_decode_repeated_terminators(chunk: &'static str) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());

        // spec seems unclear on whether this should actually dispatch empty events, but that seems
        // unhelpful for all practical purposes
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_extra_terminators_between_events() {
        let mut parser = EventParser::new();
        assert!(parser
            .process_bytes(Bytes::from("data: abc\n\n\ndata: def\n\n"))
            .is_ok());

        assert_eq!(parser.get_event(), Some(event("message", "abc")));
        assert_eq!(parser.get_event(), Some(event("message", "def")));
        assert!(parser.get_event().is_none());
    }

    #[test_case("one-event.sse"; "one-event.sse")]
    #[test_case("one-event-crlf.sse"; "one-event-crlf.sse")]
    fn test_decode_one_event(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e
                .data
                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
        });
    }

    #[test_case("two-events.sse"; "two-events.sse")]
    #[test_case("two-events-crlf.sse"; "two-events-crlf.sse")]
    fn test_decode_two_events(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "one");
            assert_eq!(e.data, "One");
        });

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "two");
            assert_eq!(e.data, "Two");
        });
    }

    #[test_case("big-event-followed-by-another.sse"; "big-event-followed-by-another.sse")]
    #[test_case("big-event-followed-by-another-crlf.sse"; "big-event-followed-by-another-crlf.sse")]
    fn test_decode_big_event_followed_by_another(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e.data.len() > 10_000);
            assert!(e.data.contains(r#"path":"/flags/big.00.bigFeatureKey"#));
        });

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e
                .data
                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
        });
    }

    /// Reads a fixture from the `test-data` directory, panicking if it cannot be read.
    fn read_contents_from_file(name: &str) -> Vec<u8> {
        std::fs::read(format!("test-data/{name}"))
            .unwrap_or_else(|_| panic!("couldn't read {name}"))
    }

    #[test]
    fn test_event_parser_with_bom_header_split_across_chunks() {
        let mut parser = EventParser::new();
        // First chunk: partial BOM
        assert!(parser
            .process_bytes(Bytes::from(b"\xEF\xBB".as_slice()))
            .is_ok());
        assert!(parser.get_event().is_none());
        // Second chunk: rest of BOM + data
        assert!(parser
            .process_bytes(Bytes::from(b"\xBFdata: hello\n\n".as_slice()))
            .is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "hello")));
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_event_parser_second_bom_should_fail() {
        let mut parser = EventParser::new();
        // First event with BOM - should succeed
        assert!(parser
            .process_bytes(Bytes::from(b"\xEF\xBB\xBFdata: first\n\n".as_slice()))
            .is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "first")));

        // Second event with BOM - should fail (only first message can have BOM)
        let result = parser.process_bytes(Bytes::from(b"\xEF\xBB\xBFdata: second\n\n".as_slice()));
        assert!(result.is_err());
    }

    proptest! {
        #[test]
        fn test_decode_and_buffer_lines_does_not_crash(next in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)", previous in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)") {
            let mut parser = EventParser::new();
            parser.incomplete_line = Some(previous.as_bytes().to_vec());
            parser.decode_and_buffer_lines(Bytes::from(next));
        }
    }
}