1use futures_codec::{BytesMut, Decoder, Encoder, FramedRead, FramedWrite};
35use futures_io::{AsyncRead, AsyncWrite};
36use memchr::memchr2;
37use std::fmt::Write as _;
38use std::{fmt, str::FromStr};
39
40#[derive(Debug, Clone, PartialEq, Eq)]
43#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
44pub enum Event {
45 Message {
47 id: Option<String>,
51 event: String,
53 data: String,
55 },
56 Retry {
60 retry: u64,
62 },
63}
64
65impl Event {
66 pub fn message<'a>(event: &str, data: &str, id: impl Into<Option<&'a str>>) -> Self {
68 Event::Message {
69 id: id.into().map(String::from),
70 event: event.to_string(),
71 data: data.to_string(),
72 }
73 }
74
75 pub fn retry(time: u64) -> Self {
77 Event::Retry { retry: time }
78 }
79}
80
81#[derive(Debug)]
83pub enum Error {
84 IoError(std::io::Error),
86 Utf8Error(std::str::Utf8Error),
88 FmtError(std::fmt::Error),
90 IncompleteFrame,
92}
93
94impl fmt::Display for Error {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 match self {
97 Error::IoError(inner) => inner.fmt(f),
98 Error::Utf8Error(inner) => inner.fmt(f),
99 Error::FmtError(inner) => inner.fmt(f),
100 Error::IncompleteFrame => write!(f, "incomplete frame"),
101 }
102 }
103}
104
105impl std::error::Error for Error {}
106
107impl From<std::io::Error> for Error {
108 fn from(err: std::io::Error) -> Self {
109 Self::IoError(err)
110 }
111}
112
113impl From<std::fmt::Error> for Error {
114 fn from(err: std::fmt::Error) -> Self {
115 Self::FmtError(err)
116 }
117}
118
119impl From<std::str::Utf8Error> for Error {
120 fn from(err: std::str::Utf8Error) -> Self {
121 Self::Utf8Error(err)
122 }
123}
124
125fn strip_leading_space(input: &str) -> &str {
127 input.strip_prefix(' ').unwrap_or(input)
128}
129
130impl FromStr for Event {
131 type Err = Error;
132
133 fn from_str(s: &str) -> Result<Self, Self::Err> {
135 let mut codec = SSECodec::default();
136 for line in s.lines() {
137 if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
138 return Ok(message);
139 }
140 }
141 Err(Error::IncompleteFrame)
142 }
143}
144
145impl fmt::Display for Event {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 match self {
148 Event::Message { id, event, data } => {
149 if let Some(id) = id {
150 if id.is_empty() {
151 writeln!(f, "id")?;
152 } else {
153 writeln!(f, "id: {}", &id)?;
154 }
155 }
156 if event != "message" {
157 writeln!(f, "event: {}", &event)?;
158 }
159
160 for line in data.lines() {
161 writeln!(f, "data: {}", line)?;
162 }
163 Ok(())
164 }
165 Event::Retry { retry } => writeln!(f, "retry: {}", retry),
166 }
167 }
168}
169
170#[derive(Debug, Default, Clone)]
172pub struct SSECodec {
173 processed_bom: bool,
175 last_was_cr: bool,
177 last_event_id: Option<String>,
179 event_type: Option<String>,
181 data: String,
183}
184
185impl SSECodec {
186 fn take_message(&mut self) -> Option<Event> {
187 fn default_event_name() -> String {
188 "message".to_string()
189 }
190
191 if self.data.is_empty() {
192 self.event_type.take();
194 None
195 } else {
196 if self.data.ends_with('\n') {
197 self.data.pop();
198 }
199 Some(Event::Message {
200 id: self.last_event_id.clone(),
202 event: self.event_type.take().unwrap_or_else(default_event_name),
203 data: std::mem::take(&mut self.data),
204 })
205 }
206 }
207
208 fn parse_line(&mut self, line: &str) -> Option<Event> {
209 let mut parts = line.splitn(2, ':');
210 match (parts.next(), parts.next()) {
211 (Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
213 if let Ok(time) = value.parse::<u64>() {
217 return Some(Event::Retry { retry: time });
218 }
219 }
220 (Some("event"), Some(value)) => {
222 self.event_type = Some(strip_leading_space(value).to_string());
224 }
225 (Some("data"), value) => {
227 if let Some(value) = value {
229 self.data += strip_leading_space(value);
230 }
231 self.data.push('\n');
233 }
234 (Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
236 self.last_event_id = Some(strip_leading_space(id_str).to_string());
239 }
240 (Some(""), Some(_)) => (),
242 (Some(""), None) => {
244 return self.take_message();
245 }
246 _ => (),
247 }
248 None
249 }
250}
251
252impl Decoder for SSECodec {
253 type Item = Event;
254 type Error = Error;
255
256 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
257 while let Some(pos) = memchr2(b'\r', b'\n', src) {
258 let line = src.split_to(pos + 1);
259
260 if pos == 0 && line == "\n" && self.last_was_cr {
262 self.last_was_cr = false;
263 continue;
264 }
265 self.last_was_cr = line.last() == Some(&b'\r');
266
267 let line = std::str::from_utf8(&line[..pos])?;
269 let line = if line.starts_with('\u{feff}') && !self.processed_bom {
271 self.processed_bom = true;
272 &line[3..]
273 } else {
274 line
275 };
276 if let Some(event) = self.parse_line(line) {
277 return Ok(Some(event));
278 }
279 }
280 Ok(None)
281 }
282
283 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
284 let _garbage = src.split_to(src.len());
285 Ok(None)
286 }
287}
288
289impl Encoder for SSECodec {
290 type Item = Event;
291 type Error = Error;
292
293 fn encode(&mut self, item: Self::Item, dest: &mut BytesMut) -> Result<(), Self::Error> {
294 writeln!(dest, "{}", item).map_err(Into::into)
295 }
296}
297
298pub type DecodeStream<R> = FramedRead<R, SSECodec>;
300
301pub type EncodeStream<W> = FramedWrite<W, SSECodec>;
303
304pub fn decode_stream<R: AsyncRead>(input: R) -> DecodeStream<R> {
306 FramedRead::new(input, SSECodec::default())
307}
308
309pub fn encode_stream<W: AsyncWrite>(output: W) -> EncodeStream<W> {
311 FramedWrite::new(output, SSECodec::default())
312}
313
314#[cfg(test)]
315mod encode_tests {
316 use super::*;
317 use futures::SinkExt;
318
319 #[async_std::test]
320 async fn simple_event() {
321 let mut output = vec![];
322 let mut stream = encode_stream(&mut output);
323 stream
324 .send(Event::Message {
325 id: None,
326 event: "add".to_string(),
327 data: "test\ntest2".to_string(),
328 })
329 .await
330 .unwrap();
331 assert_eq!(output, b"event: add\ndata: test\ndata: test2\n\n".to_vec());
332 }
333
334 #[async_std::test]
335 async fn with_id() {
336 let mut output = vec![];
337 let mut stream = encode_stream(&mut output);
338 stream
339 .send(Event::Message {
340 id: Some("whatever".to_string()),
341 event: "add".to_string(),
342 data: "test".to_string(),
343 })
344 .await
345 .unwrap();
346 assert_eq!(output, b"id: whatever\nevent: add\ndata: test\n\n".to_vec());
347 }
348
349 #[async_std::test]
350 async fn default_event() {
351 let mut output = vec![];
352 let mut stream = encode_stream(&mut output);
353 stream
354 .send(Event::Message {
355 id: None,
356 event: "message".to_string(),
357 data: "test".to_string(),
358 })
359 .await
360 .unwrap();
361 assert_eq!(output, b"data: test\n\n".to_vec());
362 }
363
364 #[async_std::test]
365 async fn multiple_events() {
366 let mut output = vec![];
367 let mut stream = encode_stream(&mut output);
368 stream
369 .send(Event::Message {
370 id: None,
371 event: "add".to_string(),
372 data: "test\ntest2".to_string(),
373 })
374 .await
375 .unwrap();
376 stream
377 .send(Event::Message {
378 id: Some("whatever".to_string()),
379 event: "add".to_string(),
380 data: "test".to_string(),
381 })
382 .await
383 .unwrap();
384 stream
385 .send(Event::Message {
386 id: None,
387 event: "message".to_string(),
388 data: "test".to_string(),
389 })
390 .await
391 .unwrap();
392 let mut expected = vec![];
393 expected.extend(b"event: add\ndata: test\ndata: test2\n\n".iter());
394 expected.extend(b"id: whatever\nevent: add\ndata: test\n\n".iter());
395 expected.extend(b"data: test\n\n".iter());
396 assert_eq!(output, expected);
397 }
398}
399
400#[cfg(test)]
401mod decode_tests {
402 use super::*;
403 use futures::stream::{self, StreamExt, TryStreamExt};
404
405 #[test]
406 fn simple_event() {
407 let mut codec = SSECodec::default();
408 let mut event = None;
409 let s = "event: add\ndata: test\ndata: test2\n\n";
410 for line in s.lines() {
411 if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
412 event = Some(message);
413 break;
414 }
415 }
416 assert_eq!(
417 event,
418 Some(Event::Message {
419 id: None,
420 event: "add".to_string(),
421 data: "test\ntest2".to_string(),
422 })
423 );
424 }
425
426 #[test]
427 fn decode_stream_when_fed_by_line() {
428 let input: Vec<&str> = vec![":ok", "", "event:message", "id:id1", "data:data1", ""];
429
430 let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
431
432 let messages = decode_stream(body_stream.into_async_read());
433
434 let mut result = None;
435 async_std::task::block_on(async {
436 result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
437 });
438
439 let results = result.unwrap();
440 assert_eq!(results.len(), 1);
441 assert_eq!(
442 results.get(0).unwrap(),
443 &Event::message("message", "data1", "id1")
444 );
445 }
446
447 #[test]
448 fn maintain_id_state() {
449 let input: Vec<&str> = vec!["id:1", "data:messageone", "", "data:messagetwo", ""];
450
451 let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
452
453 let messages = decode_stream(body_stream.into_async_read());
454
455 let mut result = None;
456 async_std::task::block_on(async {
457 result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
458 });
459
460 let mut results = result.unwrap();
461 assert_eq!(results.len(), 2);
462 assert_eq!(
463 results.remove(0),
464 Event::message("message", "messageone", "1")
465 );
466 assert_eq!(
467 results.remove(0),
468 Event::message("message", "messagetwo", "1")
469 );
470 }
471
472 #[test]
475 fn id_is_part_of_message() {
476 let input: Vec<&str> = vec![
477 "data:messageone",
478 "id:1",
479 "data:moremessageone",
480 "",
481 "data:messagetwo",
482 "",
483 ];
484
485 let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));
486
487 let messages = decode_stream(body_stream.into_async_read());
488
489 let mut result = None;
490 async_std::task::block_on(async {
491 result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
492 });
493
494 let mut results = result.unwrap();
495 assert_eq!(results.len(), 2);
496 assert_eq!(
497 results.remove(0),
498 Event::message("message", "messageone\nmoremessageone", "1")
499 );
500 assert_eq!(
501 results.remove(0),
502 Event::message("message", "messagetwo", "1")
503 );
504 }
505}
506
507#[cfg(test)]
508mod wpt {
509 use super::*;
512 use futures::stream::StreamExt;
513
514 struct DecodeIter<'a> {
515 inner: FramedRead<&'a [u8], SSECodec>,
516 }
517 impl Iterator for DecodeIter<'_> {
518 type Item = Result<Event, Error>;
519 fn next(&mut self) -> Option<Self::Item> {
520 let mut result = None;
521 async_std::task::block_on(async {
522 result = self.inner.next().await;
523 });
524 result
525 }
526 }
527
528 fn decode(input: &[u8]) -> DecodeIter<'_> {
529 DecodeIter {
530 inner: decode_stream(input),
531 }
532 }
533
534 #[test]
536 fn data() {
537 let input = concat!(
538 "data:msg\n",
539 "data:msg\n",
540 "\n",
541 ":\n",
542 "falsefield:msg\n",
543 "\n",
544 "falsefield:msg\n",
545 "Data:data\n",
546 "\n",
547 "data\n",
548 "\n",
549 "data:end\n",
550 "\n",
551 );
552 let mut messages = decode(input.as_bytes());
553 assert_eq!(
554 messages.next().map(Result::unwrap),
555 Some(Event::Message {
556 id: None,
557 event: "message".into(),
558 data: "msg\nmsg".into()
559 })
560 );
561 assert_eq!(
562 messages.next().map(Result::unwrap),
563 Some(Event::Message {
564 id: None,
565 event: "message".into(),
566 data: "".into()
567 })
568 );
569 assert_eq!(
570 messages.next().map(Result::unwrap),
571 Some(Event::Message {
572 id: None,
573 event: "message".into(),
574 data: "end".into()
575 })
576 );
577 assert!(messages.next().is_none());
578 }
579
580 #[test]
583 fn bom() {
584 let mut input = vec![];
585 input.extend(b"\xEF\xBB\xBF");
586 input.extend(b"data:1\n");
587 input.extend(b"\n");
588 input.extend(b"\xEF\xBB\xBF");
589 input.extend(b"data:2\n");
590 input.extend(b"\n");
591 input.extend(b"data:3\n");
592 input.extend(b"\n");
593 let mut messages = decode(&input);
594 assert_eq!(
595 messages.next().map(Result::unwrap),
596 Some(Event::Message {
597 id: None,
598 event: "message".into(),
599 data: "1".into()
600 })
601 );
602 assert_eq!(
603 messages.next().map(Result::unwrap),
604 Some(Event::Message {
605 id: None,
606 event: "message".into(),
607 data: "3".into()
608 })
609 );
610 assert!(messages.next().is_none());
611 }
612
613 #[test]
617 fn bom2() {
618 let mut input = vec![];
619 input.extend(b"\xEF\xBB\xBF");
620 input.extend(b"\xEF\xBB\xBF");
621 input.extend(b"data:1\n");
622 input.extend(b"\n");
623 input.extend(b"data:2\n");
624 input.extend(b"\n");
625 input.extend(b"data:3\n");
626 input.extend(b"\n");
627 let mut messages = decode(&input);
628 assert_eq!(
629 messages.next().map(Result::unwrap),
630 Some(Event::Message {
631 id: None,
632 event: "message".into(),
633 data: "2".into()
634 })
635 );
636 assert_eq!(
637 messages.next().map(Result::unwrap),
638 Some(Event::Message {
639 id: None,
640 event: "message".into(),
641 data: "3".into()
642 })
643 );
644 assert!(messages.next().is_none());
645 }
646
647 #[test]
649 fn comments() {
650 let longstring = "x".repeat(2049);
651 let mut input = concat!("data:1\r", ":\0\n", ":\r\n", "data:2\n", ":").to_string();
652 input.push_str(&longstring);
653 input.push('\r');
654 input.push_str("data:3\n");
655 input.push_str(":data:fail\r");
656 input.push(':');
657 input.push_str(&longstring);
658 input.push('\n');
659 input.push_str("data:4\n\n");
660 let mut messages = decode(input.as_bytes());
661 assert_eq!(
662 messages.next().map(Result::unwrap),
663 Some(Event::Message {
664 id: None,
665 event: "message".into(),
666 data: "1\n2\n3\n4".into()
667 })
668 );
669 assert!(messages.next().is_none());
670 }
671
672 #[test]
674 fn data_before_final_empty_line() {
675 let input = "retry:1000\ndata:test1\n\nid:test\ndata:test2";
676 let mut messages = decode(input.as_bytes());
677 assert_eq!(
678 messages.next().map(Result::unwrap),
679 Some(Event::Retry { retry: 1000 })
680 );
681 assert_eq!(
682 messages.next().map(Result::unwrap),
683 Some(Event::Message {
684 id: None,
685 event: "message".into(),
686 data: "test1".into()
687 })
688 );
689 assert!(dbg!(messages.next()).is_none());
690 }
691
692 #[test]
694 fn field_data() {
695 let input = "data:\n\ndata\ndata\n\ndata:test\n\n";
696 let mut messages = decode(input.as_bytes());
697 assert_eq!(
698 messages.next().map(Result::unwrap),
699 Some(Event::Message {
700 id: None,
701 event: "message".into(),
702 data: "".into()
703 })
704 );
705 assert_eq!(
706 messages.next().map(Result::unwrap),
707 Some(Event::Message {
708 id: None,
709 event: "message".into(),
710 data: "\n".into()
711 })
712 );
713 assert_eq!(
714 messages.next().map(Result::unwrap),
715 Some(Event::Message {
716 id: None,
717 event: "message".into(),
718 data: "test".into()
719 })
720 );
721 assert!(messages.next().is_none());
722 }
723
724 #[test]
726 fn field_event_empty() {
727 let input = "event: \ndata:data\n\n";
728 let mut messages = decode(input.as_bytes());
729 assert_eq!(
730 messages.next().map(Result::unwrap),
731 Some(Event::Message {
732 id: None,
733 event: "".into(),
734 data: "data".into()
735 })
736 );
737 assert!(messages.next().is_none());
738 }
739
740 #[test]
742 fn field_event() {
743 let input = "event:test\ndata:x\n\ndata:x\n\n";
744 let mut messages = decode(input.as_bytes());
745 assert_eq!(
746 messages.next().map(Result::unwrap),
747 Some(Event::Message {
748 id: None,
749 event: "test".into(),
750 data: "x".into()
751 })
752 );
753 assert_eq!(
754 messages.next().map(Result::unwrap),
755 Some(Event::Message {
756 id: None,
757 event: "message".into(),
758 data: "x".into()
759 })
760 );
761 assert!(messages.next().is_none());
762 }
763
764 #[test]
766 #[ignore]
767 fn field_id() {
768 unimplemented!()
769 }
770
771 #[test]
773 #[ignore]
774 fn field_id_2() {
775 unimplemented!()
776 }
777
778 #[test]
780 fn field_parsing() {
781 let input = "data:\0\ndata: 2\rData:1\ndata\0:2\ndata:1\r\0data:4\nda-ta:3\rdata_5\ndata:3\rdata:\r\n data:32\ndata:4\n\n";
782 let mut messages = decode(input.as_bytes());
783 assert_eq!(
784 messages.next().map(Result::unwrap),
785 Some(Event::Message {
786 id: None,
787 event: "message".into(),
788 data: "\0\n 2\n1\n3\n\n4".into()
789 })
790 );
791 assert!(messages.next().is_none());
792 }
793
794 #[test]
796 fn field_retry_bogus() {
797 let input = "retry:3000\nretry:1000x\ndata:x\n\n";
798 let mut messages = decode(input.as_bytes());
799 assert_eq!(
800 messages.next().map(Result::unwrap),
801 Some(Event::Retry { retry: 3000 })
802 );
803 assert_eq!(
804 messages.next().map(Result::unwrap),
805 Some(Event::Message {
806 id: None,
807 event: "message".into(),
808 data: "x".into()
809 })
810 );
811 assert!(messages.next().is_none());
812 }
813
814 #[test]
816 fn field_retry_empty() {
817 let input = "retry\ndata:test\n\n";
818 let mut messages = decode(input.as_bytes());
819 assert_eq!(
820 messages.next().map(Result::unwrap),
821 Some(Event::Message {
822 id: None,
823 event: "message".into(),
824 data: "test".into()
825 })
826 );
827 assert!(messages.next().is_none());
828 }
829
830 #[test]
832 fn field_retry() {
833 let input = "retry:03000\ndata:x\n\n";
834 let mut messages = decode(input.as_bytes());
835 assert_eq!(
836 messages.next().map(Result::unwrap),
837 Some(Event::Retry { retry: 3000 })
838 );
839 assert_eq!(
840 messages.next().map(Result::unwrap),
841 Some(Event::Message {
842 id: None,
843 event: "message".into(),
844 data: "x".into()
845 })
846 );
847 assert!(messages.next().is_none());
848 }
849
850 #[test]
852 fn field_unknown() {
853 let input =
854 "data:test\n data\ndata\nfoobar:xxx\njustsometext\n:thisisacommentyay\ndata:test\n\n";
855 let mut messages = decode(input.as_bytes());
856 assert_eq!(
857 messages.next().map(Result::unwrap),
858 Some(Event::Message {
859 id: None,
860 event: "message".into(),
861 data: "test\n\ntest".into()
862 })
863 );
864 assert!(messages.next().is_none());
865 }
866
867 #[test]
869 fn leading_space() {
870 let input = "data:\ttest\rdata: \ndata:test\n\n";
871 let mut messages = decode(input.as_bytes());
872 assert_eq!(
873 messages.next().map(Result::unwrap),
874 Some(Event::Message {
875 id: None,
876 event: "message".into(),
877 data: "\ttest\n\ntest".into()
878 })
879 );
880 assert!(messages.next().is_none());
881 }
882
883 #[test]
885 fn newlines() {
886 let input = "data:test\r\ndata\ndata:test\r\n\r";
887 let mut messages = decode(input.as_bytes());
888 assert_eq!(
889 messages.next().map(Result::unwrap),
890 Some(Event::Message {
891 id: None,
892 event: "message".into(),
893 data: "test\n\ntest".into()
894 })
895 );
896 assert!(messages.next().is_none());
897 }
898
899 #[test]
901 fn null_character() {
902 let input = "data:\0\n\n\n\n";
903 let mut messages = decode(input.as_bytes());
904 assert_eq!(
905 messages.next().map(Result::unwrap),
906 Some(Event::Message {
907 id: None,
908 event: "message".into(),
909 data: "\0".into()
910 })
911 );
912 assert!(messages.next().is_none());
913 }
914
915 #[test]
917 fn utf_8() {
918 let input = b"data:ok\xE2\x80\xA6\n\n";
919 let mut messages = decode(input);
920 assert_eq!(
921 messages.next().map(Result::unwrap),
922 Some(Event::Message {
923 id: None,
924 event: "message".into(),
925 data: "ok…".into()
926 })
927 );
928 assert!(messages.next().is_none());
929 }
930}