sse_codec/
lib.rs

//! A [`futures_codec`](https://crates.io/crates/futures_codec) codec that encodes and decodes Server-Sent Events (`EventSource`) streams.
2//!
3//! It emits or serializes full messages, and the meta-message `retry:`.
4//!
5//! # Examples
6//! ```rust,no_run
7//! # async fn amain() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8//! use sse_codec::{decode_stream, Event};
9//! use futures::stream::TryStreamExt; // for try_next()
10//!
11//! let response = surf::get("https://some-site.com/events").await?;
12//! let mut events = decode_stream(response);
13//!
14//! while let Some(event) = events.try_next().await? {
15//!     println!("incoming: {:?}", event);
16//!
17//!     match event {
18//!         Event::Retry { retry } => {
19//!             // change a retry timer value or something
20//!         }
21//!         Event::Message { event, .. } if event == "stop" => {
22//!             break;
23//!         }
24//!         Event::Message { id, event, data } => {
25//!             if let Some(id) = id {
26//!                 // change the last event ID
27//!             }
28//!             // handle event here
29//!         }
30//!     }
31//! }
32//! # Ok(()) }
33//! ```
34use bytes::BytesMut;
35use futures_codec::{Decoder, Encoder, FramedRead, FramedWrite};
36use futures_io::{AsyncRead, AsyncWrite};
37use memchr::memchr2;
38use std::fmt::Write as _;
39use std::{fmt, str::FromStr};
40
/// An "event", either an incoming message or some meta-action that needs to be applied to the
/// stream.
///
/// This is the item type that [`SSECodec`] both decodes and encodes. Use [`Event::message`] and
/// [`Event::retry`] to build the two variants from borrowed data.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum Event {
    /// An incoming message.
    Message {
        /// The ID of this event.
        ///
        /// When decoding, this is a copy of the codec's _last event ID_ buffer, which is kept
        /// between messages: it stays `None` until the stream sends an `id` field and is then
        /// repeated on every following message until it is replaced.
        ///
        /// See also the [Server-Sent Events spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-last-event-id).
        id: Option<String>,
        /// The event type. Defaults to "message" if no event name is provided.
        ///
        /// When encoding, the `event:` line is omitted if this is exactly "message".
        event: String,
        /// The data for this event.
        ///
        /// When decoding, the values of consecutive `data` fields are joined with `\n`.
        data: String,
    },
    /// Set the _reconnection time_.
    ///
    /// See also the [Server-Sent Events spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-reconnection-time).
    Retry {
        /// The new reconnection time in milliseconds.
        retry: u64,
    },
}
65
66impl Event {
67    /// Create a server-sent event message.
68    pub fn message<'a>(event: &str, data: &str, id: impl Into<Option<&'a str>>) -> Self {
69        Event::Message {
70            id: id.into().map(String::from),
71            event: event.to_string(),
72            data: data.to_string(),
73        }
74    }
75
76    /// Create a message that configures the retry timeout.
77    pub fn retry(time: u64) -> Self {
78        Event::Retry { retry: time }
79    }
80}
81
/// Errors that may occur while encoding or decoding server-sent event messages.
///
/// `From` conversions exist for each wrapped error type, so `?` can be used directly on I/O,
/// formatting and UTF-8 validation results.
#[derive(Debug)]
pub enum Error {
    /// An I/O error occurred while reading or writing a stream.
    IoError(std::io::Error),
    /// Incoming data is not valid utf-8.
    ///
    /// The decoder validates one line at a time, so this refers to a single line of the stream.
    Utf8Error(std::str::Utf8Error),
    /// An error occurred while writing an event message.
    FmtError(std::fmt::Error),
    /// Tried to read an incomplete frame.
    ///
    /// Returned by `Event::from_str` when the input ends before a complete message was parsed.
    IncompleteFrame,
}
94
95impl fmt::Display for Error {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        match self {
98            Error::IoError(inner) => inner.fmt(f),
99            Error::Utf8Error(inner) => inner.fmt(f),
100            Error::FmtError(inner) => inner.fmt(f),
101            Error::IncompleteFrame => write!(f, "incomplete frame"),
102        }
103    }
104}
105
// No methods are overridden: every `std::error::Error` method keeps its default
// implementation. The wrapped error's message is surfaced through the `Display` impl instead.
impl std::error::Error for Error {}
107
108impl From<std::io::Error> for Error {
109    fn from(err: std::io::Error) -> Self {
110        Self::IoError(err)
111    }
112}
113
114impl From<std::fmt::Error> for Error {
115    fn from(err: std::fmt::Error) -> Self {
116        Self::FmtError(err)
117    }
118}
119
120impl From<std::str::Utf8Error> for Error {
121    fn from(err: std::str::Utf8Error) -> Self {
122        Self::Utf8Error(err)
123    }
124}
125
/// Drop exactly one leading U+0020 SPACE from `input`, if there is one.
///
/// Only a single space is removed and other whitespace (such as a tab) is left untouched.
fn strip_leading_space(input: &str) -> &str {
    match input.as_bytes().first() {
        // A space is one byte in UTF-8, so slicing at index 1 stays on a char boundary.
        Some(b' ') => &input[1..],
        _ => input,
    }
}
134
135impl FromStr for Event {
136    type Err = Error;
137
138    /// Parse an event message from a string.
139    fn from_str(s: &str) -> Result<Self, Self::Err> {
140        let mut codec = SSECodec::default();
141        for line in s.lines() {
142            if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
143                return Ok(message);
144            }
145        }
146        Err(Error::IncompleteFrame)
147    }
148}
149
150impl fmt::Display for Event {
151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152        match self {
153            Event::Message { id, event, data } => {
154                if let Some(id) = id {
155                    if id.is_empty() {
156                        writeln!(f, "id")?;
157                    } else {
158                        writeln!(f, "id: {}", &id)?;
159                    }
160                }
161                if event != "message" {
162                    writeln!(f, "event: {}", &event)?;
163                }
164
165                for line in data.lines() {
166                    writeln!(f, "data: {}", line)?;
167                }
168                Ok(())
169            }
170            Event::Retry { retry } => writeln!(f, "retry: {}", retry),
171        }
172    }
173}
174
/// Encoder/decoder for server-sent event streams.
///
/// The fields hold the decoder's state between `decode()` calls: bytes that do not yet form a
/// full line, and the per-event buffers that are filled field by field until a blank line
/// completes the event. Encoding does not touch any of this state.
#[derive(Debug, Default, Clone)]
pub struct SSECodec {
    /// Have we processed the optional Byte Order Marker on the first line?
    processed_bom: bool,
    /// Was the last character of the previous line a \r?
    ///
    /// Lets a `\n` that directly follows be treated as the second half of a `\r\n` line ending
    /// rather than as an extra blank line.
    last_was_cr: bool,
    /// Bytes that were fed to the decoder but do not yet form a message.
    buffer: BytesMut,
    /// The _last event ID_ buffer.
    ///
    /// Not cleared when a message is emitted, so the ID carries over to later messages.
    last_event_id: Option<String>,
    /// The _event type_ buffer.
    ///
    /// Cleared whenever a blank line ends the current event.
    event_type: Option<String>,
    /// The _data_ buffer.
    ///
    /// Every `data` field appends its value followed by a `\n`.
    data: String,
}
191
192impl SSECodec {
193    fn take_message(&mut self) -> Option<Event> {
194        fn default_event_name() -> String {
195            "message".to_string()
196        }
197
198        if self.data.is_empty() {
199            // If the data buffer is an empty string, set the data buffer and the event type buffer to the empty string [and return.]
200            self.event_type.take();
201            None
202        } else {
203            if self.data.ends_with('\n') {
204                self.data.pop();
205            }
206            Some(Event::Message {
207                // The _last event ID_ buffer persists between messages.
208                id: self.last_event_id.clone(),
209                event: self.event_type.take().unwrap_or_else(default_event_name),
210                data: std::mem::replace(&mut self.data, String::new()),
211            })
212        }
213    }
214
215    fn parse_line(&mut self, line: &str) -> Option<Event> {
216        let mut parts = line.splitn(2, ':');
217        match (parts.next(), parts.next()) {
218            // If the field name is "retry":
219            (Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
220                // If the field value consists of only ASCII digits, then interpret the field value
221                // as an integer in base ten, and set the event stream's reconnection time to that
222                // integer. Otherwise, ignore the field.
223                if let Ok(time) = value.parse::<u64>() {
224                    return Some(Event::Retry { retry: time });
225                }
226            }
227            // If the field name is "event":
228            (Some("event"), Some(value)) => {
229                // Set the event type buffer to field value.
230                self.event_type = Some(strip_leading_space(value).to_string());
231            }
232            // If the field name is "data":
233            (Some("data"), value) => {
234                // Append the field value to the data buffer,
235                if let Some(value) = value {
236                    self.data += strip_leading_space(value);
237                }
238                // then append a single U+000A LINE FEED (LF) character to the data buffer.
239                self.data.push('\n');
240            }
241            // If the field name is "id":
242            (Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
243                // If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value.
244                // Otherwise, ignore the field.
245                self.last_event_id = Some(strip_leading_space(id_str).to_string());
246            }
247            // Comment
248            (Some(""), Some(_)) => (),
249            // End of frame
250            (Some(""), None) => {
251                return self.take_message();
252            }
253            _ => (),
254        }
255        None
256    }
257}
258
259impl Decoder for SSECodec {
260    type Item = Event;
261    type Error = Error;
262
263    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
264        // it's unfortunate to have to copy this stuff in, but I think the only way to discard
265        // trailing data with futures_codec is by emptying the `src` buffer on every `decode()`
266        // call.
267        self.buffer.unsplit(src.split_to(src.len()));
268        while let Some(pos) = memchr2(b'\r', b'\n', &self.buffer) {
269            let line = self.buffer.split_to(pos + 1);
270
271            // treat \r\n as one newline
272            if pos == 0 && line == "\n" && self.last_was_cr {
273                self.last_was_cr = false;
274                continue;
275            }
276            self.last_was_cr = line.last() == Some(&b'\r');
277
278            // get rid of the '\n' at the end
279            let line = std::str::from_utf8(&line[..pos])?;
280            // get rid of the BOM at the start
281            let line = if line.starts_with("\u{feff}") && !self.processed_bom {
282                self.processed_bom = true;
283                &line[3..]
284            } else {
285                line
286            };
287            if let Some(event) = self.parse_line(line) {
288                return Ok(Some(event));
289            }
290        }
291        Ok(None)
292    }
293
294    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
295        unreachable!(
296            "SSECodec::decode() should have consumed all data, but {} bytes are in the buffer",
297            src.len()
298        )
299    }
300}
301
302impl Encoder for SSECodec {
303    type Item = Event;
304    type Error = Error;
305
306    fn encode(&mut self, item: Self::Item, dest: &mut BytesMut) -> Result<(), Self::Error> {
307        writeln!(dest, "{}", item).map_err(Into::into)
308    }
309}
310
/// Type of a decoding stream, returned from `decode_stream()`.
///
/// A `FramedRead` over the reader `R` that uses [`SSECodec`] as its decoder.
pub type DecodeStream<R> = FramedRead<R, SSECodec>;

/// Type of an encoding stream, returned from `encode_stream()`.
///
/// A `FramedWrite` over the writer `W` that uses [`SSECodec`] as its encoder.
pub type EncodeStream<W> = FramedWrite<W, SSECodec>;
316
317/// Parse messages from an `AsyncRead`, returning a stream of `Event`s.
318pub fn decode_stream<R: AsyncRead>(input: R) -> DecodeStream<R> {
319    FramedRead::new(input, SSECodec::default())
320}
321
322/// Encode `Event`s into an `AsyncWrite`.
323pub fn encode_stream<W: AsyncWrite>(output: W) -> EncodeStream<W> {
324    FramedWrite::new(output, SSECodec::default())
325}
326
#[cfg(test)]
mod encode_tests {
    use super::*;
    use futures::SinkExt;

    /// Send every event through a fresh encoding stream and return the bytes it wrote.
    async fn encode_all(events: Vec<Event>) -> Vec<u8> {
        let mut output = vec![];
        {
            let mut stream = encode_stream(&mut output);
            for event in events {
                stream.send(event).await.unwrap();
            }
        }
        output
    }

    #[async_std::test]
    async fn simple_event() {
        let output = encode_all(vec![Event::message("add", "test\ntest2", None)]).await;
        assert_eq!(output, b"event: add\ndata: test\ndata: test2\n\n".to_vec());
    }

    #[async_std::test]
    async fn with_id() {
        let output = encode_all(vec![Event::message("add", "test", "whatever")]).await;
        assert_eq!(output, b"id: whatever\nevent: add\ndata: test\n\n".to_vec());
    }

    #[async_std::test]
    async fn default_event() {
        let output = encode_all(vec![Event::message("message", "test", None)]).await;
        assert_eq!(output, b"data: test\n\n".to_vec());
    }

    #[async_std::test]
    async fn multiple_events() {
        let output = encode_all(vec![
            Event::message("add", "test\ntest2", None),
            Event::message("add", "test", "whatever"),
            Event::message("message", "test", None),
        ])
        .await;
        let expected = [
            &b"event: add\ndata: test\ndata: test2\n\n"[..],
            &b"id: whatever\nevent: add\ndata: test\n\n"[..],
            &b"data: test\n\n"[..],
        ]
        .concat();
        assert_eq!(output, expected);
    }
}
412
#[cfg(test)]
mod decode_tests {
    use super::*;
    use futures::stream::{self, StreamExt, TryStreamExt};

    /// Feed `lines` to a decoding stream as one chunk per line (each with a `\n` appended) and
    /// collect every decoded event, panicking on the first error.
    fn decode_lines(lines: Vec<&str>) -> Vec<Event> {
        let body_stream = stream::iter(lines).map(|line| Ok(line.to_owned() + "\n"));
        let messages = decode_stream(body_stream.into_async_read());
        async_std::task::block_on(messages.map(|item| item.unwrap()).collect::<Vec<_>>())
    }

    #[test]
    fn simple_event() {
        let mut codec = SSECodec::default();
        let event = "event: add\ndata: test\ndata: test2\n\n"
            .lines()
            .find_map(|line| match codec.parse_line(line) {
                Some(message @ Event::Message { .. }) => Some(message),
                _ => None,
            });
        assert_eq!(event, Some(Event::message("add", "test\ntest2", None)));
    }

    #[test]
    fn decode_stream_when_fed_by_line() {
        let results = decode_lines(vec![
            ":ok",
            "",
            "event:message",
            "id:id1",
            "data:data1",
            "",
        ]);
        assert_eq!(results, vec![Event::message("message", "data1", "id1")]);
    }

    #[test]
    fn maintain_id_state() {
        let results = decode_lines(vec!["id:1", "data:messageone", "", "data:messagetwo", ""]);
        assert_eq!(
            results,
            vec![
                Event::message("message", "messageone", "1"),
                Event::message("message", "messagetwo", "1"),
            ]
        );
    }

    /// Regression test: `id:` lines had historically immediately emitted an event message in
    /// sse-codec, but shouldn't.
    #[test]
    fn id_is_part_of_message() {
        let results = decode_lines(vec![
            "data:messageone",
            "id:1",
            "data:moremessageone",
            "",
            "data:messagetwo",
            "",
        ]);
        assert_eq!(
            results,
            vec![
                Event::message("message", "messageone\nmoremessageone", "1"),
                Event::message("message", "messagetwo", "1"),
            ]
        );
    }
}
519
#[cfg(test)]
mod wpt {
    //! EventSource tests from the web-platform-tests suite. See https://github.com/web-platform-tests/wpt/tree/master/eventsource

    use super::*;
    use futures::stream::StreamExt;

    /// Blocking iterator over the events decoded from an in-memory byte slice.
    struct DecodeIter<'a> {
        inner: FramedRead<&'a [u8], SSECodec>,
    }
    impl Iterator for DecodeIter<'_> {
        type Item = Result<Event, Error>;
        fn next(&mut self) -> Option<Self::Item> {
            async_std::task::block_on(self.inner.next())
        }
    }

    fn decode(input: &[u8]) -> DecodeIter<'_> {
        DecodeIter {
            inner: decode_stream(input),
        }
    }

    /// Decode `input` until the stream ends, panicking on the first decoding error.
    fn decode_all(input: &[u8]) -> Vec<Event> {
        decode(input).map(Result::unwrap).collect()
    }

    /// The message most cases expect: default event type, no ID.
    fn msg(data: &str) -> Event {
        Event::message("message", data, None)
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/event-data.html
    #[test]
    fn data() {
        let input = concat!(
            "data:msg\n",
            "data:msg\n",
            "\n",
            ":\n",
            "falsefield:msg\n",
            "\n",
            "falsefield:msg\n",
            "Data:data\n",
            "\n",
            "data\n",
            "\n",
            "data:end\n",
            "\n",
        );
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![msg("msg\nmsg"), msg(""), msg("end")]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-bom.htm
    /// The byte order marker should only be stripped at the very start.
    #[test]
    fn bom() {
        let mut input = vec![];
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:1\n");
        input.extend(b"\n");
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:2\n");
        input.extend(b"\n");
        input.extend(b"data:3\n");
        input.extend(b"\n");
        assert_eq!(decode_all(&input), vec![msg("1"), msg("3")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-bom-2.htm
    /// Only _one_ byte order marker should be stripped. This has two, which means one will remain
    /// in the first line, therefore making the first `data:1` invalid.
    #[test]
    fn bom2() {
        let mut input = vec![];
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:1\n");
        input.extend(b"\n");
        input.extend(b"data:2\n");
        input.extend(b"\n");
        input.extend(b"data:3\n");
        input.extend(b"\n");
        assert_eq!(decode_all(&input), vec![msg("2"), msg("3")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-comments.htm
    #[test]
    fn comments() {
        let longstring = "x".repeat(2049);
        let input = [
            "data:1\r",
            ":\0\n",
            ":\r\n",
            "data:2\n",
            ":",
            longstring.as_str(),
            "\r",
            "data:3\n",
            ":data:fail\r",
            ":",
            longstring.as_str(),
            "\n",
            "data:4\n\n",
        ]
        .concat();
        assert_eq!(decode_all(input.as_bytes()), vec![msg("1\n2\n3\n4")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-data-before-final-empty-line.htm
    #[test]
    fn data_before_final_empty_line() {
        let input = "retry:1000\ndata:test1\n\nid:test\ndata:test2";
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![Event::retry(1000), msg("test1")]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-data.htm
    #[test]
    fn field_data() {
        let input = "data:\n\ndata\ndata\n\ndata:test\n\n";
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![msg(""), msg("\n"), msg("test")]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-event-empty.htm
    #[test]
    fn field_event_empty() {
        let input = "event: \ndata:data\n\n";
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![Event::message("", "data", None)]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-event.htm
    #[test]
    fn field_event() {
        let input = "event:test\ndata:x\n\ndata:x\n\n";
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![Event::message("test", "x", None), msg("x")]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-id.htm
    #[test]
    #[ignore]
    fn field_id() {
        unimplemented!()
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-id-2.htm
    #[test]
    #[ignore]
    fn field_id_2() {
        unimplemented!()
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-parsing.htm
    #[test]
    fn field_parsing() {
        let input = "data:\0\ndata:  2\rData:1\ndata\0:2\ndata:1\r\0data:4\nda-ta:3\rdata_5\ndata:3\rdata:\r\n data:32\ndata:4\n\n";
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![msg("\0\n 2\n1\n3\n\n4")]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-retry-bogus.htm
    #[test]
    fn field_retry_bogus() {
        let input = "retry:3000\nretry:1000x\ndata:x\n\n";
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![Event::retry(3000), msg("x")]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-retry-empty.htm
    #[test]
    fn field_retry_empty() {
        let input = "retry\ndata:test\n\n";
        assert_eq!(decode_all(input.as_bytes()), vec![msg("test")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-retry.htm
    #[test]
    fn field_retry() {
        let input = "retry:03000\ndata:x\n\n";
        assert_eq!(
            decode_all(input.as_bytes()),
            vec![Event::retry(3000), msg("x")]
        );
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-unknown.htm
    #[test]
    fn field_unknown() {
        let input =
            "data:test\n data\ndata\nfoobar:xxx\njustsometext\n:thisisacommentyay\ndata:test\n\n";
        assert_eq!(decode_all(input.as_bytes()), vec![msg("test\n\ntest")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-leading-space.htm
    #[test]
    fn leading_space() {
        let input = "data:\ttest\rdata: \ndata:test\n\n";
        assert_eq!(decode_all(input.as_bytes()), vec![msg("\ttest\n\ntest")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-newlines.htm
    #[test]
    fn newlines() {
        let input = "data:test\r\ndata\ndata:test\r\n\r";
        assert_eq!(decode_all(input.as_bytes()), vec![msg("test\n\ntest")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-null-character.html
    #[test]
    fn null_character() {
        let input = "data:\0\n\n\n\n";
        assert_eq!(decode_all(input.as_bytes()), vec![msg("\0")]);
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-utf-8.htm
    #[test]
    fn utf_8() {
        let input = b"data:ok\xE2\x80\xA6\n\n";
        assert_eq!(decode_all(input), vec![msg("ok…")]);
    }
}
943}