use std::borrow::Cow;
use tokio::io::AsyncRead;
use tokio::time::Duration;
use futures::stream::{self, Stream, StreamExt};
use futures::future::ready;
use crate::request::Request;
use crate::response::{self, Response, Responder, stream::{ReaderStream, RawLinedEvent}};
use crate::http::ContentType;
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct Event {
comment: Option<Cow<'static, str>>,
retry: Option<Duration>,
id: Option<Cow<'static, str>>,
event: Option<Cow<'static, str>>,
data: Option<Cow<'static, str>>,
}
impl Event {
fn new() -> Self {
Event { comment: None, retry: None, id: None, event: None, data: None, }
}
pub fn empty() -> Self {
Event::data("")
}
#[cfg(feature = "json")]
#[cfg_attr(nightly, doc(cfg(feature = "json")))]
pub fn json<T: serde::Serialize>(data: &T) -> Self {
let string = serde_json::to_string(data).unwrap_or_default();
Self::data(string)
}
pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
Self { data: Some(data.into()), ..Event::new() }
}
pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
Self { comment: Some(data.into()), ..Event::new() }
}
pub fn retry(period: Duration) -> Self {
Self { retry: Some(period), ..Event::new() }
}
pub fn event<T: Into<Cow<'static, str>>>(mut self, event: T) -> Self {
self.event = Some(event.into());
self
}
pub fn id<T: Into<Cow<'static, str>>>(mut self, id: T) -> Self {
self.id = Some(id.into());
self
}
pub fn with_data<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
self.data = Some(data.into());
self
}
pub fn with_comment<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
self.comment = Some(data.into());
self
}
pub fn with_retry(mut self, period: Duration) -> Self {
self.retry = Some(period);
self
}
fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
let events = [
self.comment.map(|v| RawLinedEvent::many("", v)),
self.retry.map(|r| RawLinedEvent::one("retry", format!("{}", r.as_millis()))),
self.id.map(|v| RawLinedEvent::one("id", v)),
self.event.map(|v| RawLinedEvent::one("event", v)),
self.data.map(|v| RawLinedEvent::many("data", v)),
Some(RawLinedEvent::raw("")),
];
stream::iter(events).filter_map(ready)
}
}
pub struct EventStream<S> {
stream: S,
heartbeat: Option<Duration>,
}
impl<S: Stream<Item = Event>> EventStream<S> {
pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
self.heartbeat = heartbeat.into();
self
}
fn heartbeat_stream(&self) -> Option<impl Stream<Item = RawLinedEvent>> {
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
self.heartbeat
.map(|beat| IntervalStream::new(interval(beat)))
.map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
}
fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
use futures::future::Either;
use crate::ext::StreamExt;
let heartbeat_stream = self.heartbeat_stream();
let raw_events = self.stream.map(|e| e.into_stream()).flatten();
match heartbeat_stream {
Some(heartbeat) => Either::Left(raw_events.join(heartbeat)),
None => Either::Right(raw_events)
}
}
fn into_reader(self) -> impl AsyncRead {
ReaderStream::from(self.into_stream())
}
}
impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
fn from(stream: S) -> Self {
EventStream { stream, heartbeat: Some(Duration::from_secs(30)), }
}
}
impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
Response::build()
.header(ContentType::EventStream)
.raw_header("Cache-Control", "no-cache")
.raw_header("Expires", "0")
.streamed_body(self.into_reader())
.ok()
}
}
crate::export! {
macro_rules! EventStream {
() => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
}
}
#[cfg(test)]
mod sse_tests {
use tokio::io::AsyncReadExt;
use tokio::time::{self, Duration};
use futures::stream::Stream;
use crate::response::stream::{stream, Event, EventStream, ReaderStream};
impl Event {
fn into_string(self) -> String {
crate::async_test(async move {
let mut string = String::new();
let mut reader = ReaderStream::from(self.into_stream());
reader.read_to_string(&mut string).await.expect("event -> string");
string
})
}
}
impl<S: Stream<Item = Event>> EventStream<S> {
fn into_string(self) -> String {
crate::async_test(async move {
let mut string = String::new();
let reader = self.into_reader();
tokio::pin!(reader);
reader.read_to_string(&mut string).await.expect("event stream -> string");
string
})
}
}
#[test]
fn test_event_data() {
let event = Event::data("a\nb");
assert_eq!(event.into_string(), "data:a\ndata:b\n\n");
let event = Event::data("a\n");
assert_eq!(event.into_string(), "data:a\ndata:\n\n");
let event = Event::data("cats make me happy!");
assert_eq!(event.into_string(), "data:cats make me happy!\n\n");
let event = Event::data("in the\njungle\nthe mighty\njungle");
assert_eq!(event.into_string(),
"data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");
let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
assert_eq!(event.into_string(),
"data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");
let event = Event::data("\nb\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\r\nb\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\r\nb\r\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\n\nb\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
let event = Event::data("\n\rb\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
let event = Event::data("\n\rb\r");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
let event = Event::comment("\n\rb\r");
assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");
let event = Event::data("\n\n\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");
let event = Event::data("\n");
assert_eq!(event.into_string(), "data:\ndata:\n\n");
let event = Event::data("");
assert_eq!(event.into_string(), "data:\n\n");
}
#[test]
fn test_event_fields() {
let event = Event::data("foo").id("moo");
assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");
let event = Event::data("foo").id("moo").with_retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");
let event = Event::data("foo\nbar").id("moo").with_retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\ndata:bar\n\n");
let event = Event::retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\n\n");
let event = Event::comment("incoming data...");
assert_eq!(event.into_string(), ":incoming data...\n\n");
let event = Event::data("foo").id("moo").with_comment("cows, ey?");
assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");
let event = Event::data("foo\nbar")
.id("moo")
.event("milk")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(), "retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n");
let event = Event::data("foo")
.id("moo")
.event("milk")
.with_comment("??")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(), ":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");
let event = Event::data("foo")
.id("moo")
.event("milk")
.with_comment("?\n?")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(), ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");
let event = Event::data("foo\r\nbar\nbaz")
.id("moo")
.event("milk")
.with_comment("?\n?")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(),
":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n");
}
#[test]
fn test_bad_chars() {
let event = Event::data("foo").id("dead\nbeef").event("m\noo");
assert_eq!(event.into_string(), "id:dead beef\nevent:m oo\ndata:foo\n\n");
let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
assert_eq!(event.into_string(), "id:d be f\nevent:m \ndata:f\ndata:o\n\n");
let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
assert_eq!(event.into_string(), "id: \nevent: b\ndata:f\ndata:o\n\n");
}
#[test]
fn test_event_stream() {
use futures::stream::iter;
let stream = EventStream::from(iter(vec![Event::data("foo")]));
assert_eq!(stream.into_string().replace(":\n\n", ""), "data:foo\n\n");
let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
assert_eq!(stream.into_string().replace(":\n\n", ""), "data:a\n\ndata:b\n\n");
let stream = EventStream::from(iter(vec![
Event::data("a\nb"),
Event::data("b"),
Event::data("c\n\nd"),
Event::data("e"),
]));
assert_eq!(stream.into_string().replace(":\n\n", ""),
"data:a\ndata:b\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n");
}
#[test]
fn test_heartbeat() {
use futures::future::ready;
use futures::stream::{once, iter, StreamExt};
const HEARTBEAT: &str = ":\n";
let raw = stream!(time::sleep(Duration::from_millis(600)).await;)
.map(|_| unreachable!());
let string = EventStream::from(raw)
.heartbeat(Duration::from_millis(250))
.into_string();
let heartbeats = string.matches(HEARTBEAT).count();
assert!(heartbeats >= 2 && heartbeats <= 4, "got {} beat(s)", heartbeats);
let stream = EventStream! {
time::sleep(Duration::from_millis(250)).await;
yield Event::data("foo");
time::sleep(Duration::from_millis(250)).await;
yield Event::data("bar");
};
let string = stream.heartbeat(Duration::from_millis(350)).into_string();
let heartbeats = string.matches(HEARTBEAT).count();
assert!(heartbeats >= 1 && heartbeats <= 3, "got {} beat(s)", heartbeats);
assert!(string.contains("data:foo\n\n"), "string = {:?}", string);
assert!(string.contains("data:bar\n\n"), "string = {:?}", string);
let stream = EventStream::from(once(ready(Event::data("hello"))));
let string = stream.heartbeat(Duration::from_secs(1)).into_string();
assert_eq!(string, "data:hello\n\n", "string = {:?}", string);
let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
let string = stream.heartbeat(Duration::from_secs(1)).into_string();
let heartbeats = string.matches(HEARTBEAT).count();
assert!(heartbeats <= 1);
assert!(string.contains("data:a\n\n"), "string = {:?}", string);
assert!(string.contains("data:b\n\n"), "string = {:?}", string);
}
}