1use std::{collections::VecDeque, convert::TryFrom, str::from_utf8};
2
3use hyper::body::Bytes;
4use log::{debug, log_enabled, trace};
5use pin_project::pin_project;
6
7use crate::response::Response;
8
9use super::error::{Error, Result};
10
/// Accumulates the fields of the event that is currently being assembled.
///
/// A value equal to `EventData::default()` means that no field has contributed anything yet.
#[derive(Default, PartialEq)]
struct EventData {
    /// Event type; empty when the stream did not supply one.
    pub event_type: String,
    /// Every appended data value, each one followed by a `\n`.
    pub data: String,
    /// Event ID associated with this event, if any.
    pub id: Option<String>,
    /// Retry value associated with this event, if any.
    pub retry: Option<u64>,
}

impl EventData {
    /// Creates an empty accumulator (identical to `EventData::default()`).
    fn new() -> Self {
        Default::default()
    }

    /// Appends `value` to the data buffer and terminates it with a newline.
    pub fn append_data(&mut self, value: &str) {
        self.data += value;
        self.data += "\n";
    }

    /// Builder-style setter: returns `self` with its ID replaced by `value`.
    pub fn with_id(self, value: Option<String>) -> Self {
        Self { id: value, ..self }
    }
}
34
35#[derive(Debug, Eq, PartialEq)]
36pub enum SSE {
37 Connected(ConnectionDetails),
38 Event(Event),
39 Comment(String),
40}
41
42impl TryFrom<EventData> for Option<SSE> {
43 type Error = Error;
44
45 fn try_from(event_data: EventData) -> std::result::Result<Self, Self::Error> {
46 if event_data == EventData::default() {
47 return Err(Error::InvalidEvent);
48 }
49
50 if event_data.data.is_empty() {
51 return Ok(None);
52 }
53
54 let event_type = if event_data.event_type.is_empty() {
55 String::from("message")
56 } else {
57 event_data.event_type
58 };
59
60 let mut data = event_data.data.clone();
61 data.truncate(data.len() - 1);
62
63 let id = event_data.id.clone();
64
65 let retry = event_data.retry;
66
67 Ok(Some(SSE::Event(Event {
68 event_type,
69 data,
70 id,
71 retry,
72 })))
73 }
74}
75
76#[derive(Clone, Debug, Eq, PartialEq)]
77pub struct ConnectionDetails {
78 response: Response,
79}
80
81impl ConnectionDetails {
82 pub(crate) fn new(response: Response) -> Self {
83 Self { response }
84 }
85
86 pub fn response(&self) -> &Response {
88 &self.response
89 }
90}
91
/// A single event read from the stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// The event's type.
    pub event_type: String,
    /// The event's payload.
    pub data: String,
    /// The event ID attached to this event, if any.
    pub id: Option<String>,
    /// The retry value attached to this event, if any.
    pub retry: Option<u64>,
}
99
/// Maximum number of characters of a line that is written to the log.
const LOGIFY_MAX_CHARS: usize = 100;

/// Renders `bytes` for logging.
///
/// Invalid UTF-8 is replaced by the placeholder `<bad utf8>`. The text is cut after
/// `LOGIFY_MAX_CHARS` characters; the cut is made on a character boundary, never inside a
/// multi-byte code point.
fn logify(bytes: &[u8]) -> String {
    let text = match from_utf8(bytes) {
        Ok(valid) => valid,
        Err(_) => "<bad utf8>",
    };

    // Byte offset of the first character beyond the limit, if the text is that long.
    match text.char_indices().nth(LOGIFY_MAX_CHARS) {
        Some((cut, _)) => text[..cut].to_owned(),
        None => text.to_owned(),
    }
}
105
106fn parse_field(line: &[u8]) -> Result<Option<(&str, &str)>> {
107 if line.is_empty() {
108 return Err(Error::InvalidLine(
109 "should never try to parse an empty line (probably a bug)".into(),
110 ));
111 }
112
113 match line.iter().position(|&b| b':' == b) {
114 Some(0) => {
115 let value = &line[1..];
116 debug!("comment: {}", logify(value));
117 Ok(Some(("comment", parse_value(value)?)))
118 }
119 Some(colon_pos) => {
120 let key = &line[0..colon_pos];
121 let key = parse_key(key)?;
122
123 let mut value = &line[colon_pos + 1..];
124 if value.starts_with(b" ") {
126 value = &value[1..];
127 }
128
129 debug!("key: {}, value: {}", key, logify(value));
130
131 Ok(Some((key, parse_value(value)?)))
132 }
133 None => Ok(Some((parse_key(line)?, ""))),
134 }
135}
136
137fn parse_key(key: &[u8]) -> Result<&str> {
138 from_utf8(key).map_err(|e| Error::InvalidLine(format!("malformed key: {e:?}")))
139}
140
141fn parse_value(value: &[u8]) -> Result<&str> {
142 from_utf8(value).map_err(|e| Error::InvalidLine(format!("malformed value: {e:?}")))
143}
144
/// Tracks whether the optional byte-order mark at the start of the stream has been dealt with.
#[derive(Debug)]
enum BomHeaderState {
    /// Still deciding; holds the bytes received so far.
    Parsing(Vec<u8>),
    /// The decision has been made; no further BOM handling takes place.
    Consumed,
}

/// The UTF-8 encoding of the byte-order mark.
const BOM_HEADER: &[u8] = b"\xEF\xBB\xBF";

/// Decides whether `buf` begins with a BOM and, if so, strips it.
///
/// Returns `None` while `buf` is shorter than the BOM but could still turn out to be one, which
/// means more input is required. Otherwise returns the bytes to process: `buf` without the
/// leading BOM when one is present, or `buf` unchanged.
fn try_consume_bom_header(buf: &[u8]) -> Option<&[u8]> {
    if buf.starts_with(BOM_HEADER) {
        return Some(&buf[BOM_HEADER.len()..]);
    }

    let may_still_become_bom = buf.len() < BOM_HEADER.len() && BOM_HEADER.starts_with(buf);
    if may_still_become_bom {
        None
    } else {
        Some(buf)
    }
}
170
171#[pin_project]
172#[must_use = "streams do nothing unless polled"]
173pub struct EventParser {
174 complete_lines: VecDeque<Vec<u8>>,
177 incomplete_line: Option<Vec<u8>>,
180 last_char_was_cr: bool,
183 event_data: Option<EventData>,
185 last_event_id: Option<String>,
187 sse: VecDeque<SSE>,
188 bom_header_state: BomHeaderState,
190}
191
192impl EventParser {
193 pub fn new() -> Self {
194 Self {
195 complete_lines: VecDeque::with_capacity(10),
196 incomplete_line: None,
197 last_char_was_cr: false,
198 event_data: None,
199 last_event_id: None,
200 sse: VecDeque::with_capacity(3),
201 bom_header_state: BomHeaderState::Parsing(Vec::new()),
202 }
203 }
204
205 pub fn was_processing(&self) -> bool {
206 if self.incomplete_line.is_some() || !self.complete_lines.is_empty() {
207 true
208 } else {
209 !self.sse.is_empty()
210 }
211 }
212
213 pub fn get_event(&mut self) -> Option<SSE> {
214 self.sse.pop_front()
215 }
216
217 pub fn process_bytes(&mut self, bytes: Bytes) -> Result<()> {
218 trace!("Parsing bytes {bytes:?}");
219
220 let bytes_to_process =
223 if let BomHeaderState::Parsing(header_buf) = &mut self.bom_header_state {
224 header_buf.extend_from_slice(&bytes);
225 if let Some(rest) = try_consume_bom_header(header_buf) {
226 let owned_rest = rest.to_vec();
227 self.bom_header_state = BomHeaderState::Consumed;
228 Bytes::from_owner(owned_rest)
230 } else {
231 return Ok(());
232 }
233 } else {
234 bytes
235 };
236
237 self.decode_and_buffer_lines(bytes_to_process);
247 self.parse_complete_lines_into_event()?;
248
249 Ok(())
250 }
251
252 fn parse_complete_lines_into_event(&mut self) -> Result<()> {
258 loop {
259 let mut seen_empty_line = false;
260
261 while let Some(line) = self.complete_lines.pop_front() {
262 if line.is_empty() && self.event_data.is_some() {
263 seen_empty_line = true;
264 break;
265 } else if line.is_empty() {
266 continue;
267 }
268
269 if let Some((key, value)) = parse_field(&line)? {
270 if key == "comment" {
271 self.sse.push_back(SSE::Comment(value.to_string()));
272 continue;
273 }
274
275 let id = &self.last_event_id;
276 let event_data = self
277 .event_data
278 .get_or_insert_with(|| EventData::new().with_id(id.clone()));
279
280 if key == "event" {
281 event_data.event_type = value.to_string()
282 } else if key == "data" {
283 event_data.append_data(value);
284 } else if key == "id" {
285 if value.chars().any(|c| c == '\0') {
288 debug!("Ignoring event ID containing null byte");
289 continue;
290 }
291
292 if value.is_empty() {
293 self.last_event_id = Some("".to_string());
294 } else {
295 self.last_event_id = Some(value.to_string());
296 }
297
298 event_data.id.clone_from(&self.last_event_id)
299 } else if key == "retry" {
300 match value.parse::<u64>() {
301 Ok(retry) => {
302 event_data.retry = Some(retry);
303 }
304 _ => debug!("Failed to parse {value:?} into retry value"),
305 };
306 }
307 }
308 }
309
310 if seen_empty_line {
311 let event_data = self.event_data.take();
312
313 trace!(
314 "seen empty line, event_data is {:?})",
315 event_data.as_ref().map(|event_data| &event_data.event_type)
316 );
317
318 if let Some(event_data) = event_data {
319 match Option::<SSE>::try_from(event_data) {
320 Err(e) => return Err(e),
321 Ok(None) => (),
322 Ok(Some(event)) => self.sse.push_back(event),
323 };
324 }
325
326 continue;
327 } else {
328 trace!("processed all complete lines but event_data not yet complete");
329 }
330
331 break;
332 }
333
334 Ok(())
335 }
336
337 fn decode_and_buffer_lines(&mut self, chunk: Bytes) {
340 let mut lines = chunk.split_inclusive(|&b| b == b'\n' || b == b'\r');
341 if let Some(incomplete_line) = self.incomplete_line.as_mut() {
348 if let Some(line) = lines.next() {
349 trace!(
350 "extending line from previous chunk: {:?}+{:?}",
351 logify(incomplete_line),
352 logify(line)
353 );
354
355 self.last_char_was_cr = false;
356 if !line.is_empty() {
357 match line.last().unwrap() {
360 b'\r' => {
361 incomplete_line.extend_from_slice(&line[..line.len() - 1]);
362 let il = self.incomplete_line.take();
363 self.complete_lines.push_back(il.unwrap());
364 self.last_char_was_cr = true;
365 }
366 b'\n' => {
367 incomplete_line.extend_from_slice(&line[..line.len() - 1]);
368 let il = self.incomplete_line.take();
369 self.complete_lines.push_back(il.unwrap());
370 }
371 _ => incomplete_line.extend_from_slice(line),
372 };
373 }
374 }
375 }
376
377 let mut lines = lines.peekable();
378 while let Some(line) = lines.next() {
379 if let Some(actually_complete_line) = self.incomplete_line.take() {
380 trace!(
382 "previous line was complete: {:?}",
383 logify(&actually_complete_line)
384 );
385 self.complete_lines.push_back(actually_complete_line);
386 }
387
388 if self.last_char_was_cr && line == [b'\n'] {
389 self.last_char_was_cr = false;
392 continue;
393 }
394
395 self.last_char_was_cr = false;
396 if line.ends_with(b"\r") {
397 self.complete_lines
398 .push_back(line[..line.len() - 1].to_vec());
399 self.last_char_was_cr = true;
400 } else if line.ends_with(b"\n") {
401 self.complete_lines
403 .push_back(line[..line.len() - 1].to_vec());
404 } else if line.is_empty() {
405 trace!("chunk ended with a line terminator");
407 } else if lines.peek().is_some() {
408 self.complete_lines.push_back(line.to_vec());
411 } else {
412 trace!("buffering incomplete line: {:?}", logify(line));
414 self.incomplete_line = Some(line.to_vec());
415 }
416 }
417
418 if log_enabled!(log::Level::Trace) {
419 for line in &self.complete_lines {
420 trace!("complete line: {:?}", logify(line));
421 }
422 if let Some(line) = &self.incomplete_line {
423 trace!("incomplete line: {:?}", logify(line));
424 }
425 }
426 }
427}
428
#[cfg(test)]
mod tests {
    use super::{Error::*, *};
    use proptest::proptest;
    use test_case::test_case;

    /// Builds the value `parse_field` returns for a successfully parsed `key`/`value` pair.
    fn field<'a>(key: &'a str, value: &'a str) -> Result<Option<(&'a str, &'a str)>> {
        Ok(Some((key, value)))
    }

    /// Pops the next item from `parser`, requires it to be an `SSE::Event`, and passes the event
    /// to `f`. Panics when the queue is empty or the next item is not an event.
    fn require_pop_event<F>(parser: &mut EventParser, f: F)
    where
        F: FnOnce(Event),
    {
        if let Some(SSE::Event(event)) = parser.get_event() {
            f(event)
        } else {
            panic!("Event should have been received")
        }
    }

    #[test]
    fn test_logify_handles_code_point_boundaries() {
        // Multi-byte characters only, and longer than the logging limit.
        let phrase = "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。";

        let result = logify(phrase.as_bytes());

        assert_eq!(result, "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如");
    }

    #[test]
    fn test_parse_field_invalid() {
        assert!(parse_field(b"").is_err());

        match parse_field(b"\x80: invalid UTF-8") {
            Err(InvalidLine(msg)) => assert!(msg.contains("Utf8Error")),
            res => panic!("expected InvalidLine error, got {res:?}"),
        }
    }

    #[test]
    fn test_event_id_error_if_invalid_utf8() {
        let mut bytes = Vec::from("id: ");
        let mut invalid = vec![b'\xf0', b'\x28', b'\x8c', b'\xbc'];
        bytes.append(&mut invalid);
        bytes.push(b'\n');
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(bytes)).is_err());
    }

    #[test]
    fn test_parse_field_comments() {
        assert_eq!(parse_field(b":"), field("comment", ""));
        assert_eq!(
            parse_field(b":hello \0 world"),
            field("comment", "hello \0 world")
        );
        assert_eq!(parse_field(b":event: foo"), field("comment", "event: foo"));
    }

    #[test]
    fn test_parse_field_valid() {
        assert_eq!(parse_field(b"event:foo"), field("event", "foo"));
        assert_eq!(parse_field(b"event: foo"), field("event", "foo"));
        // Only the first space after the colon is stripped; a second one belongs to the value.
        assert_eq!(parse_field(b"event:  foo"), field("event", " foo"));
        assert_eq!(parse_field(b"event:\tfoo"), field("event", "\tfoo"));
        assert_eq!(parse_field(b"event: foo "), field("event", "foo "));

        assert_eq!(parse_field(b"disconnect:"), field("disconnect", ""));
        assert_eq!(parse_field(b"disconnect: "), field("disconnect", ""));
        assert_eq!(parse_field(b"disconnect:  "), field("disconnect", " "));
        assert_eq!(parse_field(b"disconnect:\t"), field("disconnect", "\t"));

        assert_eq!(parse_field(b"disconnect"), field("disconnect", ""));

        assert_eq!(parse_field(b" : foo"), field(" ", "foo"));
        assert_eq!(parse_field(b"\xe2\x98\x83: foo"), field("☃", "foo"));
    }

    /// Expected event without an ID or retry value.
    fn event(typ: &str, data: &str) -> SSE {
        SSE::Event(Event {
            data: data.to_string(),
            id: None,
            event_type: typ.to_string(),
            retry: None,
        })
    }

    /// Expected event carrying an ID but no retry value.
    fn event_with_id(typ: &str, data: &str, id: &str) -> SSE {
        SSE::Event(Event {
            data: data.to_string(),
            id: Some(id.to_string()),
            event_type: typ.to_string(),
            retry: None,
        })
    }

    #[test]
    fn test_event_without_data_yields_no_event() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("id: abc\n\n")).is_ok());
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_ignore_id_containing_null() {
        let mut parser = EventParser::new();
        assert!(parser
            .process_bytes(Bytes::from("id: a\x00bc\nevent: add\ndata: abc\n\n"))
            .is_ok());

        require_pop_event(&mut parser, |e| assert!(e.id.is_none()));
    }

    #[test_case("event: add\ndata: hello\n\n", "add".into())]
    #[test_case("data: hello\n\n", "message".into())]
    fn test_event_can_parse_type_correctly(chunk: &'static str, event_type: String) {
        let mut parser = EventParser::new();

        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());

        require_pop_event(&mut parser, |e| assert_eq!(event_type, e.event_type));
    }

    #[test_case("data: hello\n\n", event("message", "hello"); "parses event body with LF")]
    #[test_case("data: hello\n\r", event("message", "hello"); "parses event body with LF and trailing CR")]
    #[test_case("data: hello\r\n\n", event("message", "hello"); "parses event body with CRLF")]
    #[test_case("data: hello\r\n\r", event("message", "hello"); "parses event body with CRLF and trailing CR")]
    #[test_case("data: hello\r\r", event("message", "hello"); "parses event body with CR")]
    #[test_case("data: hello\r\r\n", event("message", "hello"); "parses event body with CR and trailing CRLF")]
    #[test_case("id: 1\ndata: hello\n\n", event_with_id("message", "hello", "1"))]
    #[test_case("id: 😀\ndata: hello\n\n", event_with_id("message", "hello", "😀"))]
    fn test_decode_chunks_simple(chunk: &'static str, event: SSE) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
        assert_eq!(parser.get_event().unwrap(), event);
        assert!(parser.get_event().is_none());
    }

    #[test_case("persistent-event-id.sse"; "persistent-event-id.sse")]
    fn test_last_id_persists_if_not_overridden(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
    }

    #[test_case(b":hello\n"; "with LF")]
    #[test_case(b":hello\r"; "with CR")]
    #[test_case(b":hello\r\n"; "with CRLF")]
    fn test_decode_chunks_comments_are_generated(chunk: &'static [u8]) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
        assert!(parser.get_event().is_some());
    }

    #[test]
    fn test_comment_is_separate_from_event() {
        let mut parser = EventParser::new();
        let result = parser.process_bytes(Bytes::from(":comment\ndata:hello\n\n"));
        assert!(result.is_ok());

        let comment = parser.get_event();
        assert!(matches!(comment, Some(SSE::Comment(_))));

        let event = parser.get_event();
        assert!(matches!(event, Some(SSE::Event(_))));

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_comment_with_trailing_blank_line() {
        let mut parser = EventParser::new();
        let result = parser.process_bytes(Bytes::from(":comment\n\r\n\r"));
        assert!(result.is_ok());

        let comment = parser.get_event();
        assert!(matches!(comment, Some(SSE::Comment(_))));

        assert!(parser.get_event().is_none());
    }

    #[test_case(&["data:", "hello\n\n"], event("message", "hello"); "data split")]
    #[test_case(&["data:hell", "o\n\n"], event("message", "hello"); "data truncated")]
    fn test_decode_message_split_across_chunks(chunks: &[&'static str], event: SSE) {
        let mut parser = EventParser::new();

        if let Some((last, chunks)) = chunks.split_last() {
            // No event may appear before the final chunk has been delivered.
            for chunk in chunks {
                assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
                assert!(parser.get_event().is_none());
            }

            assert!(parser.process_bytes(Bytes::from(*last)).is_ok());
            assert_eq!(parser.get_event(), Some(event));
            assert!(parser.get_event().is_none());
        } else {
            panic!("Failed to split last");
        }
    }

    #[test_case(&["data:hell", "o\n\ndata:", "world\n\n"], &[event("message", "hello"), event("message", "world")]; "with lf")]
    #[test_case(&["data:hell", "o\r\rdata:", "world\r\r"], &[event("message", "hello"), event("message", "world")]; "with cr")]
    #[test_case(&["data:hell", "o\r\n\ndata:", "world\r\n\n"], &[event("message", "hello"), event("message", "world")]; "with crlf")]
    fn test_decode_multiple_messages_split_across_chunks(chunks: &[&'static str], events: &[SSE]) {
        let mut parser = EventParser::new();

        for chunk in chunks {
            assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
        }

        for event in events {
            assert_eq!(parser.get_event().unwrap(), *event);
        }

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_line_split_across_chunks() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
        assert!(parser.process_bytes(Bytes::from("")).is_ok());
        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "foobaz")));
        assert!(parser.get_event().is_none());

        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
        assert!(parser.process_bytes(Bytes::from("bar")).is_ok());
        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "foobarbaz")));
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_concatenates_multiple_values_for_same_field() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("data:hello\n")).is_ok());
        assert!(parser.process_bytes(Bytes::from("data:world\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "hello\nworld")));
        assert!(parser.get_event().is_none());
    }

    #[test_case("\n\n\n\n" ; "all LFs")]
    #[test_case("\r\r\r\r" ; "all CRs")]
    #[test_case("\r\n\r\n\r\n\r\n" ; "all CRLFs")]
    fn test_decode_repeated_terminators(chunk: &'static str) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_extra_terminators_between_events() {
        let mut parser = EventParser::new();
        assert!(parser
            .process_bytes(Bytes::from("data: abc\n\n\ndata: def\n\n"))
            .is_ok());

        assert_eq!(parser.get_event(), Some(event("message", "abc")));
        assert_eq!(parser.get_event(), Some(event("message", "def")));
        assert!(parser.get_event().is_none());
    }

    #[test_case("one-event.sse"; "one-event.sse")]
    #[test_case("one-event-crlf.sse"; "one-event-crlf.sse")]
    fn test_decode_one_event(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e
                .data
                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
        });
    }

    #[test_case("two-events.sse"; "two-events.sse")]
    #[test_case("two-events-crlf.sse"; "two-events-crlf.sse")]
    fn test_decode_two_events(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "one");
            assert_eq!(e.data, "One");
        });

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "two");
            assert_eq!(e.data, "Two");
        });
    }

    #[test_case("big-event-followed-by-another.sse"; "big-event-followed-by-another.sse")]
    #[test_case("big-event-followed-by-another-crlf.sse"; "big-event-followed-by-another-crlf.sse")]
    fn test_decode_big_event_followed_by_another(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e.data.len() > 10_000);
            assert!(e.data.contains(r#"path":"/flags/big.00.bigFeatureKey"#));
        });

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e
                .data
                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
        });
    }

    /// Reads the fixture `name` from the `test-data` directory, panicking when it is unreadable.
    fn read_contents_from_file(name: &str) -> Vec<u8> {
        std::fs::read(format!("test-data/{name}"))
            .unwrap_or_else(|_| panic!("couldn't read {name}"))
    }

    #[test]
    fn test_event_parser_with_bom_header_split_across_chunks() {
        let mut parser = EventParser::new();
        // The first two BOM bytes alone are not enough to decide anything.
        assert!(parser
            .process_bytes(Bytes::from(b"\xEF\xBB".as_slice()))
            .is_ok());
        assert!(parser.get_event().is_none());
        // The third BOM byte arrives together with the first event.
        assert!(parser
            .process_bytes(Bytes::from(b"\xBFdata: hello\n\n".as_slice()))
            .is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "hello")));
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_event_parser_second_bom_should_fail() {
        let mut parser = EventParser::new();
        assert!(parser
            .process_bytes(Bytes::from(b"\xEF\xBB\xBFdata: first\n\n".as_slice()))
            .is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "first")));

        // A BOM is only stripped at the very start of the stream.
        let result = parser.process_bytes(Bytes::from(b"\xEF\xBB\xBFdata: second\n\n".as_slice()));
        assert!(result.is_err());
    }

    proptest! {
        #[test]
        fn test_decode_and_buffer_lines_does_not_crash(next in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)", previous in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)") {
            let mut parser = EventParser::new();
            parser.incomplete_line = Some(previous.as_bytes().to_vec());
            parser.decode_and_buffer_lines(Bytes::from(next));
        }
    }
}