use std::borrow::Cow;
use futures::{
future::Either,
stream::{self, Stream},
};
use tokio::io::AsyncRead;
use tokio::time::{interval, Duration};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use crate::http::ContentType;
use crate::request::Request;
use crate::response::{
self,
stream::{RawLinedEvent, ReaderStream},
Responder, Response,
};
/// A single server-sent event.
///
/// Every field is optional. When the event is rendered (see `Event::into_stream`), the fields
/// that are set are emitted in the order `comment`, `retry`, `id`, `event`, `data`, followed by
/// an empty line.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct Event {
/// Comment text; rendered with an empty field name, i.e. as lines starting with `:`.
comment: Option<Cow<'static, str>>,
/// Value of the `retry` field; rendered as a whole number of milliseconds.
retry: Option<Duration>,
/// Value of the `id` field.
id: Option<Cow<'static, str>>,
/// Value of the `event` field.
event: Option<Cow<'static, str>>,
/// Value of the `data` field; may span several lines.
data: Option<Cow<'static, str>>,
}
impl Event {
    /// Builds an event with every field unset. All public constructors start from this value.
    fn new() -> Self {
        Self { comment: None, retry: None, id: None, event: None, data: None }
    }

    /// Creates an event whose only field is an empty `data` field.
    pub fn empty() -> Self {
        Self::data("")
    }

    /// Creates an event whose `data` field is the JSON serialization of `data`.
    ///
    /// A serialization error is discarded; in that case the `data` field is the empty string.
    #[cfg(feature = "json")]
    #[cfg_attr(nightly, doc(cfg(feature = "json")))]
    pub fn json<T: serde::Serialize>(data: &T) -> Self {
        Self::data(serde_json::to_string(data).unwrap_or_default())
    }

    /// Creates an event with only the `data` field set.
    pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
        let mut fresh = Self::new();
        fresh.data = Some(data.into());
        fresh
    }

    /// Creates an event with only the comment field set.
    pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
        let mut fresh = Self::new();
        fresh.comment = Some(data.into());
        fresh
    }

    /// Creates an event with only the `retry` field set to `period`.
    pub fn retry(period: Duration) -> Self {
        let mut fresh = Self::new();
        fresh.retry = Some(period);
        fresh
    }

    /// Sets (or replaces) the `event` field and returns the updated event.
    pub fn event<T: Into<Cow<'static, str>>>(self, event: T) -> Self {
        Self { event: Some(event.into()), ..self }
    }

    /// Sets (or replaces) the `id` field and returns the updated event.
    pub fn id<T: Into<Cow<'static, str>>>(self, id: T) -> Self {
        Self { id: Some(id.into()), ..self }
    }

    /// Sets (or replaces) the `data` field and returns the updated event.
    pub fn with_data<T: Into<Cow<'static, str>>>(self, data: T) -> Self {
        Self { data: Some(data.into()), ..self }
    }

    /// Sets (or replaces) the comment field and returns the updated event.
    pub fn with_comment<T: Into<Cow<'static, str>>>(self, data: T) -> Self {
        Self { comment: Some(data.into()), ..self }
    }

    /// Sets (or replaces) the `retry` field and returns the updated event.
    pub fn with_retry(self, period: Duration) -> Self {
        Self { retry: Some(period), ..self }
    }

    /// Converts the event into a stream of raw lines.
    ///
    /// Set fields are emitted in the fixed order comment, `retry`, `id`, `event`, `data`; unset
    /// fields are skipped. A final empty raw line is always emitted after the fields.
    fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
        let Event { comment, retry, id, event, data } = self;

        let lines = [
            // A comment is a field with an empty name.
            comment.map(|text| RawLinedEvent::many("", text)),
            // The retry period is written in whole milliseconds.
            retry.map(|period| RawLinedEvent::one("retry", period.as_millis().to_string())),
            id.map(|value| RawLinedEvent::one("id", value)),
            event.map(|name| RawLinedEvent::one("event", name)),
            data.map(|payload| RawLinedEvent::many("data", payload)),
            // The empty line that closes the event.
            Some(RawLinedEvent::raw("")),
        ];

        stream::iter(lines).filter_map(|line| line)
    }
}
/// A wrapper around a stream `S` of [`Event`]s that can be sent as a response, optionally
/// interleaved with periodic heartbeat lines.
pub struct EventStream<S> {
/// The underlying stream of events.
stream: S,
/// Interval between heartbeats; `None` means no heartbeats are produced.
heartbeat: Option<Duration>,
}
impl<S: Stream<Item = Event>> EventStream<S> {
    /// Sets the interval at which heartbeat lines are emitted and returns the updated stream.
    ///
    /// Accepts a `Duration` or an `Option<Duration>`. Passing `None` disables heartbeats. A zero
    /// duration is also treated as "disabled", since a zero period cannot drive a timer.
    pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
        self.heartbeat = heartbeat.into();
        self
    }

    /// Builds the heartbeat stream: one raw `":"` line per timer tick, or an empty stream when
    /// heartbeats are disabled.
    fn heartbeat_stream(&self) -> impl Stream<Item = RawLinedEvent> {
        self.heartbeat
            // `tokio::time::interval` panics when given a zero period, so a zero heartbeat is
            // handled like `None` instead of crashing while the response is being built.
            .filter(|beat| !beat.is_zero())
            .map(|beat| IntervalStream::new(interval(beat)))
            .map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
            // `Either` unifies the two concrete stream types behind one return type.
            .map_or_else(|| Either::Right(stream::empty()), Either::Left)
    }

    /// Converts the event stream into a single stream of raw lines: every event is expanded into
    /// its lines, and the result is combined with the heartbeat stream via `crate::util::join`.
    ///
    /// NOTE(review): `join` is defined elsewhere; presumably the combined stream ends once the
    /// event stream ends, regardless of pending heartbeats. Confirm against its definition.
    fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
        // Both `futures::StreamExt` and `tokio_stream::StreamExt` provide `map`; the call below
        // is written in path form so that it unambiguously uses the `futures` version.
        use futures::StreamExt;

        // Must be built before `self.stream` is moved out of `self`.
        let heartbeats = self.heartbeat_stream();
        let events = StreamExt::map(self.stream, |e| e.into_stream()).flatten();
        crate::util::join(events, heartbeats)
    }

    /// Converts the event stream into an `AsyncRead` over the rendered lines.
    fn into_reader(self) -> impl AsyncRead {
        ReaderStream::from(self.into_stream())
    }
}
impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
    /// Wraps `stream` in an `EventStream` with a default heartbeat interval of 30 seconds.
    fn from(stream: S) -> Self {
        let heartbeat = Some(Duration::from_secs(30));
        Self { stream, heartbeat }
    }
}
impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
    /// Responds with the rendered event stream as a streamed body.
    ///
    /// The response carries the `EventStream` content type plus `Cache-Control: no-cache` and
    /// `Expires: 0` headers. The request itself is not inspected.
    fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
        // Render events (and heartbeats, if any) into a readable body first.
        let body = self.into_reader();

        Response::build()
            .header(ContentType::EventStream)
            .raw_header("Cache-Control", "no-cache")
            .raw_header("Expires", "0")
            .streamed_body(body)
            .ok()
    }
}
// Defines the `EventStream!` macro and passes it through the crate's `export!` helper.
// Both arms delegate to `_typed_stream!` with `EventStream` as the first argument.
// NOTE(review): `export!` and `_typed_stream!` are defined elsewhere; their exact expansion is
// not visible here.
crate::export! {
macro_rules! EventStream {
// No tokens: expand with the `Event` type path as the only extra argument.
() => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
// Any other tokens (for example a body containing `yield` statements) are forwarded as-is.
($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
}
}
// Unit tests for the textual rendering of `Event` and `EventStream`.
#[cfg(test)]
mod sse_tests {
use crate::response::stream::{stream, Event, EventStream, ReaderStream};
use futures::stream::Stream;
use tokio::io::AsyncReadExt;
use tokio::time::{self, Duration};
// Test-only helper: renders a single event to the exact text it produces.
impl Event {
fn into_string(self) -> String {
// `async_test` runs the future to completion and returns its output.
crate::async_test(async move {
let mut string = String::new();
let mut reader = ReaderStream::from(self.into_stream());
reader
.read_to_string(&mut string)
.await
.expect("event -> string");
string
})
}
}
// Test-only helper: renders a whole event stream, heartbeats included, to text.
impl<S: Stream<Item = Event>> EventStream<S> {
fn into_string(self) -> String {
use std::pin::pin;
crate::async_test(async move {
let mut string = String::new();
// The reader is an opaque `impl AsyncRead`; it is pinned before reading, presumably
// because it is not known to be `Unpin`.
let mut reader = pin!(self.into_reader());
reader
.read_to_string(&mut string)
.await
.expect("event stream -> string");
string
})
}
}
// Multi-line data: every line gets its own `data:` prefix, and `\n`, `\r\n` and a lone `\r`
// all count as one line break. A trailing line break yields a final empty `data:` line.
#[test]
fn test_event_data() {
let event = Event::data("a\nb");
assert_eq!(event.into_string(), "data:a\ndata:b\n\n");
let event = Event::data("a\n");
assert_eq!(event.into_string(), "data:a\ndata:\n\n");
let event = Event::data("cats make me happy!");
assert_eq!(event.into_string(), "data:cats make me happy!\n\n");
let event = Event::data("in the\njungle\nthe mighty\njungle");
assert_eq!(
event.into_string(),
"data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n"
);
// Mixed line terminators render the same as plain `\n`.
let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
assert_eq!(
event.into_string(),
"data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n"
);
let event = Event::data("\nb\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\r\nb\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\r\nb\r\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\n\nb\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
// `\n\r` is two line breaks, unlike `\r\n`, which is one.
let event = Event::data("\n\rb\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
let event = Event::data("\n\rb\r");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
// Comments are split into lines the same way, with an empty field name.
let event = Event::comment("\n\rb\r");
assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");
let event = Event::data("\n\n\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");
let event = Event::data("\n");
assert_eq!(event.into_string(), "data:\ndata:\n\n");
let event = Event::data("");
assert_eq!(event.into_string(), "data:\n\n");
}
// Field combinations: output order is always comment, retry, id, event, data, regardless of
// the order in which the builder methods were called.
#[test]
fn test_event_fields() {
let event = Event::data("foo").id("moo");
assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");
let event = Event::data("foo")
.id("moo")
.with_retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");
let event = Event::data("foo\nbar")
.id("moo")
.with_retry(Duration::from_secs(45));
assert_eq!(
event.into_string(),
"retry:45000\nid:moo\ndata:foo\ndata:bar\n\n"
);
let event = Event::retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\n\n");
let event = Event::comment("incoming data...");
assert_eq!(event.into_string(), ":incoming data...\n\n");
let event = Event::data("foo").id("moo").with_comment("cows, ey?");
assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");
let event = Event::data("foo\nbar")
.id("moo")
.event("milk")
.with_retry(Duration::from_secs(3));
assert_eq!(
event.into_string(),
"retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n"
);
let event = Event::data("foo")
.id("moo")
.event("milk")
.with_comment("??")
.with_retry(Duration::from_secs(3));
assert_eq!(
event.into_string(),
":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n"
);
let event = Event::data("foo")
.id("moo")
.event("milk")
.with_comment("?\n?")
.with_retry(Duration::from_secs(3));
assert_eq!(
event.into_string(),
":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n"
);
let event = Event::data("foo\r\nbar\nbaz")
.id("moo")
.event("milk")
.with_comment("?\n?")
.with_retry(Duration::from_secs(3));
assert_eq!(
event.into_string(),
":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n"
);
}
// Single-line fields (`id`, `event`) cannot contain line breaks: each run of `\r`/`\n`
// characters is rendered as a single space.
#[test]
fn test_bad_chars() {
let event = Event::data("foo").id("dead\nbeef").event("m\noo");
assert_eq!(
event.into_string(),
"id:dead beef\nevent:m oo\ndata:foo\n\n"
);
let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
assert_eq!(
event.into_string(),
"id:d be f\nevent:m \ndata:f\ndata:o\n\n"
);
let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
assert_eq!(
event.into_string(),
"id: \nevent: b\ndata:f\ndata:o\n\n"
);
}
// Events in a stream are rendered back to back. Any `":\n\n"` sequence is stripped before
// comparing, presumably so heartbeat output from the default heartbeat cannot affect the result.
#[test]
fn test_event_stream() {
use futures::stream::iter;
let stream = EventStream::from(iter(vec![Event::data("foo")]));
assert_eq!(stream.into_string().replace(":\n\n", ""), "data:foo\n\n");
let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
assert_eq!(
stream.into_string().replace(":\n\n", ""),
"data:a\n\ndata:b\n\n"
);
let stream = EventStream::from(iter(vec![
Event::data("a\nb"),
Event::data("b"),
Event::data("c\n\nd"),
Event::data("e"),
]));
assert_eq!(
stream.into_string().replace(":\n\n", ""),
"data:a\ndata:b\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n"
);
}
// Heartbeat behaviour. These checks depend on real timers, so heartbeat counts are asserted
// as ranges rather than exact values.
#[test]
fn test_heartbeat() {
use futures::future::ready;
use futures::stream::{iter, once, StreamExt};
const HEARTBEAT: &str = ":\n";
// A stream that only sleeps for 600ms and never yields; the `map` closure is never called.
let raw = stream!(time::sleep(Duration::from_millis(600)).await;).map(|_| unreachable!());
let string = EventStream::from(raw)
.heartbeat(Duration::from_millis(250))
.into_string();
let heartbeats = string.matches(HEARTBEAT).count();
assert!(
heartbeats >= 2 && heartbeats <= 4,
"got {} beat(s)",
heartbeats
);
// Two events 250ms apart with a 350ms heartbeat: both events must appear, along with a
// small number of heartbeats.
let stream = EventStream! {
time::sleep(Duration::from_millis(250)).await;
yield Event::data("foo");
time::sleep(Duration::from_millis(250)).await;
yield Event::data("bar");
};
let string = stream.heartbeat(Duration::from_millis(350)).into_string();
let heartbeats = string.matches(HEARTBEAT).count();
assert!(
heartbeats >= 1 && heartbeats <= 3,
"got {} beat(s)",
heartbeats
);
assert!(string.contains("data:foo\n\n"), "string = {:?}", string);
assert!(string.contains("data:bar\n\n"), "string = {:?}", string);
// An immediately-ready single event produces no heartbeat output at all.
let stream = EventStream::from(once(ready(Event::data("hello"))));
let string = stream.heartbeat(Duration::from_secs(1)).into_string();
assert_eq!(string, "data:hello\n\n", "string = {:?}", string);
// Two immediately-ready events: at most one heartbeat may slip in.
let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
let string = stream.heartbeat(Duration::from_secs(1)).into_string();
let heartbeats = string.matches(HEARTBEAT).count();
assert!(heartbeats <= 1);
assert!(string.contains("data:a\n\n"), "string = {:?}", string);
assert!(string.contains("data:b\n\n"), "string = {:?}", string);
}
}