rocket_community/response/stream/
sse.rs

1use std::borrow::Cow;
2
3use futures::{
4    future::Either,
5    stream::{self, Stream},
6};
7use tokio::io::AsyncRead;
8use tokio::time::{interval, Duration};
9use tokio_stream::{wrappers::IntervalStream, StreamExt};
10
11use crate::http::ContentType;
12use crate::request::Request;
13use crate::response::{
14    self,
15    stream::{RawLinedEvent, ReaderStream},
16    Responder, Response,
17};
18
/// A single Server-Sent `Event` (SSE), the item type of a Server-Sent
/// [`struct@EventStream`].
///
/// Every server-sent event is either a _field_ or a _comment_. Build comments
/// with [`Event::comment()`] and fields with [`Event::data()`],
/// [`Event::json()`], or [`Event::retry()`].
///
/// ```rust
/// # extern crate rocket_community as rocket;
/// use rocket::tokio::time::Duration;
/// use rocket::response::stream::Event;
///
/// // A `data` event with message "Hello, SSE!".
/// let event = Event::data("Hello, SSE!");
///
/// // The same event but with event name of `hello`.
/// let event = Event::data("Hello, SSE!").event("hello");
///
/// // A `retry` event to set the client-side reconnection time.
/// let event = Event::retry(Duration::from_secs(5));
///
/// // An event with an attached comment, event name, and ID.
/// let event = Event::data("Hello, SSE!")
///     .with_comment("just a hello message")
///     .event("hello")
///     .id("1");
/// ```
///
/// Client-side details are largely deferred to [MDN's using server-sent
/// events] documentation; the server-side parts that matter are reproduced
/// here in our own words.
///
/// [MDN's using server-sent events]:
/// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
///
/// # Pitfalls
///
/// Server-Sent Events suffer from certain pitfalls. Readers are encouraged to
/// go through [pitfalls](struct@EventStream#pitfalls) before making use of
/// Rocket's SSE support.
///
/// # Comments
///
/// A server-sent _comment_, created via [`Event::comment()`], is an event that
/// appears only in the raw server-sent event data stream and is inaccessible by
/// most clients. This includes JavaScript's `EventSource`. As such, comments
/// serve little utility beyond debugging a raw data stream and keeping a
/// connection alive. See [heartbeat](struct@EventStream#heartbeat) for
/// information on Rocket's `EventStream` keep-alive.
///
/// # Fields
///
/// A server-sent field can be one of four kinds:
///
///   * `retry`
///
///     A `retry` event, created via [`Event::retry()`], sets the reconnection
///     time on the client side. It is the duration the client will wait before
///     attempting to reconnect when a connection is lost. Most applications
///     will not need to set a `retry`, opting instead to use the
///     implementation's default or to reconnect manually on error.
///
///   * `id`
///
///     Sets the event id to associate all subsequent fields with. This value
///     cannot be retrieved directly via most clients, including JavaScript
///     `EventSource`. Instead, it is sent by the implementation on reconnection
///     via the `Last-Event-ID` header. An `id` can be attached to other fields
///     via the [`Event::id()`] builder method.
///
///   * `event`
///
///     Sets the event name to associate the next `data` field with. In
///     JavaScript's `EventSource`, this is the event to be listened for, which
///     defaults to `message`. An `event` can be attached to other fields via
///     the [`Event::event()`] builder method.
///
///   * `data`
///
///     Sends data to dispatch as an event at the client. In JavaScript's
///     `EventSource`, this (and only this) results in an event handler for
///     `event`, specified just prior, being triggered. A data field can be
///     created via the [`Event::data()`] or [`Event::json()`] constructors.
///
/// # Implementation Notes
///
/// A constructed `Event` _always_ emits its fields in the following order:
///
///   1. `comment`
///   2. `retry`
///   3. `id`
///   4. `event`
///   5. `data`
///
/// The `event` and `id` fields _cannot_ contain new lines or carriage returns.
/// Rocket's default implementation automatically converts new lines and
/// carriage returns in `event` and `id` fields to spaces.
///
/// The `data` and `comment` fields _cannot_ contain carriage returns. Rocket
/// converts the unencoded sequence `\r\n` and the isolated `\r` into a
/// protocol-level `\n`, that is, in such a way that they are interpreted as
/// `\n` at the client. For example, the raw message `foo\r\nbar\rbaz` is
/// received as `foo\nbar\nbaz` at the client-side. Encoded sequences, such as
/// those emitted by [`Event::json()`], have no such restrictions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Event {
    /// Comment text; emitted first, one comment line per input line.
    comment: Option<Cow<'static, str>>,
    /// Client reconnection time; emitted as `retry` in milliseconds.
    retry: Option<Duration>,
    /// Value of the `id` field.
    id: Option<Cow<'static, str>>,
    /// Value of the `event` (event name) field.
    event: Option<Cow<'static, str>>,
    /// Value of the `data` field; emitted last.
    data: Option<Cow<'static, str>>,
}
130
131impl Event {
132    // We hide this since we never want to construct an `Event` with nothing.
133    fn new() -> Self {
134        Event {
135            comment: None,
136            retry: None,
137            id: None,
138            event: None,
139            data: None,
140        }
141    }
142
143    /// Creates a new `Event` with an empty data field.
144    ///
145    /// This is exactly equivalent to `Event::data("")`.
146    ///
147    /// # Example
148    ///
149    /// ```rust
150    /// # extern crate rocket_community as rocket;
151    /// use rocket::response::stream::Event;
152    ///
153    /// let event = Event::empty();
154    /// ```
155    pub fn empty() -> Self {
156        Event::data("")
157    }
158
159    /// Creates a new `Event` with a data field of `data` serialized as JSON.
160    ///
161    /// # Example
162    ///
163    /// ```rust
164    /// # extern crate rocket_community as rocket;
165    /// use rocket::serde::Serialize;
166    /// use rocket::response::stream::Event;
167    ///
168    /// #[derive(Serialize)]
169    /// #[serde(crate = "rocket::serde")]
170    /// struct MyData<'r> {
171    ///     string: &'r str,
172    ///     number: usize,
173    /// }
174    ///
175    /// let data = MyData { string: "hello!", number: 10 };
176    /// let event = Event::json(&data);
177    /// ```
178    #[cfg(feature = "json")]
179    #[cfg_attr(nightly, doc(cfg(feature = "json")))]
180    pub fn json<T: serde::Serialize>(data: &T) -> Self {
181        let string = serde_json::to_string(data).unwrap_or_default();
182        Self::data(string)
183    }
184
185    /// Creates a new `Event` with a data field containing the raw `data`.
186    ///
187    /// # Raw SSE is Lossy
188    ///
189    /// Unencoded carriage returns cannot be expressed in the protocol. Thus,
190    /// any carriage returns in `data` will not appear at the client-side.
191    /// Instead, the sequence `\r\n` and the isolated `\r` will each appear as
192    /// `\n` at the client-side. For example, the message `foo\r\nbar\rbaz` is
193    /// received as `foo\nbar\nbaz` at the client-side.
194    ///
195    /// See [pitfalls](struct@EventStream#pitfalls) for more details.
196    ///
197    /// # Example
198    ///
199    /// ```rust
200    /// # extern crate rocket_community as rocket;
201    /// use rocket::response::stream::Event;
202    ///
203    /// // A `data` event with message "Hello, SSE!".
204    /// let event = Event::data("Hello, SSE!");
205    /// ```
206    pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
207        Self {
208            data: Some(data.into()),
209            ..Event::new()
210        }
211    }
212
213    /// Creates a new comment `Event`.
214    ///
215    /// As with [`Event::data()`], unencoded carriage returns cannot be
216    /// expressed in the protocol. Thus, any carriage returns in `data` will
217    /// not appear at the client-side. For comments, this is generally not a
218    /// concern as comments are discarded by client-side libraries.
219    ///
220    /// # Example
221    ///
222    /// ```rust
223    /// # extern crate rocket_community as rocket;
224    /// use rocket::response::stream::Event;
225    ///
226    /// let event = Event::comment("bet you'll never see me!");
227    /// ```
228    pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
229        Self {
230            comment: Some(data.into()),
231            ..Event::new()
232        }
233    }
234
235    /// Creates a new retry `Event`.
236    ///
237    /// # Example
238    ///
239    /// ```rust
240    /// # extern crate rocket_community as rocket;
241    /// use rocket::response::stream::Event;
242    /// use rocket::tokio::time::Duration;
243    ///
244    /// let event = Event::retry(Duration::from_millis(250));
245    /// ```
246    pub fn retry(period: Duration) -> Self {
247        Self {
248            retry: Some(period),
249            ..Event::new()
250        }
251    }
252
253    /// Sets the value of the 'event' (event type) field.
254    ///
255    /// Event names may not contain new lines `\n` or carriage returns `\r`. If
256    /// `event` _does_ contain new lines or carriage returns, they are replaced
257    /// with spaces (` `) before being sent to the client.
258    ///
259    /// # Example
260    ///
261    /// ```rust
262    /// # extern crate rocket_community as rocket;
263    /// use rocket::response::stream::Event;
264    ///
265    /// // The event name is "start".
266    /// let event = Event::data("hi").event("start");
267    ///
268    /// // The event name is "then end", with `\n` replaced with ` `.
269    /// let event = Event::data("bye").event("then\nend");
270    /// ```
271    pub fn event<T: Into<Cow<'static, str>>>(mut self, event: T) -> Self {
272        self.event = Some(event.into());
273        self
274    }
275
276    /// Sets the value of the 'id' (last event ID) field.
277    ///
278    /// Event IDs may not contain new lines `\n` or carriage returns `\r`. If
279    /// `id` _does_ contain new lines or carriage returns, they are replaced
280    /// with spaces (` `) before being sent to the client.
281    ///
282    /// # Example
283    ///
284    /// ```rust
285    /// # extern crate rocket_community as rocket;
286    /// use rocket::response::stream::Event;
287    ///
288    /// // The event ID is "start".
289    /// let event = Event::data("hi").id("start");
290    ///
291    /// // The event ID is "then end", with `\n` replaced with ` `.
292    /// let event = Event::data("bye").id("then\nend");
293    /// ```
294    /// Sets the value of the 'id' field. It may not contain newlines.
295    pub fn id<T: Into<Cow<'static, str>>>(mut self, id: T) -> Self {
296        self.id = Some(id.into());
297        self
298    }
299
300    /// Sets or replaces the value of the `data` field.
301    ///
302    /// # Example
303    ///
304    /// ```rust
305    /// # extern crate rocket_community as rocket;
306    /// use rocket::response::stream::Event;
307    ///
308    /// // The data "hello" will be sent.
309    /// let event = Event::data("hi").with_data("hello");
310    ///
311    /// // The two below are equivalent.
312    /// let event = Event::comment("bye").with_data("goodbye");
313    /// let event = Event::data("goodbye").with_comment("bye");
314    /// ```
315    pub fn with_data<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
316        self.data = Some(data.into());
317        self
318    }
319
320    /// Sets or replaces the value of the `comment` field.
321    ///
322    /// # Example
323    ///
324    /// ```rust
325    /// # extern crate rocket_community as rocket;
326    /// use rocket::response::stream::Event;
327    ///
328    /// // The comment "🚀" will be sent.
329    /// let event = Event::comment("Rocket is great!").with_comment("🚀");
330    ///
331    /// // The two below are equivalent.
332    /// let event = Event::comment("bye").with_data("goodbye");
333    /// let event = Event::data("goodbye").with_comment("bye");
334    /// ```
335    pub fn with_comment<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
336        self.comment = Some(data.into());
337        self
338    }
339
340    /// Sets or replaces the value of the `retry` field.
341    ///
342    /// # Example
343    ///
344    /// ```rust
345    /// # extern crate rocket_community as rocket;
346    /// use rocket::response::stream::Event;
347    /// use rocket::tokio::time::Duration;
348    ///
349    /// // The reconnection will be set to 10 seconds.
350    /// let event = Event::retry(Duration::from_millis(500))
351    ///     .with_retry(Duration::from_secs(10));
352    ///
353    /// // The two below are equivalent.
354    /// let event = Event::comment("bye").with_retry(Duration::from_millis(500));
355    /// let event = Event::retry(Duration::from_millis(500)).with_comment("bye");
356    /// ```
357    pub fn with_retry(mut self, period: Duration) -> Self {
358        self.retry = Some(period);
359        self
360    }
361
362    fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
363        let events = [
364            self.comment.map(|v| RawLinedEvent::many("", v)),
365            self.retry
366                .map(|r| RawLinedEvent::one("retry", format!("{}", r.as_millis()))),
367            self.id.map(|v| RawLinedEvent::one("id", v)),
368            self.event.map(|v| RawLinedEvent::one("event", v)),
369            self.data.map(|v| RawLinedEvent::many("data", v)),
370            Some(RawLinedEvent::raw("")),
371        ];
372
373        stream::iter(events).filter_map(|x| x)
374    }
375}
376
/// A potentially infinite stream of Server-Sent [`Event`]s (SSE).
///
/// An `EventStream` can be constructed from any [`Stream`] of items of type
/// `Event`. The stream can be constructed directly via [`EventStream::from()`]
/// or through generator syntax via [`EventStream!`].
///
/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
///
/// # Responder
///
/// `EventStream` is a (potentially infinite) responder. The response
/// `Content-Type` is set to [`EventStream`](const@ContentType::EventStream).
/// The body is [unsized](crate::response::Body#unsized), and values are sent as
/// soon as they are yielded by the internal stream.
///
/// ## Heartbeat
///
/// A heartbeat comment is injected into the internal stream and sent at a fixed
/// interval. The comment is discarded by clients and serves only to keep the
/// connection alive; it does not interfere with application data. The interval
/// defaults to 30 seconds but can be adjusted with
/// [`EventStream::heartbeat()`].
///
/// # Examples
///
/// Use [`EventStream!`] to yield an infinite series of "ping" SSE messages to
/// the client, one per second:
///
/// ```rust
/// # extern crate rocket_community as rocket;
/// # use rocket::*;
/// use rocket::response::stream::{Event, EventStream};
/// use rocket::tokio::time::{self, Duration};
///
/// #[get("/events")]
/// fn stream() -> EventStream![] {
///     EventStream! {
///         let mut interval = time::interval(Duration::from_secs(1));
///         loop {
///             yield Event::data("ping");
///             interval.tick().await;
///         }
///     }
/// }
/// ```
///
/// Yield 9 events: 3 triplets of `retry`, `data`, and `comment` events:
///
/// ```rust
/// # extern crate rocket_community as rocket;
/// # use rocket::get;
/// use rocket::response::stream::{Event, EventStream};
/// use rocket::tokio::time::Duration;
///
/// #[get("/events")]
/// fn events() -> EventStream![] {
///     EventStream! {
///         for i in 0..3 {
///             yield Event::retry(Duration::from_secs(10));
///             yield Event::data(format!("{}", i)).id("cat").event("bar");
///             yield Event::comment("silly boy");
///         }
///     }
/// }
/// ```
///
/// The syntax of `EventStream!` as an expression is identical to that of
/// [`stream!`](crate::response::stream::stream). For how to gracefully
/// terminate an otherwise infinite stream, see [graceful
/// shutdown](crate::response::stream#graceful-shutdown).
///
/// # Borrowing
///
/// If an `EventStream` contains a borrow, the extended type syntax
/// `EventStream![Event + '_]` must be used:
///
/// ```rust
/// # extern crate rocket_community as rocket;
/// # use rocket::get;
/// use rocket::State;
/// use rocket::response::stream::{Event, EventStream};
///
/// #[get("/events")]
/// fn events(ctxt: &State<bool>) -> EventStream![Event + '_] {
///     EventStream! {
///         // By using `ctxt` in the stream, the borrow is moved into it. Thus,
///         // the stream object contains a borrow, prompting the '_ annotation.
///         if *ctxt.inner() {
///             yield Event::data("hi");
///         }
///     }
/// }
/// ```
///
/// See [`stream#borrowing`](crate::response::stream#borrowing) for further
/// details on borrowing in streams.
///
/// # Pitfalls
///
/// Server-Sent Events are a rather simple mechanism, though there are some
/// pitfalls to be aware of.
///
///  * **Buffering**
///
///    Protocol restrictions complicate implementing an API that does not
///    buffer. As such, if you are sending _lots_ of data, consider sending the
///    data via multiple data fields (with events to signal start and end).
///    Alternatively, send _one_ event which instructs the client to fetch the
///    data from another endpoint which in-turn streams the data.
///
///  * **Raw SSE requires UTF-8 data**
///
///    Only UTF-8 data can be sent via SSE. If you need to send arbitrary bytes,
///    consider encoding it, for instance, as JSON using [`Event::json()`].
///    Alternatively, as described before, use SSE as a notifier which alerts
///    the client to fetch the data from elsewhere.
///
///  * **Raw SSE is Lossy**
///
///    Data sent via SSE cannot contain new lines `\n` or carriage returns `\r`
///    due to interference with the line protocol.
///
///    The protocol allows expressing new lines as multiple messages, however,
///    and Rocket automatically transforms a message of `foo\nbar` into two
///    messages, `foo` and `bar`, so that they are reconstructed (automatically)
///    as `foo\nbar` on the client-side. For messages that only contain new
///    lines `\n`, the conversion is lossless.
///
///    However, the protocol has no mechanism for expressing carriage returns
///    and thus it is not possible to send unencoded carriage returns via SSE.
///    Rocket handles carriage returns like it handles new lines: it splits the
///    data into multiple messages. Thus, a sequence of `\r\n` becomes `\n` at
///    the client side. A single `\r` that is not part of an `\r\n` sequence
///    also becomes `\n` at the client side. As a result, the message
///    `foo\r\nbar\rbaz` is read as `foo\nbar\nbaz` at the client-side.
///
///    To send messages losslessly, they must be encoded first, for instance, by
///    using [`Event::json()`].
///
///  * **Clients reconnect ad-infinitum**
///
///    The [SSE standard] stipulates: _"Clients will reconnect if the connection
///    is closed; a client can be told to stop reconnecting using the HTTP 204
///    No Content response code."_ As a result, clients will typically reconnect
///    exhaustively until either they choose to disconnect or they receive a
///    `204 No Content` response.
///
///    [SSE standard]: https://html.spec.whatwg.org/multipage/server-sent-events.html
pub struct EventStream<S> {
    // The wrapped stream of application events.
    stream: S,
    // Interval between heartbeat comments; `None` disables the heartbeat.
    heartbeat: Option<Duration>,
}
529
530impl<S: Stream<Item = Event>> EventStream<S> {
531    /// Sets a "ping" interval for this `EventStream` to avoid connection
532    /// timeouts when no data is being transferred. The default `interval` is 30
533    /// seconds.
534    ///
535    /// The ping is implemented by sending an empty comment to the client every
536    /// `interval` seconds.
537    ///
538    /// # Example
539    ///
540    /// ```rust
541    /// # extern crate rocket_community as rocket;
542    /// # use rocket::get;
543    /// use rocket::response::stream::{Event, EventStream};
544    /// use rocket::tokio::time::Duration;
545    ///
546    /// #[get("/events")]
547    /// fn events() -> EventStream![] {
548    ///     // Remove the default heartbeat.
549    ///     # let event_stream = rocket::futures::stream::pending();
550    ///     EventStream::from(event_stream).heartbeat(None);
551    ///
552    ///     // Set the heartbeat interval to 15 seconds.
553    ///     # let event_stream = rocket::futures::stream::pending();
554    ///     EventStream::from(event_stream).heartbeat(Duration::from_secs(15));
555    ///
556    ///     // Do the same but for a generated `EventStream`:
557    ///     let stream = EventStream! {
558    ///         yield Event::data("hello");
559    ///     };
560    ///
561    ///     stream.heartbeat(Duration::from_secs(15))
562    /// }
563    /// ```
564    pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
565        self.heartbeat = heartbeat.into();
566        self
567    }
568
569    fn heartbeat_stream(&self) -> impl Stream<Item = RawLinedEvent> {
570        self.heartbeat
571            .map(|beat| IntervalStream::new(interval(beat)))
572            .map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
573            .map_or_else(|| Either::Right(stream::empty()), Either::Left)
574    }
575
576    fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
577        use futures::StreamExt;
578
579        let heartbeats = self.heartbeat_stream();
580        let events = StreamExt::map(self.stream, |e| e.into_stream()).flatten();
581        crate::util::join(events, heartbeats)
582    }
583
584    fn into_reader(self) -> impl AsyncRead {
585        ReaderStream::from(self.into_stream())
586    }
587}
588
589impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
590    /// Creates an `EventStream` from a [`Stream`] of [`Event`]s.
591    ///
592    /// Use `EventStream::from()` to construct an `EventStream` from an already
593    /// existing stream. Otherwise, prefer to use [`EventStream!`].
594    ///
595    /// # Example
596    ///
597    /// ```rust
598    /// # extern crate rocket_community as rocket;
599    /// use rocket::response::stream::{Event, EventStream};
600    /// use rocket::futures::stream;
601    ///
602    /// let raw = stream::iter(vec![Event::data("a"), Event::data("b")]);
603    /// let stream = EventStream::from(raw);
604    /// ```
605    fn from(stream: S) -> Self {
606        EventStream {
607            stream,
608            heartbeat: Some(Duration::from_secs(30)),
609        }
610    }
611}
612
613impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
614    fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
615        Response::build()
616            .header(ContentType::EventStream)
617            .raw_header("Cache-Control", "no-cache")
618            .raw_header("Expires", "0")
619            .streamed_body(self.into_reader())
620            .ok()
621    }
622}
623
crate::export! {
    /// Type and stream expression macro for [`struct@EventStream`].
    ///
    /// See [`stream!`](crate::response::stream::stream) for the syntax
    /// supported by this macro. In addition to that syntax, this macro can also
    /// be called with no arguments, `EventStream![]`, as shorthand for
    /// `EventStream![Event]`.
    ///
    /// See [`struct@EventStream`] and the [module level
    /// docs](crate::response::stream#typed-streams) for usage details.
    macro_rules! EventStream {
        // No arguments: forward with `Event` as the item type.
        () => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
        // Anything else is forwarded to `_typed_stream!` unchanged.
        ($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
    }
}
639
#[cfg(test)]
mod sse_tests {
    use crate::response::stream::{stream, Event, EventStream, ReaderStream};
    use futures::stream::Stream;
    use tokio::io::AsyncReadExt;
    use tokio::time::{self, Duration};

    impl Event {
        /// Renders a single event to the exact text sent on the wire.
        fn into_string(self) -> String {
            crate::async_test(async move {
                let mut rendered = String::new();
                let mut reader = ReaderStream::from(self.into_stream());
                reader
                    .read_to_string(&mut rendered)
                    .await
                    .expect("event -> string");
                rendered
            })
        }
    }

    impl<S: Stream<Item = Event>> EventStream<S> {
        /// Drains the whole event stream, heartbeats included, into a string.
        fn into_string(self) -> String {
            use std::pin::pin;

            crate::async_test(async move {
                let mut rendered = String::new();
                let mut reader = pin!(self.into_reader());
                reader
                    .read_to_string(&mut rendered)
                    .await
                    .expect("event stream -> string");
                rendered
            })
        }
    }

    #[test]
    fn test_event_data() {
        let event = Event::data("a\nb");
        assert_eq!(event.into_string(), "data:a\ndata:b\n\n");

        let event = Event::data("a\n");
        assert_eq!(event.into_string(), "data:a\ndata:\n\n");

        let event = Event::data("cats make me happy!");
        assert_eq!(event.into_string(), "data:cats make me happy!\n\n");

        let event = Event::data("in the\njungle\nthe mighty\njungle");
        assert_eq!(
            event.into_string(),
            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n"
        );

        let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
        assert_eq!(
            event.into_string(),
            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n"
        );

        let event = Event::data("\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\r\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\r\nb\r\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\rb\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\rb\r");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::comment("\n\rb\r");
        assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");

        let event = Event::data("\n\n\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");

        let event = Event::data("\n");
        assert_eq!(event.into_string(), "data:\ndata:\n\n");

        let event = Event::data("");
        assert_eq!(event.into_string(), "data:\n\n");
    }

    #[test]
    fn test_event_fields() {
        let event = Event::data("foo").id("moo");
        assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");

        let event = Event::data("foo")
            .id("moo")
            .with_retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");

        let event = Event::data("foo\nbar")
            .id("moo")
            .with_retry(Duration::from_secs(45));
        assert_eq!(
            event.into_string(),
            "retry:45000\nid:moo\ndata:foo\ndata:bar\n\n"
        );

        let event = Event::retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\n\n");

        let event = Event::comment("incoming data...");
        assert_eq!(event.into_string(), ":incoming data...\n\n");

        let event = Event::data("foo").id("moo").with_comment("cows, ey?");
        assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");

        let event = Event::data("foo\nbar")
            .id("moo")
            .event("milk")
            .with_retry(Duration::from_secs(3));

        assert_eq!(
            event.into_string(),
            "retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n"
        );

        let event = Event::data("foo")
            .id("moo")
            .event("milk")
            .with_comment("??")
            .with_retry(Duration::from_secs(3));

        assert_eq!(
            event.into_string(),
            ":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n"
        );

        let event = Event::data("foo")
            .id("moo")
            .event("milk")
            .with_comment("?\n?")
            .with_retry(Duration::from_secs(3));

        assert_eq!(
            event.into_string(),
            ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n"
        );

        let event = Event::data("foo\r\nbar\nbaz")
            .id("moo")
            .event("milk")
            .with_comment("?\n?")
            .with_retry(Duration::from_secs(3));

        assert_eq!(
            event.into_string(),
            ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n"
        );
    }

    #[test]
    fn test_bad_chars() {
        let event = Event::data("foo").id("dead\nbeef").event("m\noo");
        assert_eq!(
            event.into_string(),
            "id:dead beef\nevent:m oo\ndata:foo\n\n"
        );

        let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
        assert_eq!(
            event.into_string(),
            "id:d  be f\nevent:m  \ndata:f\ndata:o\n\n"
        );

        let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
        assert_eq!(
            event.into_string(),
            "id:       \nevent:  b\ndata:f\ndata:o\n\n"
        );
    }

    #[test]
    fn test_event_stream() {
        use futures::stream::iter;

        // Heartbeat lines (":\n\n") are stripped before comparing.
        let stream = EventStream::from(iter(vec![Event::data("foo")]));
        assert_eq!(stream.into_string().replace(":\n\n", ""), "data:foo\n\n");

        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
        assert_eq!(
            stream.into_string().replace(":\n\n", ""),
            "data:a\n\ndata:b\n\n"
        );

        let stream = EventStream::from(iter(vec![
            Event::data("a\nb"),
            Event::data("b"),
            Event::data("c\n\nd"),
            Event::data("e"),
        ]));

        assert_eq!(
            stream.into_string().replace(":\n\n", ""),
            "data:a\ndata:b\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n"
        );
    }

    #[test]
    fn test_heartbeat() {
        use futures::future::ready;
        use futures::stream::{iter, once, StreamExt};

        const HEARTBEAT: &str = ":\n";

        // Set a heartbeat interval of 250ms. Send nothing for 600ms. We should
        // get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4.
        let raw = stream!(time::sleep(Duration::from_millis(600)).await;).map(|_| unreachable!());

        let string = EventStream::from(raw)
            .heartbeat(Duration::from_millis(250))
            .into_string();

        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(
            (2..=4).contains(&heartbeats),
            "got {} beat(s)",
            heartbeats
        );

        let stream = EventStream! {
            time::sleep(Duration::from_millis(250)).await;
            yield Event::data("foo");
            time::sleep(Duration::from_millis(250)).await;
            yield Event::data("bar");
        };

        // We expect: foo\n\n [heartbeat] bar\n\n [maybe heartbeat].
        let string = stream.heartbeat(Duration::from_millis(350)).into_string();
        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(
            (1..=3).contains(&heartbeats),
            "got {} beat(s)",
            heartbeats
        );
        assert!(string.contains("data:foo\n\n"), "string = {:?}", string);
        assert!(string.contains("data:bar\n\n"), "string = {:?}", string);

        // We shouldn't send a heartbeat if a message is immediately available.
        let stream = EventStream::from(once(ready(Event::data("hello"))));
        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
        assert_eq!(string, "data:hello\n\n", "string = {:?}", string);

        // It's okay if we do it with two, though.
        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(heartbeats <= 1);
        assert!(string.contains("data:a\n\n"), "string = {:?}", string);
        assert!(string.contains("data:b\n\n"), "string = {:?}", string);
    }
}