use super::MessageEvent;
use crate::events::EventListener;
use crate::prelude::*;
use crate::utils::ResultExt;
use async_channel::{self as channel, Receiver, Sender};
use futures_core::Stream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{io, pin::Pin, task::Poll};
/// Connection state of an [`EventSource`], as reported by
/// [`EventSource::ready_state`].
///
/// The derived ordering follows declaration order:
/// `Connecting < Open < Closed`.
///
/// The type is a plain fieldless enum, so it is `Copy`: values can be stored,
/// compared and passed around without borrowing.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum ReadyState {
    /// Reported when the browser state is `web_sys::EventSource::CONNECTING`.
    Connecting,
    /// Reported when the browser state is `web_sys::EventSource::OPEN`.
    Open,
    /// Reported when the browser state is `web_sys::EventSource::CLOSED`.
    Closed,
}
/// Wrapper around a browser `web_sys::EventSource`.
///
/// Events for the registered names are forwarded into an internal unbounded
/// channel and can be read either with `recv` or through the `Stream`
/// implementation. The underlying browser object is closed when this value is
/// dropped.
#[pin_project::pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct EventSource {
/// The underlying browser event source.
inner: web_sys::EventSource,
/// URL passed to `connect`; used in log and error messages.
url: String,
/// Listener for the `"error"` event installed by `connect`.
// NOTE(review): presumably kept here so the listener stays registered for the
// lifetime of this value — confirm against `EventListener`'s drop behavior.
err_listener: EventListener,
/// Sending half of the channel; cloned into every listener created by `register`.
sender: Sender<MessageEvent>,
/// Receiving half of the channel; pinned so it can be polled as a `Stream`.
#[pin]
receiver: Receiver<MessageEvent>,
/// Listeners created by `register`, one per registered event name.
listeners: Vec<EventListener>,
/// Flag shared with the `"error"` listener: when `false`, that listener
/// closes `inner` as soon as an error event is observed.
reconnect: Arc<AtomicBool>,
}
impl EventSource {
    /// Creates an event source for `url`, waits for its `open` event and
    /// registers a listener for every name in `interests`.
    ///
    /// An `"error"` listener is installed before waiting: it logs the event
    /// and, when the reconnect flag is `false`, closes the underlying source.
    /// The reconnect flag starts out as `true`.
    ///
    /// # Errors
    ///
    /// * Failure of `web_sys::EventSource::new` is propagated through
    ///   `ResultExt::err_kind(io::ErrorKind::InvalidInput)`.
    /// * [`io::ErrorKind::NotConnected`] if the source is not in the open
    ///   state once the `open` wait has completed.
    pub async fn connect<S>(url: &str, interests: &[S]) -> io::Result<Self>
    where
        S: AsRef<str>,
    {
        crate::log::debug!("EventSource({}): connection initiated", url);
        let url = url.to_owned();
        let inner = web_sys::EventSource::new(&url).err_kind(io::ErrorKind::InvalidInput)?;
        let reconnect = Arc::new(AtomicBool::new(true));

        // The error listener outlives this function, so it gets its own
        // handles to everything it touches.
        let url2 = url.clone();
        let inner2 = inner.clone();
        let reconnect2 = reconnect.clone();
        let err_listener = inner.on_with("error", move |_| {
            crate::log::debug!("EventSource({}): remote closed", url2);
            if reconnect2.load(Ordering::SeqCst) {
                crate::log::debug!("EventSource({}): instance reconnecting", url2);
            } else {
                crate::log::debug!("EventSource({}): instance closed", url2);
                inner2.close();
            }
        });

        // NOTE(review): this only waits for `open`; if the browser never fires
        // it (e.g. the source fails permanently) this await presumably never
        // resolves — confirm and consider racing it against the error event.
        inner.once("open").await;

        let (sender, receiver) = channel::unbounded();
        let mut this = Self {
            url,
            inner,
            err_listener,
            sender,
            receiver,
            reconnect,
            listeners: vec![],
        };
        for interest in interests {
            this.register(interest.as_ref());
        }

        this.check_connection()?;
        crate::log::debug!("EventSource({}): connection established", this.url);
        Ok(this)
    }

    /// Returns the current value of the reconnect flag.
    ///
    /// When the flag is `false`, the `"error"` listener installed by
    /// [`connect`](Self::connect) closes the underlying source on the next
    /// error event.
    ///
    /// Only an atomic load is performed, so a shared borrow is sufficient.
    pub fn reconnect(&self) -> bool {
        self.reconnect.load(Ordering::SeqCst)
    }

    /// Sets the reconnect flag read by the `"error"` listener.
    pub fn set_reconnect(&mut self, reconnect: bool) {
        self.reconnect.store(reconnect, Ordering::SeqCst);
    }

    /// Starts listening for events called `name`.
    ///
    /// Each event is converted with `MessageEvent::from_event` and pushed onto
    /// the internal channel. The listener is stored in this value.
    pub fn register(&mut self, name: &str) {
        let sender = self.sender.clone();
        let name2 = name.to_owned();
        let listener = EventListener::listen(&self.inner, name, move |ev| {
            let ev = MessageEvent::from_event(name2.clone(), ev);
            // Best effort: a failed send is deliberately ignored.
            let _ = sender.try_send(ev);
        });
        self.listeners.push(listener);
    }

    /// Waits for the next event from the internal channel.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::Other`] error if the channel receive fails.
    pub async fn recv(&self) -> io::Result<MessageEvent> {
        self.receiver.recv().await.map_err(|_| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("EventSource({}): receiver error", self.url),
            )
        })
    }

    /// Returns the state currently reported by the underlying event source.
    ///
    /// # Panics
    ///
    /// Panics if the reported value is none of `CONNECTING`, `OPEN` or
    /// `CLOSED`.
    pub fn ready_state(&self) -> ReadyState {
        match self.inner.ready_state() {
            web_sys::EventSource::CONNECTING => ReadyState::Connecting,
            web_sys::EventSource::OPEN => ReadyState::Open,
            web_sys::EventSource::CLOSED => ReadyState::Closed,
            _ => unreachable!("Unknown EventSource ready state"),
        }
    }

    /// Returns `Ok(())` when the source is open, otherwise an
    /// [`io::ErrorKind::NotConnected`] error naming the URL.
    fn check_connection(&self) -> io::Result<()> {
        if matches!(self.ready_state(), ReadyState::Open) {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::NotConnected,
                format!("EventSource({})", self.url),
            ))
        }
    }
}
// Closes the underlying browser event source when the wrapper is dropped.
#[pin_project::pinned_drop]
impl PinnedDrop for EventSource {
    fn drop(self: Pin<&mut Self>) {
        // Project once, then pull out the two fields that are needed.
        let projected = self.project();
        let (source, url) = (projected.inner, projected.url);
        source.close();
        crate::log::debug!("EventSource({}): instance closed", url);
    }
}
/// Exposes the underlying browser object as a `web_sys::EventTarget`.
impl AsRef<web_sys::EventTarget> for EventSource {
    fn as_ref(&self) -> &web_sys::EventTarget {
        let source = &self.inner;
        AsRef::<web_sys::EventTarget>::as_ref(source)
    }
}
/// Yields received events as a stream.
///
/// Events already sitting in the internal channel are always delivered first.
/// Only when nothing is buffered is the connection state consulted: an open
/// source yields `Poll::Pending`, any other state yields an
/// [`io::ErrorKind::NotConnected`] error.
impl Stream for EventSource {
    type Item = io::Result<MessageEvent>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        // Poll the channel before looking at the connection state so that
        // events received before a disconnect are not withheld behind a
        // `NotConnected` error.
        match this.receiver.poll_next(cx) {
            Poll::Ready(item) => Poll::Ready(item.map(Ok)),
            Poll::Pending if this.inner.ready_state() == web_sys::EventSource::OPEN => {
                Poll::Pending
            }
            // Nothing buffered and the source is not open: report it instead
            // of parking the task.
            Poll::Pending => Poll::Ready(Some(Err(io::Error::new(
                io::ErrorKind::NotConnected,
                format!("EventSource({})", this.url),
            )))),
        }
    }
}