leptos_use/
use_event_source.rs

1use crate::ReconnectLimit;
2use crate::core::ConnectionReadyState;
3use codee::Decoder;
4use default_struct_builder::DefaultBuilder;
5use leptos::prelude::*;
6use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::sync::Arc;
9use thiserror::Error;
10use wasm_bindgen::JsCast;
11
12/// Reactive [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
13///
14/// An [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) or
15/// [Server-Sent-Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events)
16/// instance opens a persistent connection to an HTTP server,
17/// which sends events in text/event-stream format.
18///
19/// ## Usage
20///
21/// Values are decoded via the given decoder. You can use any of the string codecs or a
22/// binary codec wrapped in `Base64`.
23///
24/// > Please check [the codec chapter](https://leptos-use.rs/codecs.html) to see what codecs are
25/// > available and what feature flags they require.
26///
27/// ```
28/// # use leptos::prelude::*;
29/// # use leptos_use::{use_event_source, UseEventSourceReturn};
30/// # use codee::string::JsonSerdeCodec;
31/// # use serde::{Deserialize, Serialize};
32/// #
33/// #[derive(Serialize, Deserialize, Clone, PartialEq)]
34/// pub struct EventSourceData {
35///     pub message: String,
36///     pub priority: u8,
37/// }
38///
39/// # #[component]
40/// # fn Demo() -> impl IntoView {
41/// let UseEventSourceReturn {
42///     ready_state, message, error, close, ..
43/// } = use_event_source::<EventSourceData, JsonSerdeCodec>("https://event-source-url");
44/// #
45/// # view! { }
46/// # }
47/// ```
48///
49/// ### Named Events
50///
51/// You can define named events when using `use_event_source_with_options`.
52///
53/// ```
54/// # use leptos::prelude::*;
55/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions};
56/// # use codee::string::FromToStringCodec;
57/// #
58/// # #[component]
59/// # fn Demo() -> impl IntoView {
60/// let UseEventSourceReturn {
61///     ready_state, message, error, close, ..
62/// } = use_event_source_with_options::<String, FromToStringCodec>(
63///     "https://event-source-url",
64///     UseEventSourceOptions::default()
65///         .named_events(["notice".to_string(), "update".to_string()])
66/// );
67/// #
68/// # view! { }
69/// # }
70/// ```
71///
72/// ### Custom Event Handler
73///
74/// You can provide a custom `on_event` handler using `use_event_source_with_options`.
75/// `on_event` wil be run for every received event, including the built-in `open`, `error`,
76/// and `message` events, as well as any named events you have specified.
77///
78/// With the return value of `on_event` you can control, whether `message` and named events
79/// should be further processed by `use_event_source` (`UseEventSourceOnEventReturn::ProcessMessage`)
80/// or ignored (`UseEventSourceOnEventReturn::IgnoreProcessingMessage`).
81///
82/// By default, the handler returns `UseEventSourceOnEventReturn::ProcessMessage`.
83///
84/// ```
85/// # use leptos::prelude::*;
86/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions, UseEventSourceMessage, UseEventSourceOnEventReturn};
87/// # use codee::string::FromToStringCodec;
88/// #
89/// # #[component]
90/// # fn Demo() -> impl IntoView {
91/// // Custom example handler: log event name and check for named `custom_error` event
92/// let custom_event_handler = |e: &web_sys::Event| {
93///     leptos::logging::log!("Received event: {}", e.type_());
94///     if e.type_() == "custom_error" {
95///         if let Ok(error_message) = UseEventSourceMessage::<String, FromToStringCodec>::try_from(e.clone()) {
96///             // Decoded successfully, log the error message
97///             leptos::logging::log!("Error message: {}", error_message.data);
98///             // skip processing this message event further
99///             return UseEventSourceOnEventReturn::IgnoreProcessingMessage;
100///         }
101///     }
102///     // Process other message events normally
103///     UseEventSourceOnEventReturn::ProcessMessage
104/// };
105/// let UseEventSourceReturn {
106///     ready_state, message, error, close, ..
107/// } = use_event_source_with_options::<String, FromToStringCodec>(
108///     "https://event-source-url",
109///     UseEventSourceOptions::default()
110///         .named_events(["custom_error".to_string()])
111///         .on_event(custom_event_handler)
112/// );
113/// #
114/// # view! { }
115/// # }
116/// ```
117///
118/// ### Immediate
119///
120/// Auto-connect (enabled by default).
121///
122/// This will call `open()` automatically for you, and you don't need to call it by yourself.
123///
124/// ### Auto-Reconnection
125///
126/// Reconnect on errors automatically (enabled by default).
127///
128/// You can control the number of reconnection attempts by setting `reconnect_limit` and the
129/// interval between them by setting `reconnect_interval`.
130///
131/// ```
132/// # use leptos::prelude::*;
133/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions, ReconnectLimit};
134/// # use codee::string::FromToStringCodec;
135/// #
136/// # #[component]
137/// # fn Demo() -> impl IntoView {
138/// let UseEventSourceReturn {
139///     ready_state, message, error, close, ..
140/// } = use_event_source_with_options::<bool, FromToStringCodec>(
141///     "https://event-source-url",
142///     UseEventSourceOptions::default()
143///         .reconnect_limit(ReconnectLimit::Limited(5))         // at most 5 attempts
144///         .reconnect_interval(2000)   // wait for 2 seconds between attempts
145/// );
146/// #
147/// # view! { }
148/// # }
149/// ```
150///
151///
152/// ## SendWrapped Return
153///
154/// The returned closures `open` and `close` are sendwrapped functions. They can
155/// only be called from the same thread that called `use_event_source`.
156///
157/// To disable auto-reconnection, set `reconnect_limit` to `0`.
158///
159/// ## Server-Side Rendering
160///
161/// > Make sure you follow the [instructions in Server-Side Rendering](https://leptos-use.rs/server_side_rendering.html).
162///
163/// On the server-side, `use_event_source` will always return `ready_state` as `ConnectionReadyState::Closed`,
164/// `data`, `event` and `error` will always be `None`, and `open` and `close` will do nothing.
165pub fn use_event_source<T, C>(
166    url: impl Into<Signal<String>>,
167) -> UseEventSourceReturn<
168    T,
169    C,
170    C::Error,
171    impl Fn() + Clone + Send + Sync + 'static,
172    impl Fn() + Clone + Send + Sync + 'static,
173>
174where
175    T: Clone + PartialEq + Send + Sync + 'static,
176    C: Decoder<T, Encoded = str> + Send + Sync,
177    C::Error: Send + Sync,
178{
179    use_event_source_with_options::<T, C>(url, UseEventSourceOptions::<T>::default())
180}
181
182/// Version of [`use_event_source`] that takes a `UseEventSourceOptions`. See [`use_event_source`] for how to use.
183pub fn use_event_source_with_options<T, C>(
184    url: impl Into<Signal<String>>,
185    options: UseEventSourceOptions<T>,
186) -> UseEventSourceReturn<
187    T,
188    C,
189    C::Error,
190    impl Fn() + Clone + Send + Sync + 'static,
191    impl Fn() + Clone + Send + Sync + 'static,
192>
193where
194    T: Clone + PartialEq + Send + Sync + 'static,
195    C: Decoder<T, Encoded = str> + Send + Sync,
196    C::Error: Send + Sync,
197{
198    let UseEventSourceOptions {
199        reconnect_limit,
200        reconnect_interval,
201        on_failed,
202        immediate,
203        named_events,
204        on_event,
205        with_credentials,
206        _marker,
207    } = options;
208
209    let (message, set_message) = signal(None::<UseEventSourceMessage<T, C>>);
210    let (ready_state, set_ready_state) = signal(ConnectionReadyState::Closed);
211    let (error, set_error) = signal(None::<UseEventSourceError<C::Error>>);
212
213    let open;
214    let close;
215
216    #[cfg(not(feature = "ssr"))]
217    {
218        use crate::{sendwrap_fn, use_event_listener};
219        use std::sync::atomic::{AtomicBool, AtomicU32};
220        use std::time::Duration;
221        use wasm_bindgen::prelude::*;
222
223        let (event_source, set_event_source) = signal_local(None::<web_sys::EventSource>);
224        let explicitly_closed = Arc::new(AtomicBool::new(false));
225        let retried = Arc::new(AtomicU32::new(0));
226
227        let on_event_return = move |e: &web_sys::Event| {
228            // make sure handler doesn't create reactive dependencies
229            #[cfg(debug_assertions)]
230            let _ = leptos::reactive::diagnostics::SpecialNonReactiveZone::enter();
231
232            on_event(e)
233        };
234
235        let on_message_event = {
236            let on_event_return = on_event_return.clone();
237            move |e: &web_sys::Event| {
238                match on_event_return(e) {
239                    UseEventSourceOnEventReturn::IgnoreProcessingMessage => {
240                        // skip processing message event!
241                    }
242                    UseEventSourceOnEventReturn::ProcessMessage => {
243                        let message_event = e
244                            .dyn_ref::<web_sys::MessageEvent>()
245                            .expect("Event is not a MessageEvent");
246
247                        match UseEventSourceMessage::<T, C>::try_from(message_event) {
248                            Ok(event_msg) => {
249                                set_message.set(Some(event_msg));
250                            }
251                            Err(err) => {
252                                set_error.set(Some(err));
253                            }
254                        }
255                    }
256                }
257            }
258        };
259
260        let init = StoredValue::new(None::<Arc<dyn Fn() + Send + Sync>>);
261
262        let set_init = {
263            let explicitly_closed = Arc::clone(&explicitly_closed);
264            let retried = Arc::clone(&retried);
265
266            move |url: String| {
267                init.set_value(Some(Arc::new({
268                    let explicitly_closed = Arc::clone(&explicitly_closed);
269                    let retried = Arc::clone(&retried);
270                    let on_event_return = on_event_return.clone();
271                    let on_message_event = on_message_event.clone();
272                    let named_events = named_events.clone();
273                    let on_failed = Arc::clone(&on_failed);
274
275                    move || {
276                        if explicitly_closed.load(std::sync::atomic::Ordering::Relaxed) {
277                            return;
278                        }
279
280                        let event_src_opts = web_sys::EventSourceInit::new();
281                        event_src_opts.set_with_credentials(with_credentials);
282
283                        let es = web_sys::EventSource::new_with_event_source_init_dict(
284                            &url,
285                            &event_src_opts,
286                        )
287                        .unwrap_throw();
288
289                        set_ready_state.set(ConnectionReadyState::Connecting);
290
291                        set_event_source.set(Some(es.clone()));
292
293                        let on_open = Closure::wrap(Box::new({
294                            let on_event_return = on_event_return.clone();
295                            move |e: web_sys::Event| {
296                                on_event_return(&e);
297                                set_ready_state.set(ConnectionReadyState::Open);
298                                set_error.set(None);
299                            }})
300                                as Box<dyn FnMut(web_sys::Event)>);
301                        es.set_onopen(Some(on_open.as_ref().unchecked_ref()));
302                        on_open.forget();
303
304                        let on_error = Closure::wrap(Box::new({
305                            let on_event_return = on_event_return.clone();
306                            let explicitly_closed = Arc::clone(&explicitly_closed);
307                            let retried = Arc::clone(&retried);
308                            let on_failed = Arc::clone(&on_failed);
309                            let es = es.clone();
310
311                            move |e: web_sys::Event| {
312                                on_event_return(&e);
313                                set_ready_state.set(ConnectionReadyState::Closed);
314                                set_error.set(Some(UseEventSourceError::ErrorEvent));
315
316                                // only reconnect if EventSource isn't reconnecting by itself
317                                // this is the case when the connection is closed (readyState is 2)
318                                if es.ready_state() == 2
319                                    && !explicitly_closed.load(std::sync::atomic::Ordering::Relaxed)
320                                {
321                                    es.close();
322
323                                    let retried_value = retried
324                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
325                                        + 1;
326
327                                    if !reconnect_limit.is_exceeded_by(retried_value as u64) {
328                                        set_timeout(
329                                            move || {
330                                                if let Some(init) = init.get_value() {
331                                                    init();
332                                                }
333                                            },
334                                            Duration::from_millis(reconnect_interval),
335                                        );
336                                    } else {
337                                        #[cfg(debug_assertions)]
338                                        let _z =
339                                            leptos::reactive::diagnostics::SpecialNonReactiveZone::enter();
340
341                                        on_failed();
342                                    }
343                                }
344                            }
345                        })
346                            as Box<dyn FnMut(web_sys::Event)>);
347                        es.set_onerror(Some(on_error.as_ref().unchecked_ref()));
348                        on_error.forget();
349
350                        let on_message = Closure::wrap(Box::new({
351                            let on_message_event = on_message_event.clone();
352                            move |e: web_sys::MessageEvent| {
353                                let e: &web_sys::Event = e.as_ref();
354                                on_message_event(e);
355                            }})
356                                as Box<dyn FnMut(web_sys::MessageEvent)>);
357                        es.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
358                        on_message.forget();
359
360                        for event_name in named_events.clone() {
361                            let event_handler = {
362                                let on_message_event = on_message_event.clone();
363                                move |e: web_sys::Event| {
364                                    on_message_event(&e);
365                                }
366                            };
367
368                            let _ = use_event_listener(
369                                es.clone(),
370                                leptos::ev::Custom::<leptos::ev::Event>::new(event_name),
371                                event_handler,
372                            );
373                        }
374                    }
375                })))
376            }
377        };
378
379        close = {
380            let explicitly_closed = Arc::clone(&explicitly_closed);
381
382            sendwrap_fn!(move || {
383                if let Some(event_source) = event_source.get_untracked() {
384                    event_source.close();
385                    set_event_source.set(None);
386                    set_ready_state.set(ConnectionReadyState::Closed);
387                    explicitly_closed.store(true, std::sync::atomic::Ordering::Relaxed);
388                }
389            })
390        };
391
392        let url: Signal<String> = url.into();
393
394        open = {
395            let close = close.clone();
396            let explicitly_closed = Arc::clone(&explicitly_closed);
397            let retried = Arc::clone(&retried);
398            let set_init = set_init.clone();
399
400            sendwrap_fn!(move || {
401                close();
402                explicitly_closed.store(false, std::sync::atomic::Ordering::Relaxed);
403                retried.store(0, std::sync::atomic::Ordering::Relaxed);
404                if init.get_value().is_none() && !url.get_untracked().is_empty() {
405                    set_init(url.get_untracked());
406                }
407                if let Some(init) = init.get_value() {
408                    init();
409                }
410            })
411        };
412
413        {
414            let close = close.clone();
415            let open = open.clone();
416            let set_init = set_init.clone();
417            Effect::watch(
418                move || url.get(),
419                move |url, prev_url, _| {
420                    if url.is_empty() {
421                        close();
422                    } else if Some(url) != prev_url {
423                        close();
424                        set_init(url.to_owned());
425                        open();
426                    }
427                },
428                immediate,
429            );
430        }
431
432        on_cleanup(close.clone());
433    }
434
435    #[cfg(feature = "ssr")]
436    {
437        open = move || {};
438        close = move || {};
439
440        let _ = reconnect_limit;
441        let _ = reconnect_interval;
442        let _ = on_failed;
443        let _ = immediate;
444        let _ = named_events;
445        let _ = on_event;
446        let _ = with_credentials;
447
448        let _ = set_message;
449        let _ = set_ready_state;
450        let _ = set_error;
451        let _ = url;
452    }
453
454    UseEventSourceReturn {
455        message: message.into(),
456        ready_state: ready_state.into(),
457        error: error.into(),
458        open,
459        close,
460    }
461}
462
463/// Message received from the `EventSource` with transcoded data.
464#[derive(PartialEq)]
465pub struct UseEventSourceMessage<T, C>
466where
467    T: Clone + Send + Sync + 'static,
468    C: Decoder<T, Encoded = str> + Send + Sync,
469    C::Error: Send + Sync,
470{
471    pub event_type: String,
472    pub data: T,
473    pub last_event_id: String,
474    _marker: PhantomData<C>,
475}
476
477impl<T, C> Clone for UseEventSourceMessage<T, C>
478where
479    T: Clone + Send + Sync + 'static,
480    C: Decoder<T, Encoded = str> + Send + Sync,
481    C::Error: Send + Sync,
482{
483    fn clone(&self) -> Self {
484        Self {
485            event_type: self.event_type.clone(),
486            data: self.data.clone(),
487            last_event_id: self.last_event_id.clone(),
488            _marker: PhantomData,
489        }
490    }
491}
492
493impl<T, C> Debug for UseEventSourceMessage<T, C>
494where
495    T: Debug + Clone + Send + Sync + 'static,
496    C: Decoder<T, Encoded = str> + Send + Sync,
497    C::Error: Send + Sync,
498{
499    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
500        f.debug_struct("UseEventSourceMessage")
501            .field("data", &self.data)
502            .field("event_type", &self.event_type)
503            .field("last_event_id", &self.last_event_id)
504            .finish()
505    }
506}
507
508impl<T, C> TryFrom<&web_sys::MessageEvent> for UseEventSourceMessage<T, C>
509where
510    T: Clone + Send + Sync + 'static,
511    C: Decoder<T, Encoded = str> + Send + Sync,
512    C::Error: Send + Sync,
513{
514    type Error = UseEventSourceError<C::Error>;
515
516    fn try_from(message_event: &web_sys::MessageEvent) -> Result<Self, Self::Error> {
517        let data_string = message_event.data().as_string().unwrap_or_default();
518
519        let data = C::decode(&data_string).map_err(UseEventSourceError::Deserialize)?;
520
521        Ok(Self {
522            event_type: message_event.type_(),
523            data,
524            last_event_id: message_event.last_event_id(),
525            _marker: PhantomData,
526        })
527    }
528}
529
530impl<T, C> TryFrom<web_sys::Event> for UseEventSourceMessage<T, C>
531where
532    T: Clone + Send + Sync + 'static,
533    C: Decoder<T, Encoded = str> + Send + Sync,
534    C::Error: Send + Sync,
535{
536    type Error = UseEventSourceError<C::Error>;
537
538    fn try_from(event: web_sys::Event) -> Result<Self, Self::Error> {
539        let message_event = event
540            .dyn_into::<web_sys::MessageEvent>()
541            .map_err(|e| UseEventSourceError::CastToMessageEvent(e.type_()))?;
542
543        UseEventSourceMessage::try_from(&message_event)
544    }
545}
546
547/// Options for [`use_event_source_with_options`].
548#[derive(DefaultBuilder)]
549pub struct UseEventSourceOptions<T>
550where
551    T: 'static,
552{
553    /// Retry times. Defaults to `ReconnectLimit::Limited(3)`. Use `ReconnectLimit::Infinite` for
554    /// infinite retries.
555    reconnect_limit: ReconnectLimit,
556
557    /// Retry interval in ms. Defaults to 3000.
558    reconnect_interval: u64,
559
560    /// On maximum retry times reached.
561    on_failed: Arc<dyn Fn() + Send + Sync>,
562
563    /// If `true` the `EventSource` connection will immediately be opened when calling this function.
564    /// If `false` you have to manually call the `open` function.
565    /// Defaults to `true`.
566    immediate: bool,
567
568    /// List of named events to listen for on the `EventSource`.
569    #[builder(into)]
570    named_events: Vec<String>,
571
572    /// The `on_event` is called before processing any event inside of [`use_event_source`].
573    /// Return `UseEventSourceOnEventReturn::Ignore` to ignore further processing of the respective event
574    /// in [`use_event_source`], or `UseEventSourceOnEventReturn::Use` to process the event as usual.
575    ///
576    /// Beware that ignoring processing the `open` and `error` events may yield unexpected results.
577    ///
578    /// You may want to use [`UseEventSourceMessage::try_from()`] to access the event data.
579    ///
580    /// Default handler returns `UseEventSourceOnEventReturn::Use`.
581    on_event: Arc<dyn Fn(&web_sys::Event) -> UseEventSourceOnEventReturn + Send + Sync>,
582
583    /// If CORS should be set to `include` credentials. Defaults to `false`.
584    with_credentials: bool,
585
586    _marker: PhantomData<T>,
587}
588
589impl<T> Default for UseEventSourceOptions<T> {
590    fn default() -> Self {
591        Self {
592            reconnect_limit: ReconnectLimit::default(),
593            reconnect_interval: 3000,
594            on_failed: Arc::new(|| {}),
595            immediate: true,
596            named_events: vec![],
597            on_event: Arc::new(|_| UseEventSourceOnEventReturn::ProcessMessage),
598            with_credentials: false,
599            _marker: PhantomData,
600        }
601    }
602}
603
604/// Return type of the `on_event` handler in [`UseEventSourceOptions`].
605pub enum UseEventSourceOnEventReturn {
606    /// Ignore further processing of the message event in [`use_event_source`].
607    IgnoreProcessingMessage,
608    /// Use the default processing of the message event in [`use_event_source`].
609    ProcessMessage,
610}
611
612/// Return type of [`use_event_source`].
613pub struct UseEventSourceReturn<T, C, Err, OpenFn, CloseFn>
614where
615    T: Clone + Send + Sync + 'static,
616    C: Decoder<T, Encoded = str> + Send + Sync,
617    C::Error: Send + Sync,
618    Err: Send + Sync + 'static,
619    OpenFn: Fn() + Clone + Send + Sync + 'static,
620    CloseFn: Fn() + Clone + Send + Sync + 'static,
621{
622    /// The latest message
623    pub message: Signal<Option<UseEventSourceMessage<T, C>>>,
624
625    /// The current state of the connection,
626    pub ready_state: Signal<ConnectionReadyState>,
627
628    /// The current error
629    pub error: Signal<Option<UseEventSourceError<Err>>>,
630
631    /// (Re-)Opens the `EventSource` connection
632    /// If the current one is active, will close it before opening a new one.
633    pub open: OpenFn,
634
635    /// Closes the `EventSource` connection
636    pub close: CloseFn,
637}
638
639#[derive(Error, Debug)]
640pub enum UseEventSourceError<Err> {
641    #[error("Error event received")]
642    ErrorEvent,
643
644    #[error("Error decoding value")]
645    Deserialize(Err),
646
647    #[error("Error casting event '{0}' to MessageEvent")]
648    CastToMessageEvent(String),
649}