#![deny(future_incompatible)]
#![deny(nonstandard_style)]
#![deny(rust_2018_idioms)]
#![deny(unsafe_code)]
#![warn(missing_docs)]
#![warn(unused)]
use futures_core::stream::Stream;
use futures_timer::Delay;
use log::trace;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
pub use surf::Url;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
pub event: String,
pub data: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadyState {
Connecting = 0,
Open = 1,
}
#[derive(Debug)]
pub enum Error {
Retry,
ConnectionError(surf::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Retry => write!(f, "the connection was closed by the server, retrying."),
Self::ConnectionError(inner) => inner.fmt(f),
}
}
}
impl std::error::Error for Error {}
struct DynDebugFuture<T>(Pin<Box<dyn Future<Output = T>>>);
impl<T> Future for DynDebugFuture<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
impl<T> fmt::Debug for DynDebugFuture<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[opaque Future type]")
}
}
type EventStream = sse_codec::DecodeStream<surf::Response>;
type ConnectionFuture = DynDebugFuture<Result<surf::Response, surf::Error>>;
#[derive(Debug)]
#[allow(clippy::large_enum_variant)] enum ConnectState {
Streaming(EventStream),
Connecting(ConnectionFuture),
WaitingToRetry(Delay),
Idle,
}
#[derive(Debug)]
pub struct EventSource {
client: surf::Client,
url: Url,
retry_time: Duration,
last_event_id: Option<String>,
state: ConnectState,
}
impl EventSource {
pub fn new(url: Url) -> Self {
Self::with_client(surf::client(), url)
}
pub fn with_client(client: surf::Client, url: Url) -> Self {
let mut event_source = Self {
client,
url,
retry_time: Duration::from_secs(3),
last_event_id: None,
state: ConnectState::Idle,
};
event_source.start_connect();
event_source
}
pub fn url(&self) -> &Url {
&self.url
}
pub fn ready_state(&self) -> ReadyState {
match self.state {
ConnectState::Streaming(_) => ReadyState::Open,
ConnectState::Connecting(_) | ConnectState::WaitingToRetry(_) => ReadyState::Connecting,
ConnectState::Idle => unreachable!("ReadyState::Closed"),
}
}
pub fn retry_time(&self) -> Duration {
self.retry_time
}
pub fn set_retry_time(&mut self, duration: Duration) {
self.retry_time = duration;
}
fn start_connect(&mut self) {
trace!(target: "surf-sse", "connecting to {}", self.url);
let mut request = surf::get(self.url.clone()).header("Accept", "text/event-stream");
if let Some(id) = &self.last_event_id {
request = request.header("Last-Event-ID", id.as_str());
}
let client = self.client.clone();
let request_future = Box::pin(async move { client.send(request).await });
self.state = ConnectState::Connecting(DynDebugFuture(request_future));
}
fn start_retry(&mut self) {
trace!(target: "surf-sse", "connection to {}, retrying in {:?}", self.url, self.retry_time);
self.state = ConnectState::WaitingToRetry(Delay::new(self.retry_time));
}
fn start_receiving(&mut self, response: surf::Response) {
trace!(target: "surf-sse", "connected to {}, now waiting for events", self.url);
self.state = ConnectState::Streaming(sse_codec::decode_stream(response));
}
}
impl Stream for EventSource {
type Item = Result<Event, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut self.state {
ConnectState::Streaming(event_stream) => {
match Pin::new(event_stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(event))) => match event {
sse_codec::Event::Message { id, event, data } => {
self.last_event_id = id;
Poll::Ready(Some(Ok(Event { event, data })))
}
sse_codec::Event::Retry { retry } => {
self.retry_time = Duration::from_millis(retry);
Poll::Pending
}
},
Poll::Ready(Some(Err(_))) => {
Poll::Pending
}
Poll::Ready(None) => {
self.start_retry();
let error = Error::Retry;
Poll::Ready(Some(Err(error)))
}
}
}
ConnectState::WaitingToRetry(timer) => {
match Pin::new(timer).poll(cx) {
Poll::Pending => (),
Poll::Ready(()) => self.start_connect(),
}
Poll::Pending
}
ConnectState::Connecting(connecting) => {
match Pin::new(connecting).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(response)) if response.status() == 204 => Poll::Ready(None),
Poll::Ready(Ok(response)) => {
self.start_receiving(response);
self.poll_next(cx)
}
Poll::Ready(Err(error)) => {
self.start_retry();
let error = Error::ConnectionError(error);
Poll::Ready(Some(Err(error)))
}
}
}
ConnectState::Idle => unreachable!(),
}
}
}
pub trait ClientExt {
fn connect_event_source(&self, url: Url) -> EventSource;
}
impl ClientExt for surf::Client {
fn connect_event_source(&self, url: Url) -> EventSource {
EventSource::with_client(self.clone(), url)
}
}