rocket_community/response/stream/sse.rs
1use std::borrow::Cow;
2
3use futures::{
4 future::Either,
5 stream::{self, Stream},
6};
7use tokio::io::AsyncRead;
8use tokio::time::{interval, Duration};
9use tokio_stream::{wrappers::IntervalStream, StreamExt};
10
11use crate::http::ContentType;
12use crate::request::Request;
13use crate::response::{
14 self,
15 stream::{RawLinedEvent, ReaderStream},
16 Responder, Response,
17};
18
19/// A Server-Sent `Event` (SSE) in a Server-Sent [`struct@EventStream`].
20///
21/// A server-sent event is either a _field_ or a _comment_. Comments can be
22/// constructed via [`Event::comment()`] while fields can be constructed via
23/// [`Event::data()`], [`Event::json()`], and [`Event::retry()`].
24///
25/// ```rust
26/// # extern crate rocket_community as rocket;
27/// use rocket::tokio::time::Duration;
28/// use rocket::response::stream::Event;
29///
30/// // A `data` event with message "Hello, SSE!".
31/// let event = Event::data("Hello, SSE!");
32///
33/// // The same event but with event name of `hello`.
34/// let event = Event::data("Hello, SSE!").event("hello");
35///
36/// // A `retry` event to set the client-side reconnection time.
37/// let event = Event::retry(Duration::from_secs(5));
38///
39/// // An event with an attached comment, event name, and ID.
40/// let event = Event::data("Hello, SSE!")
41/// .with_comment("just a hello message")
42/// .event("hello")
43/// .id("1");
44/// ```
45///
46/// We largely defer to [MDN's using server-sent events] documentation for
47/// client-side details but reproduce, in our words, relevant server-side
48/// documentation here.
49///
50/// [MDN's using server-sent events]:
51/// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
52///
53/// # Pitfalls
54///
55/// Server-Sent Events suffer from certain pitfalls. We encourage readers to
56/// read through [pitfalls](struct@EventStream#pitfalls) before making use of
57/// Rocket's SSE support.
58///
59/// # Comments
60///
61/// A server-sent _comment_, created via [`Event::comment()`], is an event that
62/// appears only in the raw server-sent event data stream and is inaccessible by
63/// most clients. This includes JavaScript's `EventSource`. As such, they serve
64/// little utility beyond debugging a raw data stream and keeping a connection
65/// alive. See [heartbeat](struct@EventStream#heartbeat) for information on
66/// Rocket's `EventStream` keep-alive.
67///
68/// # Fields
69///
70/// A server-sent field can be one of four kinds:
71///
72/// * `retry`
73///
74/// A `retry` event, created via [`Event::retry()`], sets the reconnection
75/// time on the client side. It is the duration the client will wait before
76/// attempting to reconnect when a connection is lost. Most applications
77/// will not need to set a `retry`, opting instead to use the
78/// implementation's default or to reconnect manually on error.
79///
80/// * `id`
81///
82/// Sets the event id to associate all subsequent fields with. This value
83/// cannot be retrieved directly via most clients, including JavaScript
84/// `EventSource`. Instead, it is sent by the implementation on reconnection
85/// via the `Last-Event-ID` header. An `id` can be attached to other fields
86/// via the [`Event::id()`] builder method.
87///
88/// * `event`
89///
90/// Sets the event name to associate the next `data` field with. In
91/// JavaScript's `EventSource`, this is the event to be listened for, which
92/// defaults to `message`. An `event` can be attached to other fields via
93/// the [`Event::event()`] builder method.
94///
95/// * `data`
96///
97/// Sends data to dispatch as an event at the client. In JavaScript's
98/// `EventSource`, this (and only this) results in an event handler for
99/// `event`, specified just prior, being triggered. A data field can be
100/// created via the [`Event::data()`] or [`Event::json()`] constructors.
101///
102/// # Implementation Notes
103///
104/// A constructed `Event` _always_ emits its fields in the following order:
105///
106/// 1. `comment`
107/// 2. `retry`
108/// 3. `id`
109/// 4. `event`
110/// 5. `data`
111///
112/// The `event` and `id` fields _cannot_ contain new lines or carriage returns.
113/// Rocket's default implementation automatically converts new lines and
114/// carriage returns in `event` and `id` fields to spaces.
115///
116/// The `data` and `comment` fields _cannot_ contain carriage returns. Rocket
117/// converts the unencoded sequence `\r\n` and the isolated `\r` into a
118/// protocol-level `\n`, that is, in such a way that they are interpreted as
119/// `\n` at the client. For example, the raw message `foo\r\nbar\rbaz` is
120/// received as `foo\nbar\nbaz` at the client-side. Encoded sequences, such as
121/// those emitted by [`Event::json()`], have no such restrictions.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct Event {
    /// Comment text, if any; emitted before every field.
    comment: Option<Cow<'static, str>>,
    /// Client reconnection time, if any; emitted in whole milliseconds.
    retry: Option<Duration>,
    /// Value of the `id` field, if any.
    id: Option<Cow<'static, str>>,
    /// Value of the `event` (event name) field, if any.
    event: Option<Cow<'static, str>>,
    /// Payload of the `data` field, if any; emitted after all other fields.
    data: Option<Cow<'static, str>>,
}
130
131impl Event {
132 // We hide this since we never want to construct an `Event` with nothing.
133 fn new() -> Self {
134 Event {
135 comment: None,
136 retry: None,
137 id: None,
138 event: None,
139 data: None,
140 }
141 }
142
143 /// Creates a new `Event` with an empty data field.
144 ///
145 /// This is exactly equivalent to `Event::data("")`.
146 ///
147 /// # Example
148 ///
149 /// ```rust
150 /// # extern crate rocket_community as rocket;
151 /// use rocket::response::stream::Event;
152 ///
153 /// let event = Event::empty();
154 /// ```
155 pub fn empty() -> Self {
156 Event::data("")
157 }
158
159 /// Creates a new `Event` with a data field of `data` serialized as JSON.
160 ///
161 /// # Example
162 ///
163 /// ```rust
164 /// # extern crate rocket_community as rocket;
165 /// use rocket::serde::Serialize;
166 /// use rocket::response::stream::Event;
167 ///
168 /// #[derive(Serialize)]
169 /// #[serde(crate = "rocket::serde")]
170 /// struct MyData<'r> {
171 /// string: &'r str,
172 /// number: usize,
173 /// }
174 ///
175 /// let data = MyData { string: "hello!", number: 10 };
176 /// let event = Event::json(&data);
177 /// ```
178 #[cfg(feature = "json")]
179 #[cfg_attr(nightly, doc(cfg(feature = "json")))]
180 pub fn json<T: serde::Serialize>(data: &T) -> Self {
181 let string = serde_json::to_string(data).unwrap_or_default();
182 Self::data(string)
183 }
184
185 /// Creates a new `Event` with a data field containing the raw `data`.
186 ///
187 /// # Raw SSE is Lossy
188 ///
189 /// Unencoded carriage returns cannot be expressed in the protocol. Thus,
190 /// any carriage returns in `data` will not appear at the client-side.
191 /// Instead, the sequence `\r\n` and the isolated `\r` will each appear as
192 /// `\n` at the client-side. For example, the message `foo\r\nbar\rbaz` is
193 /// received as `foo\nbar\nbaz` at the client-side.
194 ///
195 /// See [pitfalls](struct@EventStream#pitfalls) for more details.
196 ///
197 /// # Example
198 ///
199 /// ```rust
200 /// # extern crate rocket_community as rocket;
201 /// use rocket::response::stream::Event;
202 ///
203 /// // A `data` event with message "Hello, SSE!".
204 /// let event = Event::data("Hello, SSE!");
205 /// ```
206 pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
207 Self {
208 data: Some(data.into()),
209 ..Event::new()
210 }
211 }
212
213 /// Creates a new comment `Event`.
214 ///
215 /// As with [`Event::data()`], unencoded carriage returns cannot be
216 /// expressed in the protocol. Thus, any carriage returns in `data` will
217 /// not appear at the client-side. For comments, this is generally not a
218 /// concern as comments are discarded by client-side libraries.
219 ///
220 /// # Example
221 ///
222 /// ```rust
223 /// # extern crate rocket_community as rocket;
224 /// use rocket::response::stream::Event;
225 ///
226 /// let event = Event::comment("bet you'll never see me!");
227 /// ```
228 pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
229 Self {
230 comment: Some(data.into()),
231 ..Event::new()
232 }
233 }
234
235 /// Creates a new retry `Event`.
236 ///
237 /// # Example
238 ///
239 /// ```rust
240 /// # extern crate rocket_community as rocket;
241 /// use rocket::response::stream::Event;
242 /// use rocket::tokio::time::Duration;
243 ///
244 /// let event = Event::retry(Duration::from_millis(250));
245 /// ```
246 pub fn retry(period: Duration) -> Self {
247 Self {
248 retry: Some(period),
249 ..Event::new()
250 }
251 }
252
253 /// Sets the value of the 'event' (event type) field.
254 ///
255 /// Event names may not contain new lines `\n` or carriage returns `\r`. If
256 /// `event` _does_ contain new lines or carriage returns, they are replaced
257 /// with spaces (` `) before being sent to the client.
258 ///
259 /// # Example
260 ///
261 /// ```rust
262 /// # extern crate rocket_community as rocket;
263 /// use rocket::response::stream::Event;
264 ///
265 /// // The event name is "start".
266 /// let event = Event::data("hi").event("start");
267 ///
268 /// // The event name is "then end", with `\n` replaced with ` `.
269 /// let event = Event::data("bye").event("then\nend");
270 /// ```
271 pub fn event<T: Into<Cow<'static, str>>>(mut self, event: T) -> Self {
272 self.event = Some(event.into());
273 self
274 }
275
276 /// Sets the value of the 'id' (last event ID) field.
277 ///
278 /// Event IDs may not contain new lines `\n` or carriage returns `\r`. If
279 /// `id` _does_ contain new lines or carriage returns, they are replaced
280 /// with spaces (` `) before being sent to the client.
281 ///
282 /// # Example
283 ///
284 /// ```rust
285 /// # extern crate rocket_community as rocket;
286 /// use rocket::response::stream::Event;
287 ///
288 /// // The event ID is "start".
289 /// let event = Event::data("hi").id("start");
290 ///
291 /// // The event ID is "then end", with `\n` replaced with ` `.
292 /// let event = Event::data("bye").id("then\nend");
293 /// ```
294 /// Sets the value of the 'id' field. It may not contain newlines.
295 pub fn id<T: Into<Cow<'static, str>>>(mut self, id: T) -> Self {
296 self.id = Some(id.into());
297 self
298 }
299
300 /// Sets or replaces the value of the `data` field.
301 ///
302 /// # Example
303 ///
304 /// ```rust
305 /// # extern crate rocket_community as rocket;
306 /// use rocket::response::stream::Event;
307 ///
308 /// // The data "hello" will be sent.
309 /// let event = Event::data("hi").with_data("hello");
310 ///
311 /// // The two below are equivalent.
312 /// let event = Event::comment("bye").with_data("goodbye");
313 /// let event = Event::data("goodbye").with_comment("bye");
314 /// ```
315 pub fn with_data<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
316 self.data = Some(data.into());
317 self
318 }
319
320 /// Sets or replaces the value of the `comment` field.
321 ///
322 /// # Example
323 ///
324 /// ```rust
325 /// # extern crate rocket_community as rocket;
326 /// use rocket::response::stream::Event;
327 ///
328 /// // The comment "🚀" will be sent.
329 /// let event = Event::comment("Rocket is great!").with_comment("🚀");
330 ///
331 /// // The two below are equivalent.
332 /// let event = Event::comment("bye").with_data("goodbye");
333 /// let event = Event::data("goodbye").with_comment("bye");
334 /// ```
335 pub fn with_comment<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
336 self.comment = Some(data.into());
337 self
338 }
339
340 /// Sets or replaces the value of the `retry` field.
341 ///
342 /// # Example
343 ///
344 /// ```rust
345 /// # extern crate rocket_community as rocket;
346 /// use rocket::response::stream::Event;
347 /// use rocket::tokio::time::Duration;
348 ///
349 /// // The reconnection will be set to 10 seconds.
350 /// let event = Event::retry(Duration::from_millis(500))
351 /// .with_retry(Duration::from_secs(10));
352 ///
353 /// // The two below are equivalent.
354 /// let event = Event::comment("bye").with_retry(Duration::from_millis(500));
355 /// let event = Event::retry(Duration::from_millis(500)).with_comment("bye");
356 /// ```
357 pub fn with_retry(mut self, period: Duration) -> Self {
358 self.retry = Some(period);
359 self
360 }
361
362 fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
363 let events = [
364 self.comment.map(|v| RawLinedEvent::many("", v)),
365 self.retry
366 .map(|r| RawLinedEvent::one("retry", format!("{}", r.as_millis()))),
367 self.id.map(|v| RawLinedEvent::one("id", v)),
368 self.event.map(|v| RawLinedEvent::one("event", v)),
369 self.data.map(|v| RawLinedEvent::many("data", v)),
370 Some(RawLinedEvent::raw("")),
371 ];
372
373 stream::iter(events).filter_map(|x| x)
374 }
375}
376
377/// A potentially infinite stream of Server-Sent [`Event`]s (SSE).
378///
379/// An `EventStream` can be constructed from any [`Stream`] of items of type
380/// `Event`. The stream can be constructed directly via [`EventStream::from()`]
381/// or through generator syntax via [`EventStream!`].
382///
383/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
384///
385/// # Responder
386///
387/// `EventStream` is a (potentially infinite) responder. The response
388/// `Content-Type` is set to [`EventStream`](const@ContentType::EventStream).
/// The body is [unsized](crate::response::Body#unsized), and values are sent as
/// soon as they are yielded by the internal stream.
391///
392/// ## Heartbeat
393///
394/// A heartbeat comment is injected into the internal stream and sent at a fixed
395/// interval. The comment is discarded by clients and serves only to keep the
396/// connection alive; it does not interfere with application data. The interval
397/// defaults to 30 seconds but can be adjusted with
398/// [`EventStream::heartbeat()`].
399///
400/// # Examples
401///
402/// Use [`EventStream!`] to yield an infinite series of "ping" SSE messages to
403/// the client, one per second:
404///
405/// ```rust
406/// # extern crate rocket_community as rocket;
407/// # use rocket::*;
408/// use rocket::response::stream::{Event, EventStream};
409/// use rocket::tokio::time::{self, Duration};
410///
411/// #[get("/events")]
412/// fn stream() -> EventStream![] {
413/// EventStream! {
414/// let mut interval = time::interval(Duration::from_secs(1));
415/// loop {
416/// yield Event::data("ping");
417/// interval.tick().await;
418/// }
419/// }
420/// }
421/// ```
422///
423/// Yield 9 events: 3 triplets of `retry`, `data`, and `comment` events:
424///
425/// ```rust
426/// # extern crate rocket_community as rocket;
427/// # use rocket::get;
428/// use rocket::response::stream::{Event, EventStream};
429/// use rocket::tokio::time::Duration;
430///
431/// #[get("/events")]
432/// fn events() -> EventStream![] {
433/// EventStream! {
434/// for i in 0..3 {
435/// yield Event::retry(Duration::from_secs(10));
436/// yield Event::data(format!("{}", i)).id("cat").event("bar");
437/// yield Event::comment("silly boy");
438/// }
439/// }
440/// }
441/// ```
442///
443/// The syntax of `EventStream!` as an expression is identical to that of
444/// [`stream!`](crate::response::stream::stream). For how to gracefully
445/// terminate an otherwise infinite stream, see [graceful
446/// shutdown](crate::response::stream#graceful-shutdown).
447///
448/// # Borrowing
449///
450/// If an `EventStream` contains a borrow, the extended type syntax
451/// `EventStream![Event + '_]` must be used:
452///
453/// ```rust
454/// # extern crate rocket_community as rocket;
455/// # use rocket::get;
456/// use rocket::State;
457/// use rocket::response::stream::{Event, EventStream};
458///
459/// #[get("/events")]
460/// fn events(ctxt: &State<bool>) -> EventStream![Event + '_] {
461/// EventStream! {
462/// // By using `ctxt` in the stream, the borrow is moved into it. Thus,
463/// // the stream object contains a borrow, prompting the '_ annotation.
464/// if *ctxt.inner() {
465/// yield Event::data("hi");
466/// }
467/// }
468/// }
469/// ```
470///
471/// See [`stream#borrowing`](crate::response::stream#borrowing) for further
472/// details on borrowing in streams.
473///
474/// # Pitfalls
475///
476/// Server-Sent Events are a rather simple mechanism, though there are some
477/// pitfalls to be aware of.
478///
479/// * **Buffering**
480///
481/// Protocol restrictions complicate implementing an API that does not
482/// buffer. As such, if you are sending _lots_ of data, consider sending the
483/// data via multiple data fields (with events to signal start and end).
484/// Alternatively, send _one_ event which instructs the client to fetch the
485/// data from another endpoint which in-turn streams the data.
486///
487/// * **Raw SSE requires UTF-8 data**
488///
489/// Only UTF-8 data can be sent via SSE. If you need to send arbitrary bytes,
490/// consider encoding it, for instance, as JSON using [`Event::json()`].
491/// Alternatively, as described before, use SSE as a notifier which alerts
492/// the client to fetch the data from elsewhere.
493///
494/// * **Raw SSE is Lossy**
495///
496/// Data sent via SSE cannot contain new lines `\n` or carriage returns `\r`
497/// due to interference with the line protocol.
498///
499/// The protocol allows expressing new lines as multiple messages, however,
500/// and Rocket automatically transforms a message of `foo\nbar` into two
501/// messages, `foo` and `bar`, so that they are reconstructed (automatically)
502/// as `foo\nbar` on the client-side. For messages that only contain new
503/// lines `\n`, the conversion is lossless.
504///
505/// However, the protocol has no mechanism for expressing carriage returns
506/// and thus it is not possible to send unencoded carriage returns via SSE.
507/// Rocket handles carriage returns like it handles new lines: it splits the
508/// data into multiple messages. Thus, a sequence of `\r\n` becomes `\n` at
509/// the client side. A single `\r` that is not part of an `\r\n` sequence
510/// also becomes `\n` at the client side. As a result, the message
511/// `foo\r\nbar\rbaz` is read as `foo\nbar\nbaz` at the client-side.
512///
513/// To send messages losslessly, they must be encoded first, for instance, by
514/// using [`Event::json()`].
515///
516/// * **Clients reconnect ad-infinitum**
517///
518/// The [SSE standard] stipulates: _"Clients will reconnect if the connection
519/// is closed; a client can be told to stop reconnecting using the HTTP 204
520/// No Content response code."_ As a result, clients will typically reconnect
521/// exhaustively until either they choose to disconnect or they receive a
522/// `204 No Content` response.
523///
524/// [SSE standard]: https://html.spec.whatwg.org/multipage/server-sent-events.html
pub struct EventStream<S> {
    /// The wrapped stream of events to send to the client.
    stream: S,
    /// Interval between heartbeat comments, or `None` to send no heartbeats.
    heartbeat: Option<Duration>,
}
529
530impl<S: Stream<Item = Event>> EventStream<S> {
531 /// Sets a "ping" interval for this `EventStream` to avoid connection
532 /// timeouts when no data is being transferred. The default `interval` is 30
533 /// seconds.
534 ///
535 /// The ping is implemented by sending an empty comment to the client every
536 /// `interval` seconds.
537 ///
538 /// # Example
539 ///
540 /// ```rust
541 /// # extern crate rocket_community as rocket;
542 /// # use rocket::get;
543 /// use rocket::response::stream::{Event, EventStream};
544 /// use rocket::tokio::time::Duration;
545 ///
546 /// #[get("/events")]
547 /// fn events() -> EventStream![] {
548 /// // Remove the default heartbeat.
549 /// # let event_stream = rocket::futures::stream::pending();
550 /// EventStream::from(event_stream).heartbeat(None);
551 ///
552 /// // Set the heartbeat interval to 15 seconds.
553 /// # let event_stream = rocket::futures::stream::pending();
554 /// EventStream::from(event_stream).heartbeat(Duration::from_secs(15));
555 ///
556 /// // Do the same but for a generated `EventStream`:
557 /// let stream = EventStream! {
558 /// yield Event::data("hello");
559 /// };
560 ///
561 /// stream.heartbeat(Duration::from_secs(15))
562 /// }
563 /// ```
564 pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
565 self.heartbeat = heartbeat.into();
566 self
567 }
568
569 fn heartbeat_stream(&self) -> impl Stream<Item = RawLinedEvent> {
570 self.heartbeat
571 .map(|beat| IntervalStream::new(interval(beat)))
572 .map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
573 .map_or_else(|| Either::Right(stream::empty()), Either::Left)
574 }
575
576 fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
577 use futures::StreamExt;
578
579 let heartbeats = self.heartbeat_stream();
580 let events = StreamExt::map(self.stream, |e| e.into_stream()).flatten();
581 crate::util::join(events, heartbeats)
582 }
583
584 fn into_reader(self) -> impl AsyncRead {
585 ReaderStream::from(self.into_stream())
586 }
587}
588
589impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
590 /// Creates an `EventStream` from a [`Stream`] of [`Event`]s.
591 ///
592 /// Use `EventStream::from()` to construct an `EventStream` from an already
593 /// existing stream. Otherwise, prefer to use [`EventStream!`].
594 ///
595 /// # Example
596 ///
597 /// ```rust
598 /// # extern crate rocket_community as rocket;
599 /// use rocket::response::stream::{Event, EventStream};
600 /// use rocket::futures::stream;
601 ///
602 /// let raw = stream::iter(vec![Event::data("a"), Event::data("b")]);
603 /// let stream = EventStream::from(raw);
604 /// ```
605 fn from(stream: S) -> Self {
606 EventStream {
607 stream,
608 heartbeat: Some(Duration::from_secs(30)),
609 }
610 }
611}
612
613impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
614 fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
615 Response::build()
616 .header(ContentType::EventStream)
617 .raw_header("Cache-Control", "no-cache")
618 .raw_header("Expires", "0")
619 .streamed_body(self.into_reader())
620 .ok()
621 }
622}
623
crate::export! {
    /// Type and stream expression macro for [`struct@EventStream`].
    ///
    /// See [`stream!`](crate::response::stream::stream) for the syntax
    /// supported by this macro. In addition to that syntax, this macro can also
    /// be called with no arguments, `EventStream![]`, as shorthand for
    /// `EventStream![Event]`.
    ///
    /// See [`struct@EventStream`] and the [module level
    /// docs](crate::response::stream#typed-streams) for usage details.
    macro_rules! EventStream {
        // No arguments: the item type defaults to `Event`.
        () => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
        // Anything else: forward all tokens unchanged to `_typed_stream!`.
        ($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
    }
}
639
640#[cfg(test)]
641mod sse_tests {
642 use crate::response::stream::{stream, Event, EventStream, ReaderStream};
643 use futures::stream::Stream;
644 use tokio::io::AsyncReadExt;
645 use tokio::time::{self, Duration};
646
647 impl Event {
648 fn into_string(self) -> String {
649 crate::async_test(async move {
650 let mut string = String::new();
651 let mut reader = ReaderStream::from(self.into_stream());
652 reader
653 .read_to_string(&mut string)
654 .await
655 .expect("event -> string");
656 string
657 })
658 }
659 }
660
661 impl<S: Stream<Item = Event>> EventStream<S> {
662 fn into_string(self) -> String {
663 use std::pin::pin;
664
665 crate::async_test(async move {
666 let mut string = String::new();
667 let mut reader = pin!(self.into_reader());
668 reader
669 .read_to_string(&mut string)
670 .await
671 .expect("event stream -> string");
672 string
673 })
674 }
675 }
676
677 #[test]
678 fn test_event_data() {
679 let event = Event::data("a\nb");
680 assert_eq!(event.into_string(), "data:a\ndata:b\n\n");
681
682 let event = Event::data("a\n");
683 assert_eq!(event.into_string(), "data:a\ndata:\n\n");
684
685 let event = Event::data("cats make me happy!");
686 assert_eq!(event.into_string(), "data:cats make me happy!\n\n");
687
688 let event = Event::data("in the\njungle\nthe mighty\njungle");
689 assert_eq!(
690 event.into_string(),
691 "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n"
692 );
693
694 let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
695 assert_eq!(
696 event.into_string(),
697 "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n"
698 );
699
700 let event = Event::data("\nb\n");
701 assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
702
703 let event = Event::data("\r\nb\n");
704 assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
705
706 let event = Event::data("\r\nb\r\n");
707 assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
708
709 let event = Event::data("\n\nb\n");
710 assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
711
712 let event = Event::data("\n\rb\n");
713 assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
714
715 let event = Event::data("\n\rb\r");
716 assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
717
718 let event = Event::comment("\n\rb\r");
719 assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");
720
721 let event = Event::data("\n\n\n");
722 assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");
723
724 let event = Event::data("\n");
725 assert_eq!(event.into_string(), "data:\ndata:\n\n");
726
727 let event = Event::data("");
728 assert_eq!(event.into_string(), "data:\n\n");
729 }
730
731 #[test]
732 fn test_event_fields() {
733 let event = Event::data("foo").id("moo");
734 assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");
735
736 let event = Event::data("foo")
737 .id("moo")
738 .with_retry(Duration::from_secs(45));
739 assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");
740
741 let event = Event::data("foo\nbar")
742 .id("moo")
743 .with_retry(Duration::from_secs(45));
744 assert_eq!(
745 event.into_string(),
746 "retry:45000\nid:moo\ndata:foo\ndata:bar\n\n"
747 );
748
749 let event = Event::retry(Duration::from_secs(45));
750 assert_eq!(event.into_string(), "retry:45000\n\n");
751
752 let event = Event::comment("incoming data...");
753 assert_eq!(event.into_string(), ":incoming data...\n\n");
754
755 let event = Event::data("foo").id("moo").with_comment("cows, ey?");
756 assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");
757
758 let event = Event::data("foo\nbar")
759 .id("moo")
760 .event("milk")
761 .with_retry(Duration::from_secs(3));
762
763 assert_eq!(
764 event.into_string(),
765 "retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n"
766 );
767
768 let event = Event::data("foo")
769 .id("moo")
770 .event("milk")
771 .with_comment("??")
772 .with_retry(Duration::from_secs(3));
773
774 assert_eq!(
775 event.into_string(),
776 ":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n"
777 );
778
779 let event = Event::data("foo")
780 .id("moo")
781 .event("milk")
782 .with_comment("?\n?")
783 .with_retry(Duration::from_secs(3));
784
785 assert_eq!(
786 event.into_string(),
787 ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n"
788 );
789
790 let event = Event::data("foo\r\nbar\nbaz")
791 .id("moo")
792 .event("milk")
793 .with_comment("?\n?")
794 .with_retry(Duration::from_secs(3));
795
796 assert_eq!(
797 event.into_string(),
798 ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n"
799 );
800 }
801
802 #[test]
803 fn test_bad_chars() {
804 let event = Event::data("foo").id("dead\nbeef").event("m\noo");
805 assert_eq!(
806 event.into_string(),
807 "id:dead beef\nevent:m oo\ndata:foo\n\n"
808 );
809
810 let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
811 assert_eq!(
812 event.into_string(),
813 "id:d be f\nevent:m \ndata:f\ndata:o\n\n"
814 );
815
816 let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
817 assert_eq!(
818 event.into_string(),
819 "id: \nevent: b\ndata:f\ndata:o\n\n"
820 );
821 }
822
823 #[test]
824 fn test_event_stream() {
825 use futures::stream::iter;
826
827 let stream = EventStream::from(iter(vec![Event::data("foo")]));
828 assert_eq!(stream.into_string().replace(":\n\n", ""), "data:foo\n\n");
829
830 let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
831 assert_eq!(
832 stream.into_string().replace(":\n\n", ""),
833 "data:a\n\ndata:b\n\n"
834 );
835
836 let stream = EventStream::from(iter(vec![
837 Event::data("a\nb"),
838 Event::data("b"),
839 Event::data("c\n\nd"),
840 Event::data("e"),
841 ]));
842
843 assert_eq!(
844 stream.into_string().replace(":\n\n", ""),
845 "data:a\ndata:b\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n"
846 );
847 }
848
849 #[test]
850 fn test_heartbeat() {
851 use futures::future::ready;
852 use futures::stream::{iter, once, StreamExt};
853
854 const HEARTBEAT: &str = ":\n";
855
856 // Set a heartbeat interval of 250ms. Send nothing for 600ms. We should
857 // get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4.
858 let raw = stream!(time::sleep(Duration::from_millis(600)).await;).map(|_| unreachable!());
859
860 let string = EventStream::from(raw)
861 .heartbeat(Duration::from_millis(250))
862 .into_string();
863
864 let heartbeats = string.matches(HEARTBEAT).count();
865 assert!(
866 heartbeats >= 2 && heartbeats <= 4,
867 "got {} beat(s)",
868 heartbeats
869 );
870
871 let stream = EventStream! {
872 time::sleep(Duration::from_millis(250)).await;
873 yield Event::data("foo");
874 time::sleep(Duration::from_millis(250)).await;
875 yield Event::data("bar");
876 };
877
878 // We expect: foo\n\n [heartbeat] bar\n\n [maybe heartbeat].
879 let string = stream.heartbeat(Duration::from_millis(350)).into_string();
880 let heartbeats = string.matches(HEARTBEAT).count();
881 assert!(
882 heartbeats >= 1 && heartbeats <= 3,
883 "got {} beat(s)",
884 heartbeats
885 );
886 assert!(string.contains("data:foo\n\n"), "string = {:?}", string);
887 assert!(string.contains("data:bar\n\n"), "string = {:?}", string);
888
889 // We shouldn't send a heartbeat if a message is immediately available.
890 let stream = EventStream::from(once(ready(Event::data("hello"))));
891 let string = stream.heartbeat(Duration::from_secs(1)).into_string();
892 assert_eq!(string, "data:hello\n\n", "string = {:?}", string);
893
894 // It's okay if we do it with two, though.
895 let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
896 let string = stream.heartbeat(Duration::from_secs(1)).into_string();
897 let heartbeats = string.matches(HEARTBEAT).count();
898 assert!(heartbeats <= 1);
899 assert!(string.contains("data:a\n\n"), "string = {:?}", string);
900 assert!(string.contains("data:b\n\n"), "string = {:?}", string);
901 }
902}