1use bytes::BytesMut;
35use futures_codec::{Decoder, Encoder, FramedRead, FramedWrite};
36use futures_io::{AsyncRead, AsyncWrite};
37use memchr::memchr2;
38use std::fmt::Write as _;
39use std::{fmt, str::FromStr};
40
41#[derive(Debug, Clone, PartialEq, Eq)]
44#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
45pub enum Event {
46 Message {
48 id: Option<String>,
52 event: String,
54 data: String,
56 },
57 Retry {
61 retry: u64,
63 },
64}
65
66impl Event {
67 pub fn message<'a>(event: &str, data: &str, id: impl Into<Option<&'a str>>) -> Self {
69 Event::Message {
70 id: id.into().map(String::from),
71 event: event.to_string(),
72 data: data.to_string(),
73 }
74 }
75
76 pub fn retry(time: u64) -> Self {
78 Event::Retry { retry: time }
79 }
80}
81
82#[derive(Debug)]
84pub enum Error {
85 IoError(std::io::Error),
87 Utf8Error(std::str::Utf8Error),
89 FmtError(std::fmt::Error),
91 IncompleteFrame,
93}
94
95impl fmt::Display for Error {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 match self {
98 Error::IoError(inner) => inner.fmt(f),
99 Error::Utf8Error(inner) => inner.fmt(f),
100 Error::FmtError(inner) => inner.fmt(f),
101 Error::IncompleteFrame => write!(f, "incomplete frame"),
102 }
103 }
104}
105
106impl std::error::Error for Error {}
107
108impl From<std::io::Error> for Error {
109 fn from(err: std::io::Error) -> Self {
110 Self::IoError(err)
111 }
112}
113
114impl From<std::fmt::Error> for Error {
115 fn from(err: std::fmt::Error) -> Self {
116 Self::FmtError(err)
117 }
118}
119
120impl From<std::str::Utf8Error> for Error {
121 fn from(err: std::str::Utf8Error) -> Self {
122 Self::Utf8Error(err)
123 }
124}
125
126fn strip_leading_space(input: &str) -> &str {
128 if input.starts_with(' ') {
129 &input[1..]
130 } else {
131 input
132 }
133}
134
135impl FromStr for Event {
136 type Err = Error;
137
138 fn from_str(s: &str) -> Result<Self, Self::Err> {
140 let mut codec = SSECodec::default();
141 for line in s.lines() {
142 if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
143 return Ok(message);
144 }
145 }
146 Err(Error::IncompleteFrame)
147 }
148}
149
150impl fmt::Display for Event {
151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152 match self {
153 Event::Message { id, event, data } => {
154 if let Some(id) = id {
155 if id.is_empty() {
156 writeln!(f, "id")?;
157 } else {
158 writeln!(f, "id: {}", &id)?;
159 }
160 }
161 if event != "message" {
162 writeln!(f, "event: {}", &event)?;
163 }
164
165 for line in data.lines() {
166 writeln!(f, "data: {}", line)?;
167 }
168 Ok(())
169 }
170 Event::Retry { retry } => writeln!(f, "retry: {}", retry),
171 }
172 }
173}
174
175#[derive(Debug, Default, Clone)]
177pub struct SSECodec {
178 processed_bom: bool,
180 last_was_cr: bool,
182 buffer: BytesMut,
184 last_event_id: Option<String>,
186 event_type: Option<String>,
188 data: String,
190}
191
192impl SSECodec {
193 fn take_message(&mut self) -> Option<Event> {
194 fn default_event_name() -> String {
195 "message".to_string()
196 }
197
198 if self.data.is_empty() {
199 self.event_type.take();
201 None
202 } else {
203 if self.data.ends_with('\n') {
204 self.data.pop();
205 }
206 Some(Event::Message {
207 id: self.last_event_id.clone(),
209 event: self.event_type.take().unwrap_or_else(default_event_name),
210 data: std::mem::replace(&mut self.data, String::new()),
211 })
212 }
213 }
214
215 fn parse_line(&mut self, line: &str) -> Option<Event> {
216 let mut parts = line.splitn(2, ':');
217 match (parts.next(), parts.next()) {
218 (Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
220 if let Ok(time) = value.parse::<u64>() {
224 return Some(Event::Retry { retry: time });
225 }
226 }
227 (Some("event"), Some(value)) => {
229 self.event_type = Some(strip_leading_space(value).to_string());
231 }
232 (Some("data"), value) => {
234 if let Some(value) = value {
236 self.data += strip_leading_space(value);
237 }
238 self.data.push('\n');
240 }
241 (Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
243 self.last_event_id = Some(strip_leading_space(id_str).to_string());
246 }
247 (Some(""), Some(_)) => (),
249 (Some(""), None) => {
251 return self.take_message();
252 }
253 _ => (),
254 }
255 None
256 }
257}
258
259impl Decoder for SSECodec {
260 type Item = Event;
261 type Error = Error;
262
263 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
264 self.buffer.unsplit(src.split_to(src.len()));
268 while let Some(pos) = memchr2(b'\r', b'\n', &self.buffer) {
269 let line = self.buffer.split_to(pos + 1);
270
271 if pos == 0 && line == "\n" && self.last_was_cr {
273 self.last_was_cr = false;
274 continue;
275 }
276 self.last_was_cr = line.last() == Some(&b'\r');
277
278 let line = std::str::from_utf8(&line[..pos])?;
280 let line = if line.starts_with("\u{feff}") && !self.processed_bom {
282 self.processed_bom = true;
283 &line[3..]
284 } else {
285 line
286 };
287 if let Some(event) = self.parse_line(line) {
288 return Ok(Some(event));
289 }
290 }
291 Ok(None)
292 }
293
294 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
295 unreachable!(
296 "SSECodec::decode() should have consumed all data, but {} bytes are in the buffer",
297 src.len()
298 )
299 }
300}
301
302impl Encoder for SSECodec {
303 type Item = Event;
304 type Error = Error;
305
306 fn encode(&mut self, item: Self::Item, dest: &mut BytesMut) -> Result<(), Self::Error> {
307 writeln!(dest, "{}", item).map_err(Into::into)
308 }
309}
310
311pub type DecodeStream<R> = FramedRead<R, SSECodec>;
313
314pub type EncodeStream<W> = FramedWrite<W, SSECodec>;
316
317pub fn decode_stream<R: AsyncRead>(input: R) -> DecodeStream<R> {
319 FramedRead::new(input, SSECodec::default())
320}
321
322pub fn encode_stream<W: AsyncWrite>(output: W) -> EncodeStream<W> {
324 FramedWrite::new(output, SSECodec::default())
325}
326
327#[cfg(test)]
328mod encode_tests {
329 use super::*;
330 use futures::SinkExt;
331
332 #[async_std::test]
333 async fn simple_event() {
334 let mut output = vec![];
335 let mut stream = encode_stream(&mut output);
336 stream
337 .send(Event::Message {
338 id: None,
339 event: "add".to_string(),
340 data: "test\ntest2".to_string(),
341 })
342 .await
343 .unwrap();
344 assert_eq!(output, b"event: add\ndata: test\ndata: test2\n\n".to_vec());
345 }
346
347 #[async_std::test]
348 async fn with_id() {
349 let mut output = vec![];
350 let mut stream = encode_stream(&mut output);
351 stream
352 .send(Event::Message {
353 id: Some("whatever".to_string()),
354 event: "add".to_string(),
355 data: "test".to_string(),
356 })
357 .await
358 .unwrap();
359 assert_eq!(output, b"id: whatever\nevent: add\ndata: test\n\n".to_vec());
360 }
361
362 #[async_std::test]
363 async fn default_event() {
364 let mut output = vec![];
365 let mut stream = encode_stream(&mut output);
366 stream
367 .send(Event::Message {
368 id: None,
369 event: "message".to_string(),
370 data: "test".to_string(),
371 })
372 .await
373 .unwrap();
374 assert_eq!(output, b"data: test\n\n".to_vec());
375 }
376
377 #[async_std::test]
378 async fn multiple_events() {
379 let mut output = vec![];
380 let mut stream = encode_stream(&mut output);
381 stream
382 .send(Event::Message {
383 id: None,
384 event: "add".to_string(),
385 data: "test\ntest2".to_string(),
386 })
387 .await
388 .unwrap();
389 stream
390 .send(Event::Message {
391 id: Some("whatever".to_string()),
392 event: "add".to_string(),
393 data: "test".to_string(),
394 })
395 .await
396 .unwrap();
397 stream
398 .send(Event::Message {
399 id: None,
400 event: "message".to_string(),
401 data: "test".to_string(),
402 })
403 .await
404 .unwrap();
405 let mut expected = vec![];
406 expected.extend(b"event: add\ndata: test\ndata: test2\n\n".iter());
407 expected.extend(b"id: whatever\nevent: add\ndata: test\n\n".iter());
408 expected.extend(b"data: test\n\n".iter());
409 assert_eq!(output, expected);
410 }
411}
412
413#[cfg(test)]
414mod decode_tests {
415 use super::*;
416 use futures::stream::{self, StreamExt, TryStreamExt};
417
418 #[test]
419 fn simple_event() {
420 let mut codec = SSECodec::default();
421 let mut event = None;
422 let s = "event: add\ndata: test\ndata: test2\n\n";
423 for line in s.lines() {
424 if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
425 event = Some(message);
426 break;
427 }
428 }
429 assert_eq!(
430 event,
431 Some(Event::Message {
432 id: None,
433 event: "add".to_string(),
434 data: "test\ntest2".to_string(),
435 })
436 );
437 }
438
439 #[test]
440 fn decode_stream_when_fed_by_line() {
441 let input: Vec<&str> = vec![":ok", "", "event:message", "id:id1", "data:data1", ""];
442
443 let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
444
445 let messages = decode_stream(body_stream.into_async_read());
446
447 let mut result = None;
448 async_std::task::block_on(async {
449 result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
450 });
451
452 let results = result.unwrap();
453 assert_eq!(results.len(), 1);
454 assert_eq!(
455 results.get(0).unwrap(),
456 &Event::message("message", "data1", "id1")
457 );
458 }
459
460 #[test]
461 fn maintain_id_state() {
462 let input: Vec<&str> = vec!["id:1", "data:messageone", "", "data:messagetwo", ""];
463
464 let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
465
466 let messages = decode_stream(body_stream.into_async_read());
467
468 let mut result = None;
469 async_std::task::block_on(async {
470 result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
471 });
472
473 let mut results = result.unwrap();
474 assert_eq!(results.len(), 2);
475 assert_eq!(
476 results.remove(0),
477 Event::message("message", "messageone", "1")
478 );
479 assert_eq!(
480 results.remove(0),
481 Event::message("message", "messagetwo", "1")
482 );
483 }
484
485 #[test]
488 fn id_is_part_of_message() {
489 let input: Vec<&str> = vec![
490 "data:messageone",
491 "id:1",
492 "data:moremessageone",
493 "",
494 "data:messagetwo",
495 "",
496 ];
497
498 let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
499
500 let messages = decode_stream(body_stream.into_async_read());
501
502 let mut result = None;
503 async_std::task::block_on(async {
504 result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
505 });
506
507 let mut results = result.unwrap();
508 assert_eq!(results.len(), 2);
509 assert_eq!(
510 results.remove(0),
511 Event::message("message", "messageone\nmoremessageone", "1")
512 );
513 assert_eq!(
514 results.remove(0),
515 Event::message("message", "messagetwo", "1")
516 );
517 }
518}
519
520#[cfg(test)]
521mod wpt {
522 use super::*;
525 use futures::stream::StreamExt;
526
527 struct DecodeIter<'a> {
528 inner: FramedRead<&'a [u8], SSECodec>,
529 }
530 impl Iterator for DecodeIter<'_> {
531 type Item = Result<Event, Error>;
532 fn next(&mut self) -> Option<Self::Item> {
533 let mut result = None;
534 async_std::task::block_on(async {
535 result = self.inner.next().await;
536 });
537 result
538 }
539 }
540
541 fn decode(input: &[u8]) -> DecodeIter<'_> {
542 DecodeIter {
543 inner: decode_stream(input),
544 }
545 }
546
547 #[test]
549 fn data() {
550 let input = concat!(
551 "data:msg\n",
552 "data:msg\n",
553 "\n",
554 ":\n",
555 "falsefield:msg\n",
556 "\n",
557 "falsefield:msg\n",
558 "Data:data\n",
559 "\n",
560 "data\n",
561 "\n",
562 "data:end\n",
563 "\n",
564 );
565 let mut messages = decode(input.as_bytes());
566 assert_eq!(
567 messages.next().map(Result::unwrap),
568 Some(Event::Message {
569 id: None,
570 event: "message".into(),
571 data: "msg\nmsg".into()
572 })
573 );
574 assert_eq!(
575 messages.next().map(Result::unwrap),
576 Some(Event::Message {
577 id: None,
578 event: "message".into(),
579 data: "".into()
580 })
581 );
582 assert_eq!(
583 messages.next().map(Result::unwrap),
584 Some(Event::Message {
585 id: None,
586 event: "message".into(),
587 data: "end".into()
588 })
589 );
590 assert!(messages.next().is_none());
591 }
592
593 #[test]
596 fn bom() {
597 let mut input = vec![];
598 input.extend(b"\xEF\xBB\xBF");
599 input.extend(b"data:1\n");
600 input.extend(b"\n");
601 input.extend(b"\xEF\xBB\xBF");
602 input.extend(b"data:2\n");
603 input.extend(b"\n");
604 input.extend(b"data:3\n");
605 input.extend(b"\n");
606 let mut messages = decode(&input);
607 assert_eq!(
608 messages.next().map(Result::unwrap),
609 Some(Event::Message {
610 id: None,
611 event: "message".into(),
612 data: "1".into()
613 })
614 );
615 assert_eq!(
616 messages.next().map(Result::unwrap),
617 Some(Event::Message {
618 id: None,
619 event: "message".into(),
620 data: "3".into()
621 })
622 );
623 assert!(messages.next().is_none());
624 }
625
626 #[test]
630 fn bom2() {
631 let mut input = vec![];
632 input.extend(b"\xEF\xBB\xBF");
633 input.extend(b"\xEF\xBB\xBF");
634 input.extend(b"data:1\n");
635 input.extend(b"\n");
636 input.extend(b"data:2\n");
637 input.extend(b"\n");
638 input.extend(b"data:3\n");
639 input.extend(b"\n");
640 let mut messages = decode(&input);
641 assert_eq!(
642 messages.next().map(Result::unwrap),
643 Some(Event::Message {
644 id: None,
645 event: "message".into(),
646 data: "2".into()
647 })
648 );
649 assert_eq!(
650 messages.next().map(Result::unwrap),
651 Some(Event::Message {
652 id: None,
653 event: "message".into(),
654 data: "3".into()
655 })
656 );
657 assert!(messages.next().is_none());
658 }
659
660 #[test]
662 fn comments() {
663 let longstring = "x".repeat(2049);
664 let mut input = concat!("data:1\r", ":\0\n", ":\r\n", "data:2\n", ":").to_string();
665 input.push_str(&longstring);
666 input.push_str("\r");
667 input.push_str("data:3\n");
668 input.push_str(":data:fail\r");
669 input.push_str(":");
670 input.push_str(&longstring);
671 input.push_str("\n");
672 input.push_str("data:4\n\n");
673 let mut messages = decode(input.as_bytes());
674 assert_eq!(
675 messages.next().map(Result::unwrap),
676 Some(Event::Message {
677 id: None,
678 event: "message".into(),
679 data: "1\n2\n3\n4".into()
680 })
681 );
682 assert!(messages.next().is_none());
683 }
684
685 #[test]
687 fn data_before_final_empty_line() {
688 let input = "retry:1000\ndata:test1\n\nid:test\ndata:test2";
689 let mut messages = decode(input.as_bytes());
690 assert_eq!(
691 messages.next().map(Result::unwrap),
692 Some(Event::Retry { retry: 1000 })
693 );
694 assert_eq!(
695 messages.next().map(Result::unwrap),
696 Some(Event::Message {
697 id: None,
698 event: "message".into(),
699 data: "test1".into()
700 })
701 );
702 assert!(dbg!(messages.next()).is_none());
703 }
704
705 #[test]
707 fn field_data() {
708 let input = "data:\n\ndata\ndata\n\ndata:test\n\n";
709 let mut messages = decode(input.as_bytes());
710 assert_eq!(
711 messages.next().map(Result::unwrap),
712 Some(Event::Message {
713 id: None,
714 event: "message".into(),
715 data: "".into()
716 })
717 );
718 assert_eq!(
719 messages.next().map(Result::unwrap),
720 Some(Event::Message {
721 id: None,
722 event: "message".into(),
723 data: "\n".into()
724 })
725 );
726 assert_eq!(
727 messages.next().map(Result::unwrap),
728 Some(Event::Message {
729 id: None,
730 event: "message".into(),
731 data: "test".into()
732 })
733 );
734 assert!(messages.next().is_none());
735 }
736
737 #[test]
739 fn field_event_empty() {
740 let input = "event: \ndata:data\n\n";
741 let mut messages = decode(input.as_bytes());
742 assert_eq!(
743 messages.next().map(Result::unwrap),
744 Some(Event::Message {
745 id: None,
746 event: "".into(),
747 data: "data".into()
748 })
749 );
750 assert!(messages.next().is_none());
751 }
752
753 #[test]
755 fn field_event() {
756 let input = "event:test\ndata:x\n\ndata:x\n\n";
757 let mut messages = decode(input.as_bytes());
758 assert_eq!(
759 messages.next().map(Result::unwrap),
760 Some(Event::Message {
761 id: None,
762 event: "test".into(),
763 data: "x".into()
764 })
765 );
766 assert_eq!(
767 messages.next().map(Result::unwrap),
768 Some(Event::Message {
769 id: None,
770 event: "message".into(),
771 data: "x".into()
772 })
773 );
774 assert!(messages.next().is_none());
775 }
776
777 #[test]
779 #[ignore]
780 fn field_id() {
781 unimplemented!()
782 }
783
784 #[test]
786 #[ignore]
787 fn field_id_2() {
788 unimplemented!()
789 }
790
791 #[test]
793 fn field_parsing() {
794 let input = "data:\0\ndata: 2\rData:1\ndata\0:2\ndata:1\r\0data:4\nda-ta:3\rdata_5\ndata:3\rdata:\r\n data:32\ndata:4\n\n";
795 let mut messages = decode(input.as_bytes());
796 assert_eq!(
797 messages.next().map(Result::unwrap),
798 Some(Event::Message {
799 id: None,
800 event: "message".into(),
801 data: "\0\n 2\n1\n3\n\n4".into()
802 })
803 );
804 assert!(messages.next().is_none());
805 }
806
807 #[test]
809 fn field_retry_bogus() {
810 let input = "retry:3000\nretry:1000x\ndata:x\n\n";
811 let mut messages = decode(input.as_bytes());
812 assert_eq!(
813 messages.next().map(Result::unwrap),
814 Some(Event::Retry { retry: 3000 })
815 );
816 assert_eq!(
817 messages.next().map(Result::unwrap),
818 Some(Event::Message {
819 id: None,
820 event: "message".into(),
821 data: "x".into()
822 })
823 );
824 assert!(messages.next().is_none());
825 }
826
827 #[test]
829 fn field_retry_empty() {
830 let input = "retry\ndata:test\n\n";
831 let mut messages = decode(input.as_bytes());
832 assert_eq!(
833 messages.next().map(Result::unwrap),
834 Some(Event::Message {
835 id: None,
836 event: "message".into(),
837 data: "test".into()
838 })
839 );
840 assert!(messages.next().is_none());
841 }
842
843 #[test]
845 fn field_retry() {
846 let input = "retry:03000\ndata:x\n\n";
847 let mut messages = decode(input.as_bytes());
848 assert_eq!(
849 messages.next().map(Result::unwrap),
850 Some(Event::Retry { retry: 3000 })
851 );
852 assert_eq!(
853 messages.next().map(Result::unwrap),
854 Some(Event::Message {
855 id: None,
856 event: "message".into(),
857 data: "x".into()
858 })
859 );
860 assert!(messages.next().is_none());
861 }
862
863 #[test]
865 fn field_unknown() {
866 let input =
867 "data:test\n data\ndata\nfoobar:xxx\njustsometext\n:thisisacommentyay\ndata:test\n\n";
868 let mut messages = decode(input.as_bytes());
869 assert_eq!(
870 messages.next().map(Result::unwrap),
871 Some(Event::Message {
872 id: None,
873 event: "message".into(),
874 data: "test\n\ntest".into()
875 })
876 );
877 assert!(messages.next().is_none());
878 }
879
880 #[test]
882 fn leading_space() {
883 let input = "data:\ttest\rdata: \ndata:test\n\n";
884 let mut messages = decode(input.as_bytes());
885 assert_eq!(
886 messages.next().map(Result::unwrap),
887 Some(Event::Message {
888 id: None,
889 event: "message".into(),
890 data: "\ttest\n\ntest".into()
891 })
892 );
893 assert!(messages.next().is_none());
894 }
895
896 #[test]
898 fn newlines() {
899 let input = "data:test\r\ndata\ndata:test\r\n\r";
900 let mut messages = decode(input.as_bytes());
901 assert_eq!(
902 messages.next().map(Result::unwrap),
903 Some(Event::Message {
904 id: None,
905 event: "message".into(),
906 data: "test\n\ntest".into()
907 })
908 );
909 assert!(messages.next().is_none());
910 }
911
912 #[test]
914 fn null_character() {
915 let input = "data:\0\n\n\n\n";
916 let mut messages = decode(input.as_bytes());
917 assert_eq!(
918 messages.next().map(Result::unwrap),
919 Some(Event::Message {
920 id: None,
921 event: "message".into(),
922 data: "\0".into()
923 })
924 );
925 assert!(messages.next().is_none());
926 }
927
928 #[test]
930 fn utf_8() {
931 let input = b"data:ok\xE2\x80\xA6\n\n";
932 let mut messages = decode(input);
933 assert_eq!(
934 messages.next().map(Result::unwrap),
935 Some(Event::Message {
936 id: None,
937 event: "message".into(),
938 data: "ok…".into()
939 })
940 );
941 assert!(messages.next().is_none());
942 }
943}