Skip to main content

sse_codec/
lib.rs

//! A [`futures_codec`](https://crates.io/crates/futures_codec) codec that encodes and decodes Server-Sent Events (`EventSource`) streams.
2//!
3//! It emits or serializes full messages, and the meta-message `retry:`.
4//!
5//! # Examples
6//! ```rust,no_run
7//! # async fn amain() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8//! use sse_codec::{decode_stream, Event};
9//! use futures::stream::TryStreamExt; // for try_next()
10//!
11//! let response = surf::get("https://some-site.com/events").await?;
12//! let mut events = decode_stream(response);
13//!
14//! while let Some(event) = events.try_next().await? {
15//!     println!("incoming: {:?}", event);
16//!
17//!     match event {
18//!         Event::Retry { retry } => {
19//!             // change a retry timer value or something
20//!         }
21//!         Event::Message { event, .. } if event == "stop" => {
22//!             break;
23//!         }
24//!         Event::Message { id, event, data } => {
25//!             if let Some(id) = id {
26//!                 // change the last event ID
27//!             }
28//!             // handle event here
29//!         }
30//!     }
31//! }
32//! # Ok(()) }
33//! ```
34use futures_codec::{BytesMut, Decoder, Encoder, FramedRead, FramedWrite};
35use futures_io::{AsyncRead, AsyncWrite};
36use memchr::memchr2;
37use std::fmt::Write as _;
38use std::{fmt, str::FromStr};
39
/// An "event", either an incoming message or some meta-action that needs to be applied to the
/// stream.
///
/// The same type is used in both directions: the decoder yields `Event`s and the encoder
/// accepts them.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum Event {
    /// An incoming message.
    Message {
        /// The ID of this event.
        ///
        /// When produced by the decoder this is a copy of its _last event ID_ buffer, which is
        /// kept between messages, so it may come from an earlier `id:` line.
        ///
        /// See also the [Server-Sent Events spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-last-event-id).
        id: Option<String>,
        /// The event type. Defaults to "message" if no event name is provided.
        event: String,
        /// The data for this event. Multiple `data:` lines are joined with `\n`.
        data: String,
    },
    /// Set the _reconnection time_.
    ///
    /// See also the [Server-Sent Events spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-reconnection-time).
    Retry {
        /// The new reconnection time in milliseconds.
        retry: u64,
    },
}
64
65impl Event {
66    /// Create a server-sent event message.
67    pub fn message<'a>(event: &str, data: &str, id: impl Into<Option<&'a str>>) -> Self {
68        Event::Message {
69            id: id.into().map(String::from),
70            event: event.to_string(),
71            data: data.to_string(),
72        }
73    }
74
75    /// Create a message that configures the retry timeout.
76    pub fn retry(time: u64) -> Self {
77        Event::Retry { retry: time }
78    }
79}
80
/// Errors that may occur while encoding or decoding server-sent event messages.
///
/// The wrapping variants are built through the `From` impls below, so the underlying errors
/// can be propagated with `?`.
#[derive(Debug)]
pub enum Error {
    /// An I/O error occurred while reading or writing a stream.
    IoError(std::io::Error),
    /// Incoming data is not valid utf-8.
    Utf8Error(std::str::Utf8Error),
    /// An error occurred while writing an event message.
    FmtError(std::fmt::Error),
    /// Tried to read an incomplete frame.
    ///
    /// Returned by `Event::from_str` when the input holds no complete message.
    IncompleteFrame,
}
93
94impl fmt::Display for Error {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        match self {
97            Error::IoError(inner) => inner.fmt(f),
98            Error::Utf8Error(inner) => inner.fmt(f),
99            Error::FmtError(inner) => inner.fmt(f),
100            Error::IncompleteFrame => write!(f, "incomplete frame"),
101        }
102    }
103}
104
// Marker impl only: no `source()` override is provided here; the `Display` impl already
// prints the wrapped error's own message.
impl std::error::Error for Error {}
106
107impl From<std::io::Error> for Error {
108    fn from(err: std::io::Error) -> Self {
109        Self::IoError(err)
110    }
111}
112
113impl From<std::fmt::Error> for Error {
114    fn from(err: std::fmt::Error) -> Self {
115        Self::FmtError(err)
116    }
117}
118
119impl From<std::str::Utf8Error> for Error {
120    fn from(err: std::str::Utf8Error) -> Self {
121        Self::Utf8Error(err)
122    }
123}
124
/// Remove at most one leading U+0020 SPACE from `input`.
///
/// Other whitespace (tabs, further spaces) is left untouched.
fn strip_leading_space(input: &str) -> &str {
    match input.strip_prefix(' ') {
        Some(rest) => rest,
        None => input,
    }
}
129
130impl FromStr for Event {
131    type Err = Error;
132
133    /// Parse an event message from a string.
134    fn from_str(s: &str) -> Result<Self, Self::Err> {
135        let mut codec = SSECodec::default();
136        for line in s.lines() {
137            if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
138                return Ok(message);
139            }
140        }
141        Err(Error::IncompleteFrame)
142    }
143}
144
145impl fmt::Display for Event {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        match self {
148            Event::Message { id, event, data } => {
149                if let Some(id) = id {
150                    if id.is_empty() {
151                        writeln!(f, "id")?;
152                    } else {
153                        writeln!(f, "id: {}", &id)?;
154                    }
155                }
156                if event != "message" {
157                    writeln!(f, "event: {}", &event)?;
158                }
159
160                for line in data.lines() {
161                    writeln!(f, "data: {}", line)?;
162                }
163                Ok(())
164            }
165            Event::Retry { retry } => writeln!(f, "retry: {}", retry),
166        }
167    }
168}
169
/// Encoder/decoder for server-sent event streams.
///
/// Decoding is stateful: the fields below carry line-splitting state and the partially
/// assembled event from one `decode` call to the next. Encoding does not use any of them.
#[derive(Debug, Default, Clone)]
pub struct SSECodec {
    /// Have we processed the optional Byte Order Marker on the first line?
    processed_bom: bool,
    /// Was the last character of the previous line a \r?
    ///
    /// Lets the decoder treat a `\n` that directly follows as part of the same `\r\n` line
    /// ending, even when the two bytes arrive in separate reads.
    last_was_cr: bool,
    /// The _last event ID_ buffer.
    ///
    /// Not cleared when a message is emitted, so later messages repeat the most recent ID.
    last_event_id: Option<String>,
    /// The _event type_ buffer.
    ///
    /// `None` means no `event:` line was seen for the current message; the emitted message
    /// then uses the default type "message".
    event_type: Option<String>,
    /// The _data_ buffer.
    ///
    /// Each `data` line appends its value followed by `\n`.
    data: String,
}
184
185impl SSECodec {
186    fn take_message(&mut self) -> Option<Event> {
187        fn default_event_name() -> String {
188            "message".to_string()
189        }
190
191        if self.data.is_empty() {
192            // If the data buffer is an empty string, set the data buffer and the event type buffer to the empty string [and return.]
193            self.event_type.take();
194            None
195        } else {
196            if self.data.ends_with('\n') {
197                self.data.pop();
198            }
199            Some(Event::Message {
200                // The _last event ID_ buffer persists between messages.
201                id: self.last_event_id.clone(),
202                event: self.event_type.take().unwrap_or_else(default_event_name),
203                data: std::mem::take(&mut self.data),
204            })
205        }
206    }
207
208    fn parse_line(&mut self, line: &str) -> Option<Event> {
209        let mut parts = line.splitn(2, ':');
210        match (parts.next(), parts.next()) {
211            // If the field name is "retry":
212            (Some("retry"), Some(value)) if value.chars().all(|c| c.is_ascii_digit()) => {
213                // If the field value consists of only ASCII digits, then interpret the field value
214                // as an integer in base ten, and set the event stream's reconnection time to that
215                // integer. Otherwise, ignore the field.
216                if let Ok(time) = value.parse::<u64>() {
217                    return Some(Event::Retry { retry: time });
218                }
219            }
220            // If the field name is "event":
221            (Some("event"), Some(value)) => {
222                // Set the event type buffer to field value.
223                self.event_type = Some(strip_leading_space(value).to_string());
224            }
225            // If the field name is "data":
226            (Some("data"), value) => {
227                // Append the field value to the data buffer,
228                if let Some(value) = value {
229                    self.data += strip_leading_space(value);
230                }
231                // then append a single U+000A LINE FEED (LF) character to the data buffer.
232                self.data.push('\n');
233            }
234            // If the field name is "id":
235            (Some("id"), Some(id_str)) if !id_str.contains(char::from(0)) => {
236                // If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value.
237                // Otherwise, ignore the field.
238                self.last_event_id = Some(strip_leading_space(id_str).to_string());
239            }
240            // Comment
241            (Some(""), Some(_)) => (),
242            // End of frame
243            (Some(""), None) => {
244                return self.take_message();
245            }
246            _ => (),
247        }
248        None
249    }
250}
251
252impl Decoder for SSECodec {
253    type Item = Event;
254    type Error = Error;
255
256    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
257        while let Some(pos) = memchr2(b'\r', b'\n', src) {
258            let line = src.split_to(pos + 1);
259
260            // treat \r\n as one newline
261            if pos == 0 && line == "\n" && self.last_was_cr {
262                self.last_was_cr = false;
263                continue;
264            }
265            self.last_was_cr = line.last() == Some(&b'\r');
266
267            // get rid of the '\n' at the end
268            let line = std::str::from_utf8(&line[..pos])?;
269            // get rid of the BOM at the start
270            let line = if line.starts_with('\u{feff}') && !self.processed_bom {
271                self.processed_bom = true;
272                &line[3..]
273            } else {
274                line
275            };
276            if let Some(event) = self.parse_line(line) {
277                return Ok(Some(event));
278            }
279        }
280        Ok(None)
281    }
282
283    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
284        let _garbage = src.split_to(src.len());
285        Ok(None)
286    }
287}
288
289impl Encoder for SSECodec {
290    type Item = Event;
291    type Error = Error;
292
293    fn encode(&mut self, item: Self::Item, dest: &mut BytesMut) -> Result<(), Self::Error> {
294        writeln!(dest, "{}", item).map_err(Into::into)
295    }
296}
297
/// Type of a decoding stream, returned from `decode_stream()`.
///
/// This is a `FramedRead` over the reader `R` using [`SSECodec`] as its decoder.
pub type DecodeStream<R> = FramedRead<R, SSECodec>;
300
/// Type of an encoding stream, returned from `encode_stream()`.
///
/// This is a `FramedWrite` over the writer `W` using [`SSECodec`] as its encoder.
pub type EncodeStream<W> = FramedWrite<W, SSECodec>;
303
304/// Parse messages from an `AsyncRead`, returning a stream of `Event`s.
305pub fn decode_stream<R: AsyncRead>(input: R) -> DecodeStream<R> {
306    FramedRead::new(input, SSECodec::default())
307}
308
309/// Encode `Event`s into an `AsyncWrite`.
310pub fn encode_stream<W: AsyncWrite>(output: W) -> EncodeStream<W> {
311    FramedWrite::new(output, SSECodec::default())
312}
313
/// Tests for the encoding direction: events sent through `encode_stream` must produce the
/// exact expected bytes, including the blank line that ends each frame.
#[cfg(test)]
mod encode_tests {
    use super::*;
    use futures::SinkExt;

    /// A named event with two lines of data becomes two `data:` lines.
    #[async_std::test]
    async fn simple_event() {
        let mut output = vec![];
        let mut stream = encode_stream(&mut output);
        stream
            .send(Event::Message {
                id: None,
                event: "add".to_string(),
                data: "test\ntest2".to_string(),
            })
            .await
            .unwrap();
        assert_eq!(output, b"event: add\ndata: test\ndata: test2\n\n".to_vec());
    }

    /// A present id is written as the first field of the frame.
    #[async_std::test]
    async fn with_id() {
        let mut output = vec![];
        let mut stream = encode_stream(&mut output);
        stream
            .send(Event::Message {
                id: Some("whatever".to_string()),
                event: "add".to_string(),
                data: "test".to_string(),
            })
            .await
            .unwrap();
        assert_eq!(output, b"id: whatever\nevent: add\ndata: test\n\n".to_vec());
    }

    /// The default event type "message" is not written out.
    #[async_std::test]
    async fn default_event() {
        let mut output = vec![];
        let mut stream = encode_stream(&mut output);
        stream
            .send(Event::Message {
                id: None,
                event: "message".to_string(),
                data: "test".to_string(),
            })
            .await
            .unwrap();
        assert_eq!(output, b"data: test\n\n".to_vec());
    }

    /// Several events sent in a row are written back to back, each as its own frame.
    #[async_std::test]
    async fn multiple_events() {
        let mut output = vec![];
        let mut stream = encode_stream(&mut output);
        stream
            .send(Event::Message {
                id: None,
                event: "add".to_string(),
                data: "test\ntest2".to_string(),
            })
            .await
            .unwrap();
        stream
            .send(Event::Message {
                id: Some("whatever".to_string()),
                event: "add".to_string(),
                data: "test".to_string(),
            })
            .await
            .unwrap();
        stream
            .send(Event::Message {
                id: None,
                event: "message".to_string(),
                data: "test".to_string(),
            })
            .await
            .unwrap();
        let mut expected = vec![];
        expected.extend(b"event: add\ndata: test\ndata: test2\n\n".iter());
        expected.extend(b"id: whatever\nevent: add\ndata: test\n\n".iter());
        expected.extend(b"data: test\n\n".iter());
        assert_eq!(output, expected);
    }
}
399
/// Tests for the decoding direction, both by calling `parse_line` directly and by running
/// input through `decode_stream`.
#[cfg(test)]
mod decode_tests {
    use super::*;
    use futures::stream::{self, StreamExt, TryStreamExt};

    /// Feeding lines straight into `parse_line` yields the message at the blank line.
    #[test]
    fn simple_event() {
        let mut codec = SSECodec::default();
        let mut event = None;
        let s = "event: add\ndata: test\ndata: test2\n\n";
        for line in s.lines() {
            if let Some(message @ Event::Message { .. }) = codec.parse_line(line) {
                event = Some(message);
                break;
            }
        }
        assert_eq!(
            event,
            Some(Event::Message {
                id: None,
                event: "add".to_string(),
                data: "test\ntest2".to_string(),
            })
        );
    }

    /// The reader hands over one line per chunk; a leading comment-only frame produces no
    /// event, so exactly one message comes out.
    #[test]
    fn decode_stream_when_fed_by_line() {
        let input: Vec<&str> = vec![":ok", "", "event:message", "id:id1", "data:data1", ""];

        let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));

        let messages = decode_stream(body_stream.into_async_read());

        let mut result = None;
        async_std::task::block_on(async {
            result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
        });

        let results = result.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(
            results.get(0).unwrap(),
            &Event::message("message", "data1", "id1")
        );
    }

    /// The id set for the first message is repeated on the second one, which has no `id:`
    /// line of its own.
    #[test]
    fn maintain_id_state() {
        let input: Vec<&str> = vec!["id:1", "data:messageone", "", "data:messagetwo", ""];

        let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));

        let messages = decode_stream(body_stream.into_async_read());

        let mut result = None;
        async_std::task::block_on(async {
            result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
        });

        let mut results = result.unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(
            results.remove(0),
            Event::message("message", "messageone", "1")
        );
        assert_eq!(
            results.remove(0),
            Event::message("message", "messagetwo", "1")
        );
    }

    /// Regression test: `id:` lines had historically immediately emitted an event message in
    /// sse-codec, but shouldn't.
    #[test]
    fn id_is_part_of_message() {
        let input: Vec<&str> = vec![
            "data:messageone",
            "id:1",
            "data:moremessageone",
            "",
            "data:messagetwo",
            "",
        ];

        let body_stream = stream::iter(input).map(|i| Ok(i.to_owned() + "\n"));

        let messages = decode_stream(body_stream.into_async_read());

        let mut result = None;
        async_std::task::block_on(async {
            result = Some(messages.map(|i| i.unwrap()).collect::<Vec<_>>().await);
        });

        let mut results = result.unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(
            results.remove(0),
            Event::message("message", "messageone\nmoremessageone", "1")
        );
        assert_eq!(
            results.remove(0),
            Event::message("message", "messagetwo", "1")
        );
    }
}
506
#[cfg(test)]
mod wpt {
    //! EventSource tests from the web-platform-tests suite. See https://github.com/web-platform-tests/wpt/tree/master/eventsource

    use super::*;
    use futures::stream::StreamExt;

    /// Blocking iterator over the events decoded from an in-memory byte slice.
    struct DecodeIter<'a> {
        inner: FramedRead<&'a [u8], SSECodec>,
    }
    impl Iterator for DecodeIter<'_> {
        type Item = Result<Event, Error>;
        /// Drive the async stream to its next item on the current thread.
        fn next(&mut self) -> Option<Self::Item> {
            async_std::task::block_on(self.inner.next())
        }
    }

    /// Decode `input` with a fresh codec and expose the results as a blocking iterator.
    fn decode(input: &[u8]) -> DecodeIter<'_> {
        DecodeIter {
            inner: decode_stream(input),
        }
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/event-data.html
    #[test]
    fn data() {
        let input = concat!(
            "data:msg\n",
            "data:msg\n",
            "\n",
            ":\n",
            "falsefield:msg\n",
            "\n",
            "falsefield:msg\n",
            "Data:data\n",
            "\n",
            "data\n",
            "\n",
            "data:end\n",
            "\n",
        );
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "msg\nmsg".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "end".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-bom.htm
    /// The byte order marker should only be stripped at the very start.
    #[test]
    fn bom() {
        let mut input = vec![];
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:1\n");
        input.extend(b"\n");
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:2\n");
        input.extend(b"\n");
        input.extend(b"data:3\n");
        input.extend(b"\n");
        let mut messages = decode(&input);
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "1".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "3".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-bom-2.htm
    /// Only _one_ byte order marker should be stripped. This has two, which means one will remain
    /// in the first line, therefore making the first `data:1` invalid.
    #[test]
    fn bom2() {
        let mut input = vec![];
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"\xEF\xBB\xBF");
        input.extend(b"data:1\n");
        input.extend(b"\n");
        input.extend(b"data:2\n");
        input.extend(b"\n");
        input.extend(b"data:3\n");
        input.extend(b"\n");
        let mut messages = decode(&input);
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "2".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "3".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-comments.htm
    #[test]
    fn comments() {
        let longstring = "x".repeat(2049);
        let mut input = concat!("data:1\r", ":\0\n", ":\r\n", "data:2\n", ":").to_string();
        input.push_str(&longstring);
        input.push('\r');
        input.push_str("data:3\n");
        input.push_str(":data:fail\r");
        input.push(':');
        input.push_str(&longstring);
        input.push('\n');
        input.push_str("data:4\n\n");
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "1\n2\n3\n4".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-data-before-final-empty-line.htm
    #[test]
    fn data_before_final_empty_line() {
        let input = "retry:1000\ndata:test1\n\nid:test\ndata:test2";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Retry { retry: 1000 })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test1".into()
            })
        );
        // The last frame has no terminating blank line, so it must not be emitted.
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-data.htm
    #[test]
    fn field_data() {
        let input = "data:\n\ndata\ndata\n\ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\n".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-event-empty.htm
    #[test]
    fn field_event_empty() {
        let input = "event: \ndata:data\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "".into(),
                data: "data".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-event.htm
    #[test]
    fn field_event() {
        let input = "event:test\ndata:x\n\ndata:x\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "test".into(),
                data: "x".into()
            })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "x".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-id.htm
    #[test]
    #[ignore]
    fn field_id() {
        unimplemented!()
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-id-2.htm
    #[test]
    #[ignore]
    fn field_id_2() {
        unimplemented!()
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-parsing.htm
    #[test]
    fn field_parsing() {
        let input = "data:\0\ndata:  2\rData:1\ndata\0:2\ndata:1\r\0data:4\nda-ta:3\rdata_5\ndata:3\rdata:\r\n data:32\ndata:4\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\0\n 2\n1\n3\n\n4".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-retry-bogus.htm
    #[test]
    fn field_retry_bogus() {
        let input = "retry:3000\nretry:1000x\ndata:x\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Retry { retry: 3000 })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "x".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-retry-empty.htm
    #[test]
    fn field_retry_empty() {
        let input = "retry\ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-retry.htm
    #[test]
    fn field_retry() {
        let input = "retry:03000\ndata:x\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Retry { retry: 3000 })
        );
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "x".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-field-unknown.htm
    #[test]
    fn field_unknown() {
        let input =
            "data:test\n data\ndata\nfoobar:xxx\njustsometext\n:thisisacommentyay\ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test\n\ntest".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-leading-space.htm
    #[test]
    fn leading_space() {
        let input = "data:\ttest\rdata: \ndata:test\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\ttest\n\ntest".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-newlines.htm
    #[test]
    fn newlines() {
        let input = "data:test\r\ndata\ndata:test\r\n\r";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "test\n\ntest".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-null-character.html
    #[test]
    fn null_character() {
        let input = "data:\0\n\n\n\n";
        let mut messages = decode(input.as_bytes());
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "\0".into()
            })
        );
        assert!(messages.next().is_none());
    }

    /// https://github.com/web-platform-tests/wpt/blob/master/eventsource/format-utf-8.htm
    #[test]
    fn utf_8() {
        let input = b"data:ok\xE2\x80\xA6\n\n";
        let mut messages = decode(input);
        assert_eq!(
            messages.next().map(Result::unwrap),
            Some(Event::Message {
                id: None,
                event: "message".into(),
                data: "ok…".into()
            })
        );
        assert!(messages.next().is_none());
    }
}