leptos_use/use_event_source.rs
1use crate::ReconnectLimit;
2use crate::core::ConnectionReadyState;
3use codee::Decoder;
4use default_struct_builder::DefaultBuilder;
5use leptos::prelude::*;
6use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::sync::Arc;
9use thiserror::Error;
10use wasm_bindgen::JsCast;
11
12/// Reactive [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
13///
14/// An [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) or
15/// [Server-Sent-Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events)
16/// instance opens a persistent connection to an HTTP server,
17/// which sends events in text/event-stream format.
18///
19/// ## Usage
20///
21/// Values are decoded via the given decoder. You can use any of the string codecs or a
22/// binary codec wrapped in `Base64`.
23///
24/// > Please check [the codec chapter](https://leptos-use.rs/codecs.html) to see what codecs are
25/// > available and what feature flags they require.
26///
27/// ```
28/// # use leptos::prelude::*;
29/// # use leptos_use::{use_event_source, UseEventSourceReturn};
30/// # use codee::string::JsonSerdeCodec;
31/// # use serde::{Deserialize, Serialize};
32/// #
33/// #[derive(Serialize, Deserialize, Clone, PartialEq)]
34/// pub struct EventSourceData {
35/// pub message: String,
36/// pub priority: u8,
37/// }
38///
39/// # #[component]
40/// # fn Demo() -> impl IntoView {
41/// let UseEventSourceReturn {
42/// ready_state, message, error, close, ..
43/// } = use_event_source::<EventSourceData, JsonSerdeCodec>("https://event-source-url");
44/// #
45/// # view! { }
46/// # }
47/// ```
48///
49/// ### Named Events
50///
51/// You can define named events when using `use_event_source_with_options`.
52///
53/// ```
54/// # use leptos::prelude::*;
55/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions};
56/// # use codee::string::FromToStringCodec;
57/// #
58/// # #[component]
59/// # fn Demo() -> impl IntoView {
60/// let UseEventSourceReturn {
61/// ready_state, message, error, close, ..
62/// } = use_event_source_with_options::<String, FromToStringCodec>(
63/// "https://event-source-url",
64/// UseEventSourceOptions::default()
65/// .named_events(["notice".to_string(), "update".to_string()])
66/// );
67/// #
68/// # view! { }
69/// # }
70/// ```
71///
72/// ### Custom Event Handler
73///
74/// You can provide a custom `on_event` handler using `use_event_source_with_options`.
/// `on_event` will be run for every received event, including the built-in `open`, `error`,
76/// and `message` events, as well as any named events you have specified.
77///
78/// With the return value of `on_event` you can control, whether `message` and named events
79/// should be further processed by `use_event_source` (`UseEventSourceOnEventReturn::ProcessMessage`)
80/// or ignored (`UseEventSourceOnEventReturn::IgnoreProcessingMessage`).
81///
82/// By default, the handler returns `UseEventSourceOnEventReturn::ProcessMessage`.
83///
84/// ```
85/// # use leptos::prelude::*;
86/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions, UseEventSourceMessage, UseEventSourceOnEventReturn};
87/// # use codee::string::FromToStringCodec;
88/// #
89/// # #[component]
90/// # fn Demo() -> impl IntoView {
91/// // Custom example handler: log event name and check for named `custom_error` event
92/// let custom_event_handler = |e: &web_sys::Event| {
93/// leptos::logging::log!("Received event: {}", e.type_());
94/// if e.type_() == "custom_error" {
95/// if let Ok(error_message) = UseEventSourceMessage::<String, FromToStringCodec>::try_from(e.clone()) {
96/// // Decoded successfully, log the error message
97/// leptos::logging::log!("Error message: {}", error_message.data);
98/// // skip processing this message event further
99/// return UseEventSourceOnEventReturn::IgnoreProcessingMessage;
100/// }
101/// }
102/// // Process other message events normally
103/// UseEventSourceOnEventReturn::ProcessMessage
104/// };
105/// let UseEventSourceReturn {
106/// ready_state, message, error, close, ..
107/// } = use_event_source_with_options::<String, FromToStringCodec>(
108/// "https://event-source-url",
109/// UseEventSourceOptions::default()
110/// .named_events(["custom_error".to_string()])
111/// .on_event(custom_event_handler)
112/// );
113/// #
114/// # view! { }
115/// # }
116/// ```
117///
118/// ### Immediate
119///
120/// Auto-connect (enabled by default).
121///
122/// This will call `open()` automatically for you, and you don't need to call it by yourself.
123///
124/// ### Auto-Reconnection
125///
126/// Reconnect on errors automatically (enabled by default).
127///
128/// You can control the number of reconnection attempts by setting `reconnect_limit` and the
129/// interval between them by setting `reconnect_interval`.
130///
131/// ```
132/// # use leptos::prelude::*;
133/// # use leptos_use::{use_event_source_with_options, UseEventSourceReturn, UseEventSourceOptions, ReconnectLimit};
134/// # use codee::string::FromToStringCodec;
135/// #
136/// # #[component]
137/// # fn Demo() -> impl IntoView {
138/// let UseEventSourceReturn {
139/// ready_state, message, error, close, ..
140/// } = use_event_source_with_options::<bool, FromToStringCodec>(
141/// "https://event-source-url",
142/// UseEventSourceOptions::default()
143/// .reconnect_limit(ReconnectLimit::Limited(5)) // at most 5 attempts
144/// .reconnect_interval(2000) // wait for 2 seconds between attempts
145/// );
146/// #
147/// # view! { }
148/// # }
149/// ```
150///
151///
152/// ## SendWrapped Return
153///
154/// The returned closures `open` and `close` are sendwrapped functions. They can
155/// only be called from the same thread that called `use_event_source`.
156///
/// Regarding auto-reconnection: to disable it, set `reconnect_limit` to `ReconnectLimit::Limited(0)`.
158///
159/// ## Server-Side Rendering
160///
161/// > Make sure you follow the [instructions in Server-Side Rendering](https://leptos-use.rs/server_side_rendering.html).
162///
163/// On the server-side, `use_event_source` will always return `ready_state` as `ConnectionReadyState::Closed`,
/// `message` and `error` will always be `None`, and `open` and `close` will do nothing.
165pub fn use_event_source<T, C>(
166 url: impl Into<Signal<String>>,
167) -> UseEventSourceReturn<
168 T,
169 C,
170 C::Error,
171 impl Fn() + Clone + Send + Sync + 'static,
172 impl Fn() + Clone + Send + Sync + 'static,
173>
174where
175 T: Clone + PartialEq + Send + Sync + 'static,
176 C: Decoder<T, Encoded = str> + Send + Sync,
177 C::Error: Send + Sync,
178{
179 use_event_source_with_options::<T, C>(url, UseEventSourceOptions::<T>::default())
180}
181
182/// Version of [`use_event_source`] that takes a `UseEventSourceOptions`. See [`use_event_source`] for how to use.
183pub fn use_event_source_with_options<T, C>(
184 url: impl Into<Signal<String>>,
185 options: UseEventSourceOptions<T>,
186) -> UseEventSourceReturn<
187 T,
188 C,
189 C::Error,
190 impl Fn() + Clone + Send + Sync + 'static,
191 impl Fn() + Clone + Send + Sync + 'static,
192>
193where
194 T: Clone + PartialEq + Send + Sync + 'static,
195 C: Decoder<T, Encoded = str> + Send + Sync,
196 C::Error: Send + Sync,
197{
198 let UseEventSourceOptions {
199 reconnect_limit,
200 reconnect_interval,
201 on_failed,
202 immediate,
203 named_events,
204 on_event,
205 with_credentials,
206 _marker,
207 } = options;
208
209 let (message, set_message) = signal(None::<UseEventSourceMessage<T, C>>);
210 let (ready_state, set_ready_state) = signal(ConnectionReadyState::Closed);
211 let (error, set_error) = signal(None::<UseEventSourceError<C::Error>>);
212
213 let open;
214 let close;
215
216 #[cfg(not(feature = "ssr"))]
217 {
218 use crate::{sendwrap_fn, use_event_listener};
219 use std::sync::atomic::{AtomicBool, AtomicU32};
220 use std::time::Duration;
221 use wasm_bindgen::prelude::*;
222
223 let (event_source, set_event_source) = signal_local(None::<web_sys::EventSource>);
224 let explicitly_closed = Arc::new(AtomicBool::new(false));
225 let retried = Arc::new(AtomicU32::new(0));
226
227 let on_event_return = move |e: &web_sys::Event| {
228 // make sure handler doesn't create reactive dependencies
229 #[cfg(debug_assertions)]
230 let _ = leptos::reactive::diagnostics::SpecialNonReactiveZone::enter();
231
232 on_event(e)
233 };
234
235 let on_message_event = {
236 let on_event_return = on_event_return.clone();
237 move |e: &web_sys::Event| {
238 match on_event_return(e) {
239 UseEventSourceOnEventReturn::IgnoreProcessingMessage => {
240 // skip processing message event!
241 }
242 UseEventSourceOnEventReturn::ProcessMessage => {
243 let message_event = e
244 .dyn_ref::<web_sys::MessageEvent>()
245 .expect("Event is not a MessageEvent");
246
247 match UseEventSourceMessage::<T, C>::try_from(message_event) {
248 Ok(event_msg) => {
249 set_message.set(Some(event_msg));
250 }
251 Err(err) => {
252 set_error.set(Some(err));
253 }
254 }
255 }
256 }
257 }
258 };
259
260 let init = StoredValue::new(None::<Arc<dyn Fn() + Send + Sync>>);
261
262 let set_init = {
263 let explicitly_closed = Arc::clone(&explicitly_closed);
264 let retried = Arc::clone(&retried);
265
266 move |url: String| {
267 init.set_value(Some(Arc::new({
268 let explicitly_closed = Arc::clone(&explicitly_closed);
269 let retried = Arc::clone(&retried);
270 let on_event_return = on_event_return.clone();
271 let on_message_event = on_message_event.clone();
272 let named_events = named_events.clone();
273 let on_failed = Arc::clone(&on_failed);
274
275 move || {
276 if explicitly_closed.load(std::sync::atomic::Ordering::Relaxed) {
277 return;
278 }
279
280 let event_src_opts = web_sys::EventSourceInit::new();
281 event_src_opts.set_with_credentials(with_credentials);
282
283 let es = web_sys::EventSource::new_with_event_source_init_dict(
284 &url,
285 &event_src_opts,
286 )
287 .unwrap_throw();
288
289 set_ready_state.set(ConnectionReadyState::Connecting);
290
291 set_event_source.set(Some(es.clone()));
292
293 let on_open = Closure::wrap(Box::new({
294 let on_event_return = on_event_return.clone();
295 move |e: web_sys::Event| {
296 on_event_return(&e);
297 set_ready_state.set(ConnectionReadyState::Open);
298 set_error.set(None);
299 }})
300 as Box<dyn FnMut(web_sys::Event)>);
301 es.set_onopen(Some(on_open.as_ref().unchecked_ref()));
302 on_open.forget();
303
304 let on_error = Closure::wrap(Box::new({
305 let on_event_return = on_event_return.clone();
306 let explicitly_closed = Arc::clone(&explicitly_closed);
307 let retried = Arc::clone(&retried);
308 let on_failed = Arc::clone(&on_failed);
309 let es = es.clone();
310
311 move |e: web_sys::Event| {
312 on_event_return(&e);
313 set_ready_state.set(ConnectionReadyState::Closed);
314 set_error.set(Some(UseEventSourceError::ErrorEvent));
315
316 // only reconnect if EventSource isn't reconnecting by itself
317 // this is the case when the connection is closed (readyState is 2)
318 if es.ready_state() == 2
319 && !explicitly_closed.load(std::sync::atomic::Ordering::Relaxed)
320 {
321 es.close();
322
323 let retried_value = retried
324 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
325 + 1;
326
327 if !reconnect_limit.is_exceeded_by(retried_value as u64) {
328 set_timeout(
329 move || {
330 if let Some(init) = init.get_value() {
331 init();
332 }
333 },
334 Duration::from_millis(reconnect_interval),
335 );
336 } else {
337 #[cfg(debug_assertions)]
338 let _z =
339 leptos::reactive::diagnostics::SpecialNonReactiveZone::enter();
340
341 on_failed();
342 }
343 }
344 }
345 })
346 as Box<dyn FnMut(web_sys::Event)>);
347 es.set_onerror(Some(on_error.as_ref().unchecked_ref()));
348 on_error.forget();
349
350 let on_message = Closure::wrap(Box::new({
351 let on_message_event = on_message_event.clone();
352 move |e: web_sys::MessageEvent| {
353 let e: &web_sys::Event = e.as_ref();
354 on_message_event(e);
355 }})
356 as Box<dyn FnMut(web_sys::MessageEvent)>);
357 es.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
358 on_message.forget();
359
360 for event_name in named_events.clone() {
361 let event_handler = {
362 let on_message_event = on_message_event.clone();
363 move |e: web_sys::Event| {
364 on_message_event(&e);
365 }
366 };
367
368 let _ = use_event_listener(
369 es.clone(),
370 leptos::ev::Custom::<leptos::ev::Event>::new(event_name),
371 event_handler,
372 );
373 }
374 }
375 })))
376 }
377 };
378
379 close = {
380 let explicitly_closed = Arc::clone(&explicitly_closed);
381
382 sendwrap_fn!(move || {
383 if let Some(event_source) = event_source.get_untracked() {
384 event_source.close();
385 set_event_source.set(None);
386 set_ready_state.set(ConnectionReadyState::Closed);
387 explicitly_closed.store(true, std::sync::atomic::Ordering::Relaxed);
388 }
389 })
390 };
391
392 let url: Signal<String> = url.into();
393
394 open = {
395 let close = close.clone();
396 let explicitly_closed = Arc::clone(&explicitly_closed);
397 let retried = Arc::clone(&retried);
398 let set_init = set_init.clone();
399
400 sendwrap_fn!(move || {
401 close();
402 explicitly_closed.store(false, std::sync::atomic::Ordering::Relaxed);
403 retried.store(0, std::sync::atomic::Ordering::Relaxed);
404 if init.get_value().is_none() && !url.get_untracked().is_empty() {
405 set_init(url.get_untracked());
406 }
407 if let Some(init) = init.get_value() {
408 init();
409 }
410 })
411 };
412
413 {
414 let close = close.clone();
415 let open = open.clone();
416 let set_init = set_init.clone();
417 Effect::watch(
418 move || url.get(),
419 move |url, prev_url, _| {
420 if url.is_empty() {
421 close();
422 } else if Some(url) != prev_url {
423 close();
424 set_init(url.to_owned());
425 open();
426 }
427 },
428 immediate,
429 );
430 }
431
432 on_cleanup(close.clone());
433 }
434
435 #[cfg(feature = "ssr")]
436 {
437 open = move || {};
438 close = move || {};
439
440 let _ = reconnect_limit;
441 let _ = reconnect_interval;
442 let _ = on_failed;
443 let _ = immediate;
444 let _ = named_events;
445 let _ = on_event;
446 let _ = with_credentials;
447
448 let _ = set_message;
449 let _ = set_ready_state;
450 let _ = set_error;
451 let _ = url;
452 }
453
454 UseEventSourceReturn {
455 message: message.into(),
456 ready_state: ready_state.into(),
457 error: error.into(),
458 open,
459 close,
460 }
461}
462
/// Message received from the `EventSource` with transcoded data.
///
/// `T` is the decoded payload type and `C` the decoder used to produce it from the
/// string data of the underlying `web_sys::MessageEvent`.
#[derive(PartialEq)]
pub struct UseEventSourceMessage<T, C>
where
    T: Clone + Send + Sync + 'static,
    C: Decoder<T, Encoded = str> + Send + Sync,
    C::Error: Send + Sync,
{
    /// The type of the event, taken from `MessageEvent::type_()`.
    pub event_type: String,
    /// The event data decoded with `C`.
    pub data: T,
    /// The last event id, taken from `MessageEvent::last_event_id()`.
    pub last_event_id: String,
    // Only records the decoder type `C`; carries no data.
    _marker: PhantomData<C>,
}
476
477impl<T, C> Clone for UseEventSourceMessage<T, C>
478where
479 T: Clone + Send + Sync + 'static,
480 C: Decoder<T, Encoded = str> + Send + Sync,
481 C::Error: Send + Sync,
482{
483 fn clone(&self) -> Self {
484 Self {
485 event_type: self.event_type.clone(),
486 data: self.data.clone(),
487 last_event_id: self.last_event_id.clone(),
488 _marker: PhantomData,
489 }
490 }
491}
492
493impl<T, C> Debug for UseEventSourceMessage<T, C>
494where
495 T: Debug + Clone + Send + Sync + 'static,
496 C: Decoder<T, Encoded = str> + Send + Sync,
497 C::Error: Send + Sync,
498{
499 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
500 f.debug_struct("UseEventSourceMessage")
501 .field("data", &self.data)
502 .field("event_type", &self.event_type)
503 .field("last_event_id", &self.last_event_id)
504 .finish()
505 }
506}
507
508impl<T, C> TryFrom<&web_sys::MessageEvent> for UseEventSourceMessage<T, C>
509where
510 T: Clone + Send + Sync + 'static,
511 C: Decoder<T, Encoded = str> + Send + Sync,
512 C::Error: Send + Sync,
513{
514 type Error = UseEventSourceError<C::Error>;
515
516 fn try_from(message_event: &web_sys::MessageEvent) -> Result<Self, Self::Error> {
517 let data_string = message_event.data().as_string().unwrap_or_default();
518
519 let data = C::decode(&data_string).map_err(UseEventSourceError::Deserialize)?;
520
521 Ok(Self {
522 event_type: message_event.type_(),
523 data,
524 last_event_id: message_event.last_event_id(),
525 _marker: PhantomData,
526 })
527 }
528}
529
530impl<T, C> TryFrom<web_sys::Event> for UseEventSourceMessage<T, C>
531where
532 T: Clone + Send + Sync + 'static,
533 C: Decoder<T, Encoded = str> + Send + Sync,
534 C::Error: Send + Sync,
535{
536 type Error = UseEventSourceError<C::Error>;
537
538 fn try_from(event: web_sys::Event) -> Result<Self, Self::Error> {
539 let message_event = event
540 .dyn_into::<web_sys::MessageEvent>()
541 .map_err(|e| UseEventSourceError::CastToMessageEvent(e.type_()))?;
542
543 UseEventSourceMessage::try_from(&message_event)
544 }
545}
546
/// Options for [`use_event_source_with_options`].
#[derive(DefaultBuilder)]
pub struct UseEventSourceOptions<T>
where
    T: 'static,
{
    /// Retry times. Defaults to `ReconnectLimit::Limited(3)`. Use `ReconnectLimit::Infinite` for
    /// infinite retries.
    reconnect_limit: ReconnectLimit,

    /// Retry interval in ms. Defaults to 3000.
    reconnect_interval: u64,

    /// On maximum retry times reached. Defaults to a function that does nothing.
    on_failed: Arc<dyn Fn() + Send + Sync>,

    /// If `true` the `EventSource` connection will immediately be opened when calling this function.
    /// If `false` you have to manually call the `open` function.
    /// Defaults to `true`.
    immediate: bool,

    /// List of named events to listen for on the `EventSource`. Defaults to an empty list.
    #[builder(into)]
    named_events: Vec<String>,

    /// The `on_event` is called before processing any event inside of [`use_event_source`].
    /// Return `UseEventSourceOnEventReturn::IgnoreProcessingMessage` to skip further processing of
    /// the respective `message` or named event in [`use_event_source`], or
    /// `UseEventSourceOnEventReturn::ProcessMessage` to process the event as usual.
    ///
    /// The handler is also called for `open` and `error` events, but its return value has no
    /// effect on how these two are handled.
    ///
    /// You may want to use [`UseEventSourceMessage::try_from()`] to access the event data.
    ///
    /// Default handler returns `UseEventSourceOnEventReturn::ProcessMessage`.
    on_event: Arc<dyn Fn(&web_sys::Event) -> UseEventSourceOnEventReturn + Send + Sync>,

    /// If CORS should be set to `include` credentials. Defaults to `false`.
    with_credentials: bool,

    // Only ties the type parameter `T` to the struct; carries no data.
    _marker: PhantomData<T>,
}
588
589impl<T> Default for UseEventSourceOptions<T> {
590 fn default() -> Self {
591 Self {
592 reconnect_limit: ReconnectLimit::default(),
593 reconnect_interval: 3000,
594 on_failed: Arc::new(|| {}),
595 immediate: true,
596 named_events: vec![],
597 on_event: Arc::new(|_| UseEventSourceOnEventReturn::ProcessMessage),
598 with_credentials: false,
599 _marker: PhantomData,
600 }
601 }
602}
603
/// Return type of the `on_event` handler in [`UseEventSourceOptions`].
///
/// Tells [`use_event_source`] what to do with a `message` or named event after the
/// handler has seen it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UseEventSourceOnEventReturn {
    /// Ignore further processing of the message event in [`use_event_source`].
    IgnoreProcessingMessage,
    /// Use the default processing of the message event in [`use_event_source`].
    ProcessMessage,
}
611
/// Return type of [`use_event_source`].
///
/// `T` is the decoded message type, `C` the decoder, `Err` the error type wrapped by
/// [`UseEventSourceError`] in the `error` signal, and `OpenFn` / `CloseFn` are the types of
/// the `open` and `close` functions.
pub struct UseEventSourceReturn<T, C, Err, OpenFn, CloseFn>
where
    T: Clone + Send + Sync + 'static,
    C: Decoder<T, Encoded = str> + Send + Sync,
    C::Error: Send + Sync,
    Err: Send + Sync + 'static,
    OpenFn: Fn() + Clone + Send + Sync + 'static,
    CloseFn: Fn() + Clone + Send + Sync + 'static,
{
    /// The latest message
    pub message: Signal<Option<UseEventSourceMessage<T, C>>>,

    /// The current state of the connection.
    pub ready_state: Signal<ConnectionReadyState>,

    /// The current error
    pub error: Signal<Option<UseEventSourceError<Err>>>,

    /// (Re-)Opens the `EventSource` connection
    /// If the current one is active, will close it before opening a new one.
    pub open: OpenFn,

    /// Closes the `EventSource` connection
    pub close: CloseFn,
}
638
/// Error reported through the `error` signal of [`use_event_source`].
///
/// `Err` is the error type of the decoder used to decode message data.
#[derive(Error, Debug)]
pub enum UseEventSourceError<Err> {
    /// The `EventSource` dispatched an `error` event.
    #[error("Error event received")]
    ErrorEvent,

    /// Decoding the message data failed; contains the decoder's error.
    #[error("Error decoding value")]
    Deserialize(Err),

    /// An event could not be cast to a `MessageEvent`; contains the type of that event.
    #[error("Error casting event '{0}' to MessageEvent")]
    CastToMessageEvent(String),
}