use crate::core::ConnectionReadyState;
use crate::{js, sendwrap_fn, use_event_listener, ReconnectLimit};
use codee::Decoder;
use default_struct_builder::DefaultBuilder;
use leptos::prelude::*;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
/// Subscribes to the server-sent event stream at `url` using the default
/// [`UseEventSourceOptions`].
///
/// This is a convenience wrapper that forwards to
/// [`use_event_source_with_options`]; see that function for the meaning of the
/// returned signals and of the `open` / `close` functions.
///
/// `T` is the decoded message type and `C` is the decoder that turns the
/// message's string payload into a `T`.
pub fn use_event_source<T, C>(
    url: &str,
) -> UseEventSourceReturn<
    T,
    C::Error,
    impl Fn() + Clone + Send + Sync + 'static,
    impl Fn() + Clone + Send + Sync + 'static,
>
where
    T: Clone + PartialEq + Send + Sync + 'static,
    C: Decoder<T, Encoded = str>,
    C::Error: Send + Sync,
{
    let default_options = UseEventSourceOptions::<T>::default();
    use_event_source_with_options::<T, C>(url, default_options)
}
/// Subscribes to the server-sent event stream at `url`, configured by `options`.
///
/// `T` is the decoded message type and `C` is the decoder that turns the
/// string payload of each message into a `T`.
///
/// The returned [`UseEventSourceReturn`] exposes:
/// - `data`: the most recently decoded payload,
/// - `event`: the most recent event received through one of the
///   `named_events` listeners,
/// - `ready_state`: the connection state tracked by this function,
/// - `error`: the most recent error (error event or decode failure); it is
///   cleared when the connection opens,
/// - `event_source`: the current `EventSource`, or `None` once closed,
/// - `open` / `close`: functions to (re)open or close the connection.
///
/// When the error handler sees the `EventSource` in the closed state, the
/// connection was not closed explicitly, and `reconnect_limit` is
/// `ReconnectLimit::Limited(_)`, a new connection attempt is scheduled after
/// `reconnect_interval` milliseconds for as long as the limit has not been
/// used up; afterwards `on_failed` is called instead.
///
/// `close` is registered with `on_cleanup`.
///
/// # Panics
///
/// Constructing the `EventSource` uses `unwrap_throw`, so a failing
/// constructor aborts the call to `open` (and therefore this function when
/// `immediate` is set).
pub fn use_event_source_with_options<T, C>(
    url: &str,
    options: UseEventSourceOptions<T>,
) -> UseEventSourceReturn<
    T,
    C::Error,
    impl Fn() + Clone + Send + Sync + 'static,
    impl Fn() + Clone + Send + Sync + 'static,
>
where
    T: Clone + PartialEq + Send + Sync + 'static,
    C: Decoder<T, Encoded = str>,
    C::Error: Send + Sync,
{
    let UseEventSourceOptions {
        reconnect_limit,
        reconnect_interval,
        on_failed,
        immediate,
        named_events,
        with_credentials,
        _marker,
    } = options;

    let url = url.to_owned();

    let (event, set_event) = signal_local(None::<web_sys::Event>);
    let (data, set_data) = signal(None::<T>);
    let (ready_state, set_ready_state) = signal(ConnectionReadyState::Closed);
    let (event_source, set_event_source) = signal_local(None::<web_sys::EventSource>);
    let (error, set_error) = signal_local(None::<UseEventSourceError<C::Error>>);

    // Shared between `open`, `close` and the error handler.
    let explicitly_closed = Arc::new(AtomicBool::new(false));
    let retried = Arc::new(AtomicU32::new(0));

    // Decodes a payload and publishes either the value or the decode error.
    // A missing payload (`None`) is ignored.
    let set_data_from_string = move |data_string: Option<String>| {
        if let Some(data_string) = data_string {
            match C::decode(&data_string) {
                Ok(data) => set_data.set(Some(data)),
                Err(err) => set_error.set(Some(UseEventSourceError::Deserialize(err))),
            }
        }
    };

    let close = {
        let explicitly_closed = Arc::clone(&explicitly_closed);

        sendwrap_fn!(move || {
            if let Some(event_source) = event_source.get_untracked() {
                event_source.close();
                set_event_source.set(None);
                set_ready_state.set(ConnectionReadyState::Closed);
                // Prevents the error handler / pending timers from reconnecting.
                explicitly_closed.store(true, std::sync::atomic::Ordering::Relaxed);
            }
        })
    };

    // `init` is stored so that the error handler created inside it can call it
    // again for a reconnect attempt.
    let init = StoredValue::new(None::<Arc<dyn Fn() + Send + Sync>>);

    init.set_value(Some(Arc::new({
        let explicitly_closed = Arc::clone(&explicitly_closed);
        let retried = Arc::clone(&retried);

        move || {
            use wasm_bindgen::prelude::*;

            if explicitly_closed.load(std::sync::atomic::Ordering::Relaxed) {
                return;
            }

            let event_src_opts = web_sys::EventSourceInit::new();
            event_src_opts.set_with_credentials(with_credentials);

            let es = web_sys::EventSource::new_with_event_source_init_dict(&url, &event_src_opts)
                .unwrap_throw();

            set_ready_state.set(ConnectionReadyState::Connecting);
            set_event_source.set(Some(es.clone()));

            let on_open = Closure::wrap(Box::new(move |_: web_sys::Event| {
                set_ready_state.set(ConnectionReadyState::Open);
                set_error.set(None);
            }) as Box<dyn FnMut(web_sys::Event)>);
            es.set_onopen(Some(on_open.as_ref().unchecked_ref()));
            on_open.forget();

            let on_error = Closure::wrap(Box::new({
                let explicitly_closed = Arc::clone(&explicitly_closed);
                let retried = Arc::clone(&retried);
                let on_failed = Arc::clone(&on_failed);
                let es = es.clone();

                move |e: web_sys::Event| {
                    set_ready_state.set(ConnectionReadyState::Closed);
                    set_error.set(Some(UseEventSourceError::Event(e)));

                    // readyState 2 is `EventSource.CLOSED`: the browser has given up
                    // on this connection, so any reconnect has to be done here.
                    if es.ready_state() == 2
                        && !explicitly_closed.load(std::sync::atomic::Ordering::Relaxed)
                        && matches!(reconnect_limit, ReconnectLimit::Limited(_))
                    {
                        es.close();

                        let retried_value =
                            retried.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;

                        // Retry only while the limit has NOT been used up; once it is
                        // exceeded, report the failure. (The original condition was
                        // inverted. `is_exceeded_by` is defined elsewhere; this relies
                        // on it returning `true` when the attempt count has reached
                        // the limit, as its name states.)
                        if !reconnect_limit.is_exceeded_by(u64::from(retried_value)) {
                            set_timeout(
                                move || {
                                    if let Some(init) = init.get_value() {
                                        init();
                                    }
                                },
                                Duration::from_millis(reconnect_interval),
                            );
                        } else {
                            // Enter a non-reactive zone (debug builds only) for the
                            // duration of the user callback.
                            #[cfg(debug_assertions)]
                            let _z = leptos::reactive::diagnostics::SpecialNonReactiveZone::enter();

                            on_failed();
                        }
                    }
                }
            }) as Box<dyn FnMut(web_sys::Event)>);
            es.set_onerror(Some(on_error.as_ref().unchecked_ref()));
            on_error.forget();

            let on_message = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {
                set_data_from_string(e.data().as_string());
            }) as Box<dyn FnMut(web_sys::MessageEvent)>);
            es.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
            on_message.forget();

            // Named events are not delivered to `onmessage`, so each one gets its
            // own listener that also publishes the raw event.
            for event_name in named_events.clone() {
                let _ = use_event_listener(
                    es.clone(),
                    leptos::ev::Custom::<leptos::ev::Event>::new(event_name),
                    move |e| {
                        set_event.set(Some(e.clone()));
                        let data_string = js!(e["data"]).ok().and_then(|d| d.as_string());
                        set_data_from_string(data_string);
                    },
                );
            }
        }
    })));

    let open;

    #[cfg(not(feature = "ssr"))]
    {
        open = {
            let close = close.clone();
            let explicitly_closed = Arc::clone(&explicitly_closed);
            let retried = Arc::clone(&retried);

            sendwrap_fn!(move || {
                // Start from a clean slate: drop any existing connection, then
                // clear the "explicitly closed" flag and the retry counter.
                close();
                explicitly_closed.store(false, std::sync::atomic::Ordering::Relaxed);
                retried.store(0, std::sync::atomic::Ordering::Relaxed);
                if let Some(init) = init.get_value() {
                    init();
                }
            })
        };
    }

    // With the `ssr` feature enabled, `open` does nothing.
    #[cfg(feature = "ssr")]
    {
        open = move || {};
    }

    if immediate {
        open();
    }

    on_cleanup(close.clone());

    UseEventSourceReturn {
        event_source: event_source.into(),
        event: event.into(),
        data: data.into(),
        ready_state: ready_state.into(),
        error: error.into(),
        open,
        close,
    }
}
/// Options for [`use_event_source_with_options`].
///
/// Default values are provided by the `Default` impl of this type.
// NOTE(review): setters are presumably generated by the `DefaultBuilder` derive,
// which is defined outside this file — confirm against that crate's docs.
#[derive(DefaultBuilder)]
pub struct UseEventSourceOptions<T>
where
T: 'static,
{
/// Limit consulted by the error handler before scheduling a reconnect.
/// A reconnect is only considered for `ReconnectLimit::Limited(_)`.
/// Defaults to `ReconnectLimit::default()`.
reconnect_limit: ReconnectLimit,
/// Delay before a reconnect attempt, in milliseconds. Defaults to `3000`.
reconnect_interval: u64,
/// Callback invoked by the error handler when it handles a closed connection
/// and decides not to schedule another reconnect attempt.
/// Defaults to a no-op.
on_failed: Arc<dyn Fn() + Send + Sync>,
/// If `true`, the connection is opened as soon as
/// `use_event_source_with_options` is called. Defaults to `true`.
immediate: bool,
/// Names of events for which an additional listener is registered on the
/// `EventSource`. Defaults to an empty list.
#[builder(into)]
named_events: Vec<String>,
/// Value passed to `EventSourceInit::set_with_credentials`.
/// Defaults to `false`.
with_credentials: bool,
/// Ties the options to the message type `T` without storing a value of it.
_marker: PhantomData<T>,
}
impl<T> Default for UseEventSourceOptions<T> {
fn default() -> Self {
Self {
reconnect_limit: ReconnectLimit::default(),
reconnect_interval: 3000,
on_failed: Arc::new(|| {}),
immediate: true,
named_events: vec![],
with_credentials: false,
_marker: PhantomData,
}
}
}
/// Return type of [`use_event_source`] and [`use_event_source_with_options`].
pub struct UseEventSourceReturn<T, Err, OpenFn, CloseFn>
where
Err: Send + Sync + 'static,
T: Clone + Send + Sync + 'static,
OpenFn: Fn() + Clone + Send + Sync + 'static,
CloseFn: Fn() + Clone + Send + Sync + 'static,
{
/// Most recently decoded message payload; `None` until a payload has been
/// decoded successfully.
pub data: Signal<Option<T>>,
/// Connection state as tracked by the hook.
pub ready_state: Signal<ConnectionReadyState>,
/// Most recent event received through one of the `named_events` listeners.
pub event: Signal<Option<web_sys::Event>, LocalStorage>,
/// Most recent error (error event or decode failure); reset to `None` when
/// the connection opens.
pub error: Signal<Option<UseEventSourceError<Err>>, LocalStorage>,
/// Closes any existing connection, resets the retry state and connects again.
/// With the `ssr` feature enabled this does nothing.
pub open: OpenFn,
/// Closes the current connection, if any, and marks it as explicitly closed
/// so that no reconnect is attempted.
pub close: CloseFn,
/// The current `EventSource`; `None` after `close` has been called.
pub event_source: Signal<Option<web_sys::EventSource>, LocalStorage>,
}
/// Errors reported through the `error` signal of [`UseEventSourceReturn`].
#[derive(Error, Debug)]
pub enum UseEventSourceError<Err> {
/// The `EventSource` fired an error event; carries that event.
#[error("Error event: {0:?}")]
Event(web_sys::Event),
/// Decoding a message payload failed; carries the decoder's error.
#[error("Error decoding value")]
Deserialize(Err),
}