musli_web/
web.rs

1//! The generic web implementation.
2//!
3//! This is specialized over the `H` parameter through modules such as:
4//!
5//! * [`web03`] for `web-sys` `0.3.x`.
6//!
7//! [`web03`]: crate::web03
8
9use core::cell::{Cell, RefCell};
10use core::fmt;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Deref;
14use core::ptr::NonNull;
15use std::collections::VecDeque;
16
17use alloc::boxed::Box;
18use alloc::format;
19use alloc::rc::Rc;
20use alloc::rc::Weak;
21use alloc::string::{String, ToString};
22use alloc::vec::Vec;
23
24use std::collections::hash_map::{Entry, HashMap};
25
26use musli::Decode;
27use musli::alloc::Global;
28use musli::mode::Binary;
29use musli::reader::SliceReader;
30use musli::storage;
31
32use slab::Slab;
33
34use crate::api::{self, Event, MessageId};
35
36const MAX_CAPACITY: usize = 1048576;
37
38/// An empty request body.
39#[non_exhaustive]
40pub struct EmptyBody;
41
42/// An empty callback.
43#[non_exhaustive]
44pub struct EmptyCallback;
45
46/// Slab of state listeners.
47type StateListeners = Slab<Rc<dyn Callback<State>>>;
48/// Slab of broadcast listeners.
49type Broadcasts = HashMap<MessageId, Slab<Rc<dyn Callback<Result<RawPacket>>>>>;
50/// Queue of recycled buffers.
51type Buffers = VecDeque<Box<BufData>>;
52/// Pending requests.
53type Requests = HashMap<u32, Box<Pending<dyn Callback<Result<RawPacket>>>>>;
54
55/// Location information for websocket implementation.
56#[doc(hidden)]
57pub struct Location {
58    pub(crate) protocol: String,
59    pub(crate) host: String,
60    pub(crate) port: String,
61}
62
63pub(crate) mod sealed_socket {
64    pub trait Sealed {}
65}
66
67pub(crate) trait SocketImpl
68where
69    Self: Sized + self::sealed_socket::Sealed,
70{
71    #[doc(hidden)]
72    type Handles;
73
74    #[doc(hidden)]
75    fn new(url: &str, handles: &Self::Handles) -> Result<Self, Error>;
76
77    #[doc(hidden)]
78    fn send(&self, data: &[u8]) -> Result<(), Error>;
79
80    #[doc(hidden)]
81    fn close(self) -> Result<(), Error>;
82}
83
84pub(crate) mod sealed_performance {
85    pub trait Sealed {}
86}
87
88pub trait PerformanceImpl
89where
90    Self: Sized + self::sealed_performance::Sealed,
91{
92    #[doc(hidden)]
93    fn now(&self) -> f64;
94}
95
96pub(crate) mod sealed_window {
97    pub trait Sealed {}
98}
99
100pub(crate) trait WindowImpl
101where
102    Self: Sized + self::sealed_window::Sealed,
103{
104    #[doc(hidden)]
105    type Performance: PerformanceImpl;
106
107    #[doc(hidden)]
108    type Timeout;
109
110    #[doc(hidden)]
111    fn new() -> Result<Self, Error>;
112
113    #[doc(hidden)]
114    fn performance(&self) -> Result<Self::Performance, Error>;
115
116    #[doc(hidden)]
117    fn location(&self) -> Result<Location, Error>;
118
119    #[doc(hidden)]
120    fn set_timeout(
121        &self,
122        millis: u32,
123        callback: impl FnOnce() + 'static,
124    ) -> Result<Self::Timeout, Error>;
125}
126
127pub(crate) mod sealed_web {
128    pub trait Sealed {}
129}
130
131/// Central trait for web integration.
132///
133/// Since web integration is currently unstable, this requires multiple
134/// different implementations, each time an ecosystem breaking change is
135/// released.
136///
137/// The crate in focus here is `web-sys`, and the corresponding modules provide
138/// integrations:
139///
140/// * [web03] for `web-sys` `0.3.x`.
141///
142/// [web03]: crate::web03
143pub trait WebImpl
144where
145    Self: 'static + Copy + Sized + self::sealed_web::Sealed,
146{
147    #[doc(hidden)]
148    #[allow(private_bounds)]
149    type Window: WindowImpl;
150
151    #[doc(hidden)]
152    type Handles;
153
154    #[doc(hidden)]
155    #[allow(private_bounds)]
156    type Socket: SocketImpl<Handles = Self::Handles>;
157
158    #[doc(hidden)]
159    #[allow(private_interfaces)]
160    fn handles(shared: &Weak<Shared<Self>>) -> Self::Handles;
161
162    #[doc(hidden)]
163    fn random(range: u32) -> u32;
164}
165
166/// Construct a new [`ServiceBuilder`] associated with the given [`Connect`]
167/// strategy.
168pub fn connect<H>(connect: Connect) -> ServiceBuilder<H, EmptyCallback>
169where
170    H: WebImpl,
171{
172    ServiceBuilder {
173        connect,
174        on_error: EmptyCallback,
175        _marker: PhantomData,
176    }
177}
178
179/// The state of the connection.
180///
181/// A listener for state changes can be set up through for example
182/// [`Handle::on_state_change`].
183#[derive(Debug, PartialEq, Eq, Clone, Copy)]
184#[non_exhaustive]
185pub enum State {
186    /// The connection is open.
187    Open,
188    /// The connection is closed.
189    Closed,
190}
191
192/// Error type for the WebSocket service.
193#[derive(Debug)]
194pub struct Error {
195    kind: ErrorKind,
196}
197
198#[derive(Debug)]
199enum ErrorKind {
200    Message(String),
201    Storage(storage::Error),
202    Overflow(usize, usize),
203}
204
205impl Error {
206    #[inline]
207    fn new(kind: ErrorKind) -> Self {
208        Self { kind }
209    }
210
211    #[inline]
212    pub(crate) fn msg(message: impl fmt::Display) -> Self {
213        Self::new(ErrorKind::Message(message.to_string()))
214    }
215}
216
217impl fmt::Display for Error {
218    #[inline]
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        match &self.kind {
221            ErrorKind::Message(message) => write!(f, "{message}"),
222            ErrorKind::Storage(error) => write!(f, "Encoding error: {error}"),
223            ErrorKind::Overflow(at, len) => {
224                write!(f, "Internal packet overflow, {at} not in range 0-{len}")
225            }
226        }
227    }
228}
229
230impl core::error::Error for Error {
231    #[inline]
232    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
233        match &self.kind {
234            ErrorKind::Storage(error) => Some(error),
235            _ => None,
236        }
237    }
238}
239
240impl From<storage::Error> for Error {
241    #[inline]
242    fn from(error: storage::Error) -> Self {
243        Self::new(ErrorKind::Storage(error))
244    }
245}
246
247#[cfg(feature = "wasm_bindgen02")]
248impl From<wasm_bindgen02::JsValue> for Error {
249    #[inline]
250    fn from(error: wasm_bindgen02::JsValue) -> Self {
251        Self::new(ErrorKind::Message(format!("{error:?}")))
252    }
253}
254
255type Result<T, E = Error> = core::result::Result<T, E>;
256
257const INITIAL_TIMEOUT: u32 = 250;
258const MAX_TIMEOUT: u32 = 4000;
259
260/// How to connect to the websocket.
261enum ConnectKind {
262    Location { path: String },
263    Url { url: String },
264}
265
266/// A specification for how to connect a websocket.
267pub struct Connect {
268    kind: ConnectKind,
269}
270
271impl Connect {
272    /// Connect to the same location with a custom path.
273    ///
274    /// Note that any number of `/` prefixes are ignored, the canonical
275    /// representation always ignores them and the path is relative to the
276    /// current location.
277    #[inline]
278    pub fn location(path: impl AsRef<str>) -> Self {
279        Self {
280            kind: ConnectKind::Location {
281                path: String::from(path.as_ref()),
282            },
283        }
284    }
285
286    /// Connect to the specified URL.
287    #[inline]
288    pub fn url(url: String) -> Self {
289        Self {
290            kind: ConnectKind::Url { url },
291        }
292    }
293}
294
295/// Generic but shared fields which do not depend on specialization over `H`.
296struct Generic {
297    state_listeners: RefCell<StateListeners>,
298    requests: RefCell<Requests>,
299    broadcasts: RefCell<Broadcasts>,
300    buffers: RefCell<Buffers>,
301}
302
303/// Shared implementation details for websocket implementations.
304pub(crate) struct Shared<H>
305where
306    H: WebImpl,
307{
308    connect: Connect,
309    pub(crate) on_error: Box<dyn Callback<Error>>,
310    window: H::Window,
311    performance: <H::Window as WindowImpl>::Performance,
312    handles: H::Handles,
313    state: Cell<State>,
314    opened: Cell<Option<f64>>,
315    serial: Cell<u32>,
316    defer_broadcasts: RefCell<VecDeque<Weak<dyn Callback<Result<RawPacket>>>>>,
317    defer_state_listeners: RefCell<VecDeque<Weak<dyn Callback<State>>>>,
318    pub(crate) socket: RefCell<Option<H::Socket>>,
319    output: RefCell<Vec<u8>>,
320    current_timeout: Cell<u32>,
321    reconnect_timeout: RefCell<Option<<H::Window as WindowImpl>::Timeout>>,
322    g: Rc<Generic>,
323}
324
325impl<H> Drop for Shared<H>
326where
327    H: WebImpl,
328{
329    fn drop(&mut self) {
330        if let Some(s) = self.socket.take()
331            && let Err(e) = s.close()
332        {
333            self.on_error.call(e);
334        }
335
336        // We don't need to worry about mutable borrows here, since we only have
337        // weak references to Shared and by virtue of this being dropped they
338        // are all invalid.
339        let state_listeners = mem::take(&mut *self.g.state_listeners.borrow_mut());
340        let mut requests = self.g.requests.borrow_mut();
341
342        for (_, listener) in state_listeners {
343            listener.call(State::Closed);
344        }
345
346        for (_, p) in requests.drain() {
347            p.callback.call(Err(Error::msg("Websocket service closed")));
348        }
349    }
350}
351
352/// Builder of a service.
353pub struct ServiceBuilder<H, E>
354where
355    H: WebImpl,
356{
357    connect: Connect,
358    on_error: E,
359    _marker: PhantomData<H>,
360}
361
362impl<H, E> ServiceBuilder<H, E>
363where
364    H: WebImpl,
365    E: Callback<Error>,
366{
367    /// Set the error handler to use for the service.
368    pub fn on_error<U>(self, on_error: U) -> ServiceBuilder<H, U>
369    where
370        U: Callback<Error>,
371    {
372        ServiceBuilder {
373            connect: self.connect,
374            on_error,
375            _marker: self._marker,
376        }
377    }
378
379    /// Build a new service and handle.
380    pub fn build(self) -> Service<H> {
381        let window = match H::Window::new() {
382            Ok(window) => window,
383            Err(error) => {
384                panic!("{error}")
385            }
386        };
387
388        let performance = match WindowImpl::performance(&window) {
389            Ok(performance) => performance,
390            Err(error) => {
391                panic!("{error}")
392            }
393        };
394
395        let shared = Rc::<Shared<H>>::new_cyclic(move |shared| Shared {
396            connect: self.connect,
397            on_error: Box::new(self.on_error),
398            window,
399            performance,
400            handles: H::handles(shared),
401            state: Cell::new(State::Closed),
402            opened: Cell::new(None),
403            serial: Cell::new(0),
404            defer_broadcasts: RefCell::new(VecDeque::new()),
405            defer_state_listeners: RefCell::new(VecDeque::new()),
406            socket: RefCell::new(None),
407            output: RefCell::new(Vec::new()),
408            current_timeout: Cell::new(INITIAL_TIMEOUT),
409            reconnect_timeout: RefCell::new(None),
410            g: Rc::new(Generic {
411                state_listeners: RefCell::new(Slab::new()),
412                broadcasts: RefCell::new(HashMap::new()),
413                requests: RefCell::new(Requests::new()),
414                buffers: RefCell::new(VecDeque::new()),
415            }),
416        });
417
418        let handle = Handle {
419            shared: Rc::downgrade(&shared),
420        };
421
422        Service { shared, handle }
423    }
424}
425
426/// The service handle.
427///
428/// Once dropped this will cause the service to be disconnected and all requests
429/// to be cancelled.
430pub struct Service<H>
431where
432    H: WebImpl,
433{
434    shared: Rc<Shared<H>>,
435    handle: Handle<H>,
436}
437
438impl<H> Service<H>
439where
440    H: WebImpl,
441{
442    /// Attempt to establish a websocket connection.
443    pub fn connect(&self) {
444        self.shared.connect()
445    }
446
447    /// Build a handle to the service.
448    pub fn handle(&self) -> &Handle<H> {
449        &self.handle
450    }
451}
452
453impl<H> Shared<H>
454where
455    H: WebImpl,
456{
457    /// Send a client message.
458    fn send_client_request<T>(&self, serial: u32, body: &T) -> Result<()>
459    where
460        T: api::Request,
461    {
462        let Some(ref socket) = *self.socket.borrow() else {
463            return Err(Error::msg("Socket is not connected"));
464        };
465
466        let header = api::RequestHeader {
467            serial,
468            id: <T::Endpoint as api::Endpoint>::ID.get(),
469        };
470
471        let out = &mut *self.output.borrow_mut();
472
473        storage::to_writer(&mut *out, &header)?;
474        storage::to_writer(&mut *out, &body)?;
475
476        tracing::debug!(
477            header.serial,
478            ?header.id,
479            len = out.len(),
480            "Sending request"
481        );
482
483        socket.send(out.as_slice())?;
484
485        out.clear();
486        out.shrink_to(MAX_CAPACITY);
487        Ok(())
488    }
489
490    pub(crate) fn next_buffer(self: &Rc<Self>, needed: usize) -> Box<BufData> {
491        match self.g.buffers.borrow_mut().pop_front() {
492            Some(mut buf) => {
493                if buf.data.capacity() < needed {
494                    buf.data.reserve(needed - buf.data.len());
495                }
496
497                buf
498            }
499            None => Box::new(BufData::with_capacity(Rc::downgrade(&self.g), needed)),
500        }
501    }
502
503    pub(crate) fn message(self: &Rc<Self>, buf: Box<BufData>) -> Result<()> {
504        // Wrap the buffer in a simple shared reference-counted container.
505        let buf = BufRc::new(buf);
506        let mut reader = SliceReader::new(&buf);
507
508        let header: api::ResponseHeader = storage::decode(&mut reader)?;
509
510        if let Some(broadcast) = MessageId::new(header.broadcast) {
511            tracing::debug!(?header.broadcast, "Got broadcast");
512
513            if !self.prepare_broadcast(broadcast) {
514                return Ok(());
515            };
516
517            if let Some(id) = MessageId::new(header.error) {
518                let error = if id == MessageId::ERROR_MESSAGE {
519                    storage::decode(&mut reader)?
520                } else {
521                    api::ErrorMessage {
522                        message: "Unknown error",
523                    }
524                };
525
526                while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
527                    if let Some(callback) = callback.upgrade() {
528                        callback.call(Err(Error::msg(error.message)));
529                    }
530                }
531
532                return Ok(());
533            }
534
535            let at = buf.len().saturating_sub(reader.remaining());
536
537            let packet = RawPacket {
538                buf: buf.clone(),
539                at: Cell::new(at),
540                id: broadcast,
541            };
542
543            while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
544                if let Some(callback) = callback.upgrade() {
545                    callback.call(Ok(packet.clone()));
546                }
547            }
548        } else {
549            tracing::debug!(header.serial, "Got response");
550
551            let p = {
552                let mut requests = self.g.requests.borrow_mut();
553
554                let Some(p) = requests.remove(&header.serial) else {
555                    tracing::debug!(header.serial, "Missing request");
556                    return Ok(());
557                };
558
559                p
560            };
561
562            if let Some(id) = MessageId::new(header.error) {
563                let error = if id == MessageId::ERROR_MESSAGE {
564                    storage::decode(&mut reader)?
565                } else {
566                    api::ErrorMessage {
567                        message: "Unknown error",
568                    }
569                };
570
571                p.callback.call(Err(Error::msg(error.message)));
572                return Ok(());
573            }
574
575            let at = buf.len().saturating_sub(reader.remaining());
576
577            let packet = RawPacket {
578                id: p.kind,
579                buf,
580                at: Cell::new(at),
581            };
582
583            p.callback.call(Ok(packet));
584        }
585
586        Ok(())
587    }
588
589    fn prepare_broadcast(self: &Rc<Self>, kind: MessageId) -> bool {
590        // Note: We need to defer this, since the outcome of calling
591        // the broadcast callback might be that the broadcast
592        // listener is modified, which could require mutable access
593        // to broadcasts.
594        let mut defer = self.defer_broadcasts.borrow_mut();
595
596        let broadcasts = self.g.broadcasts.borrow();
597
598        let Some(broadcasts) = broadcasts.get(&kind) else {
599            return false;
600        };
601
602        for (_, callback) in broadcasts.iter() {
603            defer.push_back(Rc::downgrade(callback));
604        }
605
606        !defer.is_empty()
607    }
608
609    pub(crate) fn set_open(&self) {
610        tracing::debug!("Set open");
611        self.opened
612            .set(Some(PerformanceImpl::now(&self.performance)));
613        self.emit_state_change(State::Open);
614    }
615
616    fn is_open_for_a_while(&self) -> bool {
617        let Some(at) = self.opened.get() else {
618            return false;
619        };
620
621        let now = PerformanceImpl::now(&self.performance);
622        (now - at) >= 250.0
623    }
624
625    pub(crate) fn close(self: &Rc<Self>) -> Result<(), Error> {
626        tracing::debug!("Close connection");
627
628        // We need a weak reference back to shared state to handle the timeout.
629        let shared = Rc::downgrade(self);
630
631        tracing::debug!(
632            "Set closed timeout={}, opened={:?}",
633            self.current_timeout.get(),
634            self.opened.get(),
635        );
636
637        if !self.is_open_for_a_while() {
638            let current_timeout = self.current_timeout.get();
639
640            if current_timeout < MAX_TIMEOUT {
641                let fuzz = H::random(50);
642
643                self.current_timeout.set(
644                    current_timeout
645                        .saturating_mul(2)
646                        .saturating_add(fuzz)
647                        .min(MAX_TIMEOUT),
648                );
649            }
650        } else {
651            self.current_timeout.set(INITIAL_TIMEOUT);
652        }
653
654        self.opened.set(None);
655        self.emit_state_change(State::Closed);
656
657        if let Some(s) = self.socket.take() {
658            s.close()?;
659        }
660
661        self.close_pending();
662
663        let timeout = self
664            .window
665            .set_timeout(self.current_timeout.get(), move || {
666                if let Some(shared) = shared.upgrade() {
667                    Self::connect(&shared);
668                }
669            })?;
670
671        drop(self.reconnect_timeout.borrow_mut().replace(timeout));
672        Ok(())
673    }
674
675    /// Close an pending requests with an error, since there is no chance they
676    /// will be responded to any more.
677    fn close_pending(self: &Rc<Self>) {
678        loop {
679            let Some(serial) = self.g.requests.borrow().keys().next().copied() else {
680                break;
681            };
682
683            let p = {
684                let mut requests = self.g.requests.borrow_mut();
685
686                let Some(p) = requests.remove(&serial) else {
687                    break;
688                };
689
690                p
691            };
692
693            p.callback.call(Err(Error::msg("Connection closed")));
694        }
695    }
696
697    fn emit_state_change(&self, state: State) {
698        if self.state.get() == state {
699            return;
700        }
701
702        self.state.set(state);
703
704        {
705            // We need to collect callbacks to avoid the callback recursively
706            // borrowing state listeners, which it would if it modifies any
707            // existing state listeners.
708            let mut defer = self.defer_state_listeners.borrow_mut();
709
710            for (_, callback) in self.g.state_listeners.borrow().iter() {
711                defer.push_back(Rc::downgrade(callback));
712            }
713
714            if defer.is_empty() {
715                return;
716            }
717        }
718
719        while let Some(callback) = self.defer_state_listeners.borrow_mut().pop_front() {
720            if let Some(callback) = callback.upgrade() {
721                callback.call(state);
722            }
723        }
724    }
725
726    fn is_closed(&self) -> bool {
727        self.opened.get().is_none()
728    }
729
730    fn connect(self: &Rc<Self>) {
731        tracing::debug!("Connect");
732
733        if let Err(e) = self.build() {
734            self.on_error.call(e);
735        } else {
736            return;
737        }
738
739        if let Err(e) = self.close() {
740            self.on_error.call(e);
741        }
742    }
743
744    /// Build a websocket connection.
745    fn build(self: &Rc<Self>) -> Result<()> {
746        let url = match &self.connect.kind {
747            ConnectKind::Location { path } => {
748                let Location {
749                    protocol,
750                    host,
751                    port,
752                } = WindowImpl::location(&self.window)?;
753
754                let protocol = match protocol.as_str() {
755                    "https:" => "wss:",
756                    "http:" => "ws:",
757                    other => {
758                        return Err(Error::msg(format_args!(
759                            "Same host connection is not supported for protocol `{other}`"
760                        )));
761                    }
762                };
763
764                let path = ForcePrefix(path, '/');
765                format!("{protocol}//{host}:{port}{path}")
766            }
767            ConnectKind::Url { url } => url.clone(),
768        };
769
770        let ws = SocketImpl::new(&url, &self.handles)?;
771
772        let old = self.socket.borrow_mut().replace(ws);
773
774        if let Some(old) = old {
775            old.close()?;
776        }
777
778        Ok(())
779    }
780}
781
782/// Trait governing how callbacks are called.
783pub trait Callback<I>
784where
785    Self: 'static,
786{
787    /// Call the callback.
788    fn call(&self, input: I);
789}
790
791impl<I> Callback<I> for EmptyCallback {
792    #[inline]
793    fn call(&self, _: I) {}
794}
795
796impl<F, I> Callback<I> for F
797where
798    F: 'static + Fn(I),
799{
800    #[inline]
801    fn call(&self, input: I) {
802        self(input)
803    }
804}
805
806/// A request builder .
807///
808/// Associate the callback to be used by using either
809/// [`RequestBuilder::on_packet`] or [`RequestBuilder::on_raw_packet`] depending
810/// on your needs.
811///
812/// Send the request with [`RequestBuilder::send`].
813pub struct RequestBuilder<'a, H, B, C>
814where
815    H: WebImpl,
816{
817    shared: &'a Weak<Shared<H>>,
818    body: B,
819    callback: C,
820}
821
822impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
823where
824    H: WebImpl,
825{
826    /// Set the body of the request.
827    #[inline]
828    pub fn body<U>(self, body: U) -> RequestBuilder<'a, H, U, C>
829    where
830        U: api::Request,
831    {
832        RequestBuilder {
833            shared: self.shared,
834            body,
835            callback: self.callback,
836        }
837    }
838
839    /// Handle the response using the specified callback.
840    ///
841    /// # Examples
842    ///
843    /// ```
844    /// # extern crate yew021 as yew;
845    /// use yew::prelude::*;
846    /// use musli_web::web03::prelude::*;
847    ///
848    /// mod api {
849    ///     use musli::{Decode, Encode};
850    ///     use musli_web::api;
851    ///
852    ///     #[derive(Encode, Decode)]
853    ///     pub struct HelloRequest<'de> {
854    ///         pub message: &'de str,
855    ///     }
856    ///
857    ///     #[derive(Encode, Decode)]
858    ///     pub struct HelloResponse<'de> {
859    ///         pub message: &'de str,
860    ///     }
861    ///
862    ///     api::define! {
863    ///         pub type Hello;
864    ///
865    ///         impl Endpoint for Hello {
866    ///             impl<'de> Request for HelloRequest<'de>;
867    ///             type Response<'de> = HelloResponse<'de>;
868    ///         }
869    ///     }
870    /// }
871    ///
872    /// enum Msg {
873    ///     OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
874    /// }
875    ///
876    /// #[derive(Properties, PartialEq)]
877    /// struct Props {
878    ///     ws: ws::Handle,
879    /// }
880    ///
881    /// struct App {
882    ///     message: String,
883    ///     _hello: ws::Request,
884    /// }
885    ///
886    /// impl Component for App {
887    ///     type Message = Msg;
888    ///     type Properties = Props;
889    ///
890    ///     fn create(ctx: &Context<Self>) -> Self {
891    ///         let hello = ctx.props().ws
892    ///             .request()
893    ///             .body(api::HelloRequest { message: "Hello!"})
894    ///             .on_packet(ctx.link().callback(Msg::OnHello))
895    ///             .send();
896    ///
897    ///         Self {
898    ///             message: String::from("No Message :("),
899    ///             _hello: hello,
900    ///         }
901    ///     }
902    ///
903    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
904    ///         match msg {
905    ///             Msg::OnHello(Err(error)) => {
906    ///                 tracing::error!("Request error: {:?}", error);
907    ///                 false
908    ///             }
909    ///             Msg::OnHello(Ok(packet)) => {
910    ///                 if let Ok(response) = packet.decode() {
911    ///                     self.message = response.message.to_owned();
912    ///                 }
913    ///
914    ///                 true
915    ///             }
916    ///         }
917    ///     }
918    ///
919    ///     fn view(&self, ctx: &Context<Self>) -> Html {
920    ///         html! {
921    ///             <div>
922    ///                 <h1>{"WebSocket Example"}</h1>
923    ///                 <p>{format!("Message: {}", self.message)}</p>
924    ///             </div>
925    ///         }
926    ///     }
927    /// }
928    /// ```
929    pub fn on_packet<E>(
930        self,
931        callback: impl Callback<Result<Packet<E>>>,
932    ) -> RequestBuilder<'a, H, B, impl Callback<Result<RawPacket>>>
933    where
934        E: api::Endpoint,
935    {
936        self.on_raw_packet(move |result: Result<RawPacket>| match result {
937            Ok(ok) => callback.call(Ok(Packet::new(ok))),
938            Err(err) => callback.call(Err(err)),
939        })
940    }
941
942    /// Handle the response using the specified callback.
943    ///
944    /// # Examples
945    ///
946    /// ```
947    /// # extern crate yew021 as yew;
948    /// use yew::prelude::*;
949    /// use musli_web::web03::prelude::*;
950    ///
951    /// mod api {
952    ///     use musli::{Decode, Encode};
953    ///     use musli_web::api;
954    ///
955    ///     #[derive(Encode, Decode)]
956    ///     pub struct HelloRequest<'de> {
957    ///         pub message: &'de str,
958    ///     }
959    ///
960    ///     #[derive(Encode, Decode)]
961    ///     pub struct HelloResponse<'de> {
962    ///         pub message: &'de str,
963    ///     }
964    ///
965    ///     api::define! {
966    ///         pub type Hello;
967    ///
968    ///         impl Endpoint for Hello {
969    ///             impl<'de> Request for HelloRequest<'de>;
970    ///             type Response<'de> = HelloResponse<'de>;
971    ///         }
972    ///     }
973    /// }
974    ///
975    /// enum Msg {
976    ///     OnHello(Result<ws::RawPacket, ws::Error>),
977    /// }
978    ///
979    /// #[derive(Properties, PartialEq)]
980    /// struct Props {
981    ///     ws: ws::Handle,
982    /// }
983    ///
984    /// struct App {
985    ///     message: String,
986    ///     _hello: ws::Request,
987    /// }
988    ///
989    /// impl Component for App {
990    ///     type Message = Msg;
991    ///     type Properties = Props;
992    ///
993    ///     fn create(ctx: &Context<Self>) -> Self {
994    ///         let link = ctx.link().clone();
995    ///
996    ///         let hello = ctx.props().ws
997    ///             .request()
998    ///             .body(api::HelloRequest { message: "Hello!"})
999    ///             .on_raw_packet(move |packet| link.send_message(Msg::OnHello(packet)))
1000    ///             .send();
1001    ///
1002    ///         Self {
1003    ///             message: String::from("No Message :("),
1004    ///             _hello: hello,
1005    ///         }
1006    ///     }
1007    ///
1008    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1009    ///         match msg {
1010    ///             Msg::OnHello(Err(error)) => {
1011    ///                 tracing::error!("Request error: {:?}", error);
1012    ///                 false
1013    ///             }
1014    ///             Msg::OnHello(Ok(packet)) => {
1015    ///                 if let Ok(response) = packet.decode::<api::HelloResponse>() {
1016    ///                     self.message = response.message.to_owned();
1017    ///                 }
1018    ///
1019    ///                 true
1020    ///             }
1021    ///         }
1022    ///     }
1023    ///
1024    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1025    ///         html! {
1026    ///             <div>
1027    ///                 <h1>{"WebSocket Example"}</h1>
1028    ///                 <p>{format!("Message: {}", self.message)}</p>
1029    ///             </div>
1030    ///         }
1031    ///     }
1032    /// }
1033    /// ```
1034    pub fn on_raw_packet<U>(self, callback: U) -> RequestBuilder<'a, H, B, U>
1035    where
1036        U: Callback<Result<RawPacket, Error>>,
1037    {
1038        RequestBuilder {
1039            shared: self.shared,
1040            body: self.body,
1041            callback,
1042        }
1043    }
1044}
1045
1046impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
1047where
1048    B: api::Request,
1049    C: Callback<Result<RawPacket>>,
1050    H: WebImpl,
1051{
1052    /// Send the request.
1053    ///
1054    /// This requires that a body has been set using [`RequestBuilder::body`].
1055    pub fn send(self) -> Request {
1056        let Some(shared) = self.shared.upgrade() else {
1057            self.callback
1058                .call(Err(Error::msg("WebSocket service is down")));
1059            return Request::new();
1060        };
1061
1062        if shared.is_closed() {
1063            self.callback
1064                .call(Err(Error::msg("WebSocket is not connected")));
1065            return Request::new();
1066        }
1067
1068        let serial = shared.serial.get();
1069
1070        if let Err(error) = shared.send_client_request(serial, &self.body) {
1071            shared.on_error.call(error);
1072            return Request::new();
1073        }
1074
1075        shared.serial.set(serial.wrapping_add(1));
1076
1077        let pending = Pending {
1078            kind: <B::Endpoint as api::Endpoint>::ID,
1079            serial,
1080            callback: self.callback,
1081        };
1082
1083        let existing = shared
1084            .g
1085            .requests
1086            .borrow_mut()
1087            .insert(serial, Box::new(pending));
1088
1089        if let Some(p) = existing {
1090            p.callback.call(Err(Error::msg("Request cancelled")));
1091        }
1092
1093        Request {
1094            serial,
1095            g: Rc::downgrade(&shared.g),
1096        }
1097    }
1098}
1099
1100/// The handle for a pending request.
1101///
1102/// Dropping or [`clear()`] this handle will cancel the request.
1103///
1104/// [`clear()`]: Self::clear
1105pub struct Request {
1106    serial: u32,
1107    g: Weak<Generic>,
1108}
1109
1110impl Request {
1111    /// An empty request handler.
1112    #[inline]
1113    pub const fn new() -> Self {
1114        Self {
1115            serial: 0,
1116            g: Weak::new(),
1117        }
1118    }
1119
1120    /// Clear the request handle without dropping it, cancelling any pending
1121    /// requests.
1122    pub fn clear(&mut self) {
1123        let removed = {
1124            let serial = mem::take(&mut self.serial);
1125
1126            let Some(g) = self.g.upgrade() else {
1127                return;
1128            };
1129
1130            self.g = Weak::new();
1131
1132            let Some(p) = g.requests.borrow_mut().remove(&serial) else {
1133                return;
1134            };
1135
1136            p
1137        };
1138
1139        drop(removed);
1140    }
1141}
1142
1143impl Default for Request {
1144    #[inline]
1145    fn default() -> Self {
1146        Self::new()
1147    }
1148}
1149
1150impl Drop for Request {
1151    #[inline]
1152    fn drop(&mut self) {
1153        self.clear();
1154    }
1155}
1156
1157/// The handle for a pending request.
1158///
1159/// Dropping or calling [`clear()`] on this handle remove the listener.
1160///
1161/// [`clear()`]: Self::clear
1162pub struct Listener {
1163    kind: Option<MessageId>,
1164    index: usize,
1165    g: Weak<Generic>,
1166}
1167
1168impl Listener {
1169    /// Construct an empty listener.
1170    #[inline]
1171    pub const fn new() -> Self {
1172        Self {
1173            kind: None,
1174            index: 0,
1175            g: Weak::new(),
1176        }
1177    }
1178
1179    /// Build up an empty listener with the specified kind.
1180    #[inline]
1181    pub(crate) const fn empty_with_kind(kind: MessageId) -> Self {
1182        Self {
1183            kind: Some(kind),
1184            index: 0,
1185            g: Weak::new(),
1186        }
1187    }
1188
1189    /// Clear the listener without dropping it.
1190    ///
1191    /// This will remove the associated broadcast listener from being notified.
1192    pub fn clear(&mut self) {
1193        // Gather values here to drop them outside of the upgrade block.
1194        let removed;
1195        let removed_value;
1196
1197        {
1198            let Some(g) = self.g.upgrade() else {
1199                return;
1200            };
1201
1202            self.g = Weak::new();
1203            let index = mem::take(&mut self.index);
1204
1205            let Some(kind) = self.kind.take() else {
1206                return;
1207            };
1208
1209            let mut broadcasts = g.broadcasts.borrow_mut();
1210
1211            let Entry::Occupied(mut e) = broadcasts.entry(kind) else {
1212                return;
1213            };
1214
1215            removed = e.get_mut().try_remove(index);
1216
1217            if e.get().is_empty() {
1218                removed_value = Some(e.remove());
1219            } else {
1220                removed_value = None;
1221            }
1222        }
1223
1224        // Drop here, to avoid invoking any destructors which might borrow
1225        // shared mutably earlier.
1226        drop(removed);
1227        drop(removed_value);
1228    }
1229}
1230
1231impl Default for Listener {
1232    #[inline]
1233    fn default() -> Self {
1234        Self::new()
1235    }
1236}
1237
1238impl Drop for Listener {
1239    #[inline]
1240    fn drop(&mut self) {
1241        self.clear();
1242    }
1243}
1244
1245/// The handle for state change listening.
1246///
1247/// Dropping or calling [`clear()`] on this handle will remove the associated
1248/// callback from being notified.
1249///
1250/// [`clear()`]: Self::clear
1251pub struct StateListener {
1252    index: usize,
1253    g: Weak<Generic>,
1254}
1255
1256impl StateListener {
1257    /// Construct an empty state listener.
1258    #[inline]
1259    pub const fn new() -> Self {
1260        Self {
1261            index: 0,
1262            g: Weak::new(),
1263        }
1264    }
1265
1266    /// Clear the state listener without dropping it.
1267    ///
1268    /// This will remove the associated callback from being notified.
1269    pub fn clear(&mut self) {
1270        let removed = {
1271            let Some(g) = self.g.upgrade() else {
1272                return;
1273            };
1274
1275            self.g = Weak::new();
1276
1277            g.state_listeners.borrow_mut().try_remove(self.index)
1278        };
1279
1280        drop(removed);
1281    }
1282}
1283
1284impl Default for StateListener {
1285    #[inline]
1286    fn default() -> Self {
1287        Self::new()
1288    }
1289}
1290
1291impl Drop for StateListener {
1292    #[inline]
1293    fn drop(&mut self) {
1294        self.clear();
1295    }
1296}
1297
1298pub(crate) struct BufData {
1299    /// Buffer being used.
1300    pub(crate) data: Vec<u8>,
1301    /// Number of strong references to this buffer.
1302    strong: Cell<usize>,
1303    /// Reference to shared state where the buffer will be recycled to.
1304    g: Weak<Generic>,
1305}
1306
1307impl BufData {
1308    fn with_capacity(g: Weak<Generic>, capacity: usize) -> Self {
1309        Self {
1310            data: Vec::with_capacity(capacity),
1311            strong: Cell::new(0),
1312            g,
1313        }
1314    }
1315
1316    unsafe fn dec(ptr: NonNull<BufData>) {
1317        unsafe {
1318            let count = ptr.as_ref().strong.get().wrapping_sub(1);
1319            ptr.as_ref().strong.set(count);
1320
1321            if count > 0 {
1322                return;
1323            }
1324
1325            let mut buf = Box::from_raw(ptr.as_ptr());
1326
1327            // Try to recycle the buffer if shared is available, else let it be
1328            // dropped and free here.
1329            let Some(g) = buf.as_ref().g.upgrade() else {
1330                return;
1331            };
1332
1333            let mut buffers = g.buffers.borrow_mut();
1334
1335            // Set the length of the recycled buffer.
1336            buf.data.set_len(buf.data.len().min(MAX_CAPACITY));
1337
1338            // We size our buffers to some max capacity to avod overuse in case
1339            // we infrequently need to handle some massive message. If we don't
1340            // shrink the allocation, then memory use can run away over time.
1341            buf.data.shrink_to(MAX_CAPACITY);
1342
1343            buffers.push_back(buf);
1344        }
1345    }
1346
1347    unsafe fn inc(ptr: NonNull<BufData>) {
1348        unsafe {
1349            let count = ptr.as_ref().strong.get().wrapping_add(1);
1350
1351            if count == 0 {
1352                std::process::abort();
1353            }
1354
1355            ptr.as_ref().strong.set(count);
1356        }
1357    }
1358}
1359
1360/// A shared buffer of data that is recycled when dropped.
1361struct BufRc {
1362    data: NonNull<BufData>,
1363}
1364
1365impl BufRc {
1366    fn new(data: Box<BufData>) -> Self {
1367        let data = NonNull::from(Box::leak(data));
1368
1369        unsafe {
1370            BufData::inc(data);
1371        }
1372
1373        Self { data }
1374    }
1375}
1376
1377impl Deref for BufRc {
1378    type Target = [u8];
1379
1380    fn deref(&self) -> &Self::Target {
1381        unsafe { &(*self.data.as_ptr()).data }
1382    }
1383}
1384
1385impl Clone for BufRc {
1386    fn clone(&self) -> Self {
1387        unsafe {
1388            BufData::inc(self.data);
1389        }
1390
1391        Self { data: self.data }
1392    }
1393}
1394
1395impl Drop for BufRc {
1396    fn drop(&mut self) {
1397        unsafe {
1398            BufData::dec(self.data);
1399        }
1400    }
1401}
1402
1403/// A raw packet of data.
1404#[derive(Clone)]
1405pub struct RawPacket {
1406    id: MessageId,
1407    buf: BufRc,
1408    at: Cell<usize>,
1409}
1410
1411impl RawPacket {
1412    /// Decode the contents of a raw packet.
1413    ///
1414    /// This can be called multiple times if there are multiple payloads in
1415    /// sequence of the response.
1416    ///
1417    /// You can check if the packet is empty using [`RawPacket::is_empty`].
1418    pub fn decode<'this, T>(&'this self) -> Result<T>
1419    where
1420        T: Decode<'this, Binary, Global>,
1421    {
1422        let at = self.at.get();
1423
1424        let Some(bytes) = self.buf.get(at..) else {
1425            return Err(Error::new(ErrorKind::Overflow(at, self.buf.len())));
1426        };
1427
1428        let mut reader = SliceReader::new(bytes);
1429
1430        match storage::decode(&mut reader) {
1431            Ok(value) => {
1432                self.at.set(at + bytes.len() - reader.remaining());
1433                Ok(value)
1434            }
1435            Err(error) => {
1436                self.at.set(self.buf.len());
1437                Err(Error::from(error))
1438            }
1439        }
1440    }
1441
1442    /// Check if the packet is empty.
1443    pub fn is_empty(&self) -> bool {
1444        self.at.get() >= self.buf.len()
1445    }
1446
1447    /// The id of the packet this is a response to as specified by
1448    /// [`Endpoint::ID`] or [`Broadcast::ID`].
1449    ///
1450    /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1451    /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1452    pub fn id(&self) -> MessageId {
1453        self.id
1454    }
1455}
1456
1457/// A typed packet of data.
1458#[derive(Clone)]
1459pub struct Packet<T> {
1460    raw: RawPacket,
1461    _marker: PhantomData<T>,
1462}
1463
1464impl<T> Packet<T> {
1465    /// Construct a new typed package from a raw one.
1466    ///
1467    /// Note that this does not guarantee that the typed package is correct, but
1468    /// the `T` parameter becomes associated with it allowing it to be used
1469    /// automatically with methods such as [`Packet::decode`].
1470    #[inline]
1471    pub fn new(raw: RawPacket) -> Self {
1472        Self {
1473            raw,
1474            _marker: PhantomData,
1475        }
1476    }
1477
1478    /// Convert a packet into a raw packet.
1479    ///
1480    /// To determine which endpoint or broadcast it belongs to the
1481    /// [`RawPacket::id`] method can be used.
1482    pub fn into_raw(self) -> RawPacket {
1483        self.raw
1484    }
1485
1486    /// Check if the packet is empty.
1487    pub fn is_empty(&self) -> bool {
1488        self.raw.is_empty()
1489    }
1490
1491    /// The id of the packet this is a response to as specified by
1492    /// [`Endpoint::ID`] or [`Broadcast::ID`].
1493    ///
1494    /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1495    /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1496    pub fn id(&self) -> MessageId {
1497        self.raw.id()
1498    }
1499}
1500
1501impl<T> Packet<T>
1502where
1503    T: api::Endpoint,
1504{
1505    /// Decode the contents of a packet.
1506    ///
1507    /// This can be called multiple times if there are multiple payloads in
1508    /// sequence of the response.
1509    ///
1510    /// You can check if the packet is empty using [`Packet::is_empty`].
1511    pub fn decode(&self) -> Result<T::Response<'_>> {
1512        self.decode_any()
1513    }
1514
1515    /// Decode any contents of a packet.
1516    ///
1517    /// This can be called multiple times if there are multiple payloads in
1518    /// sequence of the response.
1519    ///
1520    /// You can check if the packet is empty using [`Packet::is_empty`].
1521    pub fn decode_any<'de, R>(&'de self) -> Result<R>
1522    where
1523        R: Decode<'de, Binary, Global>,
1524    {
1525        self.raw.decode()
1526    }
1527}
1528
1529impl<T> Packet<T>
1530where
1531    T: api::Broadcast,
1532{
1533    /// Decode the primary event related to a broadcast.
1534    pub fn decode_event<'de>(&'de self) -> Result<T::Event<'de>> {
1535        self.decode_event_any()
1536    }
1537
1538    /// Decode any event related to a broadcast.
1539    pub fn decode_event_any<'de, E>(&'de self) -> Result<E>
1540    where
1541        E: Event<Broadcast = T> + Decode<'de, Binary, Global>,
1542    {
1543        self.raw.decode()
1544    }
1545}
1546
1547/// A handle to the WebSocket service.
1548#[derive(Clone)]
1549#[repr(transparent)]
1550pub struct Handle<H>
1551where
1552    H: WebImpl,
1553{
1554    shared: Weak<Shared<H>>,
1555}
1556
1557impl<H> Handle<H>
1558where
1559    H: WebImpl,
1560{
1561    /// Send a request of type `T`.
1562    ///
1563    /// Returns a handle for the request.
1564    ///
1565    /// If the handle is dropped, the request is cancelled.
1566    ///
1567    /// # Examples
1568    ///
1569    /// ```
1570    /// # extern crate yew021 as yew;
1571    /// use yew::prelude::*;
1572    /// use musli_web::web03::prelude::*;
1573    ///
1574    /// mod api {
1575    ///     use musli::{Decode, Encode};
1576    ///     use musli_web::api;
1577    ///
1578    ///     #[derive(Encode, Decode)]
1579    ///     pub struct HelloRequest<'de> {
1580    ///         pub message: &'de str,
1581    ///     }
1582    ///
1583    ///     #[derive(Encode, Decode)]
1584    ///     pub struct HelloResponse<'de> {
1585    ///         pub message: &'de str,
1586    ///     }
1587    ///
1588    ///     api::define! {
1589    ///         pub type Hello;
1590    ///
1591    ///         impl Endpoint for Hello {
1592    ///             impl<'de> Request for HelloRequest<'de>;
1593    ///             type Response<'de> = HelloResponse<'de>;
1594    ///         }
1595    ///     }
1596    /// }
1597    ///
1598    /// enum Msg {
1599    ///     OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
1600    /// }
1601    ///
1602    /// #[derive(Properties, PartialEq)]
1603    /// struct Props {
1604    ///     ws: ws::Handle,
1605    /// }
1606    ///
1607    /// struct App {
1608    ///     message: String,
1609    ///     _hello: ws::Request,
1610    /// }
1611    ///
1612    /// impl Component for App {
1613    ///     type Message = Msg;
1614    ///     type Properties = Props;
1615    ///
1616    ///     fn create(ctx: &Context<Self>) -> Self {
1617    ///         let hello = ctx.props().ws
1618    ///             .request()
1619    ///             .body(api::HelloRequest { message: "Hello!"})
1620    ///             .on_packet(ctx.link().callback(Msg::OnHello))
1621    ///             .send();
1622    ///
1623    ///         Self {
1624    ///             message: String::from("No Message :("),
1625    ///             _hello: hello,
1626    ///         }
1627    ///     }
1628    ///
1629    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1630    ///         match msg {
1631    ///             Msg::OnHello(Err(error)) => {
1632    ///                 tracing::error!("Request error: {:?}", error);
1633    ///                 false
1634    ///             }
1635    ///             Msg::OnHello(Ok(packet)) => {
1636    ///                 if let Ok(response) = packet.decode() {
1637    ///                     self.message = response.message.to_owned();
1638    ///                 }
1639    ///
1640    ///                 true
1641    ///             }
1642    ///         }
1643    ///     }
1644    ///
1645    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1646    ///         html! {
1647    ///             <div>
1648    ///                 <h1>{"WebSocket Example"}</h1>
1649    ///                 <p>{format!("Message: {}", self.message)}</p>
1650    ///             </div>
1651    ///         }
1652    ///     }
1653    /// }
1654    /// ```
1655    pub fn request(&self) -> RequestBuilder<'_, H, EmptyBody, EmptyCallback> {
1656        RequestBuilder {
1657            shared: &self.shared,
1658            body: EmptyBody,
1659            callback: EmptyCallback,
1660        }
1661    }
1662
1663    /// Listen for broadcasts of type `T`.
1664    ///
1665    /// Returns a handle for the listener that will cancel the listener if
1666    /// dropped.
1667    ///
1668    /// # Examples
1669    ///
1670    /// ```
1671    /// # extern crate yew021 as yew;
1672    /// use yew::prelude::*;
1673    /// use musli_web::web03::prelude::*;
1674    ///
1675    /// mod api {
1676    ///     use musli::{Decode, Encode};
1677    ///     use musli_web::api;
1678    ///
1679    ///     #[derive(Encode, Decode)]
1680    ///     pub struct TickEvent<'de> {
1681    ///         pub message: &'de str,
1682    ///         pub tick: u32,
1683    ///     }
1684    ///
1685    ///     api::define! {
1686    ///         pub type Tick;
1687    ///
1688    ///         impl Broadcast for Tick {
1689    ///             impl<'de> Event for TickEvent<'de>;
1690    ///         }
1691    ///     }
1692    /// }
1693    ///
1694    /// enum Msg {
1695    ///     Tick(Result<ws::Packet<api::Tick>, ws::Error>),
1696    /// }
1697    ///
1698    /// #[derive(Properties, PartialEq)]
1699    /// struct Props {
1700    ///     ws: ws::Handle,
1701    /// }
1702    ///
1703    /// struct App {
1704    ///     tick: u32,
1705    ///     _listen: ws::Listener,
1706    /// }
1707    ///
1708    /// impl Component for App {
1709    ///     type Message = Msg;
1710    ///     type Properties = Props;
1711    ///
1712    ///     fn create(ctx: &Context<Self>) -> Self {
1713    ///         let listen = ctx.props().ws.on_broadcast(ctx.link().callback(Msg::Tick));
1714    ///
1715    ///         Self {
1716    ///             tick: 0,
1717    ///             _listen: listen,
1718    ///         }
1719    ///     }
1720    ///
1721    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1722    ///         match msg {
1723    ///             Msg::Tick(Err(error)) => {
1724    ///                 tracing::error!("Tick error: {error}");
1725    ///                 false
1726    ///             }
1727    ///             Msg::Tick(Ok(packet)) => {
1728    ///                 if let Ok(tick) = packet.decode_event() {
1729    ///                     self.tick = tick.tick;
1730    ///                 }
1731    ///
1732    ///                 true
1733    ///             }
1734    ///         }
1735    ///     }
1736    ///
1737    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1738    ///         html! {
1739    ///             <div>
1740    ///                 <h1>{"WebSocket Example"}</h1>
1741    ///                 <p>{format!("Tick: {}", self.tick)}</p>
1742    ///             </div>
1743    ///         }
1744    ///     }
1745    /// }
1746    /// ```
1747    pub fn on_broadcast<T>(&self, callback: impl Callback<Result<Packet<T>>>) -> Listener
1748    where
1749        T: api::Broadcast,
1750    {
1751        self.on_raw_broadcast::<T>(move |result| match result {
1752            Ok(packet) => callback.call(Ok(Packet::new(packet))),
1753            Err(error) => callback.call(Err(error)),
1754        })
1755    }
1756
1757    /// Listen for broadcasts of type `T`.
1758    ///
1759    /// Returns a handle for the listener that will cancel the listener if
1760    /// dropped.
1761    ///
1762    /// # Examples
1763    ///
1764    /// ```
1765    /// # extern crate yew021 as yew;
1766    /// use yew::prelude::*;
1767    /// use musli_web::web03::prelude::*;
1768    ///
1769    /// mod api {
1770    ///     use musli::{Decode, Encode};
1771    ///     use musli_web::api;
1772    ///
1773    ///     #[derive(Encode, Decode)]
1774    ///     pub struct TickEvent<'de> {
1775    ///         pub message: &'de str,
1776    ///         pub tick: u32,
1777    ///     }
1778    ///
1779    ///     api::define! {
1780    ///         pub type Tick;
1781    ///
1782    ///         impl Broadcast for Tick {
1783    ///             impl<'de> Event for TickEvent<'de>;
1784    ///         }
1785    ///     }
1786    /// }
1787    ///
1788    /// enum Msg {
1789    ///     Tick(Result<ws::RawPacket, ws::Error>),
1790    /// }
1791    ///
1792    /// #[derive(Properties, PartialEq)]
1793    /// struct Props {
1794    ///     ws: ws::Handle,
1795    /// }
1796    ///
1797    /// struct App {
1798    ///     tick: u32,
1799    ///     _listen: ws::Listener,
1800    /// }
1801    ///
1802    /// impl Component for App {
1803    ///     type Message = Msg;
1804    ///     type Properties = Props;
1805    ///
1806    ///     fn create(ctx: &Context<Self>) -> Self {
1807    ///         let link = ctx.link().clone();
1808    ///         let listen = ctx.props().ws.on_raw_broadcast::<api::Tick>(ctx.link().callback(Msg::Tick));
1809    ///
1810    ///         Self {
1811    ///             tick: 0,
1812    ///             _listen: listen,
1813    ///         }
1814    ///     }
1815    ///
1816    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1817    ///         match msg {
1818    ///             Msg::Tick(Err(error)) => {
1819    ///                 tracing::error!("Tick error: {error}");
1820    ///                 false
1821    ///             }
1822    ///             Msg::Tick(Ok(packet)) => {
1823    ///                 if let Ok(tick) = packet.decode::<api::TickEvent>() {
1824    ///                     self.tick = tick.tick;
1825    ///                 }
1826    ///
1827    ///                 true
1828    ///             }
1829    ///         }
1830    ///     }
1831    ///
1832    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1833    ///         html! {
1834    ///             <div>
1835    ///                 <h1>{"WebSocket Example"}</h1>
1836    ///                 <p>{format!("Tick: {}", self.tick)}</p>
1837    ///             </div>
1838    ///         }
1839    ///     }
1840    /// }
1841    /// ```
1842    pub fn on_raw_broadcast<T>(&self, callback: impl Callback<Result<RawPacket>>) -> Listener
1843    where
1844        T: api::Broadcast,
1845    {
1846        let Some(shared) = self.shared.upgrade() else {
1847            return Listener::empty_with_kind(T::ID);
1848        };
1849
1850        let index = {
1851            let mut broadcasts = shared.g.broadcasts.borrow_mut();
1852            let slots = broadcasts.entry(T::ID).or_default();
1853            slots.insert(Rc::new(callback))
1854        };
1855
1856        Listener {
1857            kind: Some(T::ID),
1858            index,
1859            g: Rc::downgrade(&shared.g),
1860        }
1861    }
1862
1863    /// Listen for state changes to the underlying connection.
1864    ///
1865    /// This indicates when the connection is open and ready to receive requests
1866    /// through [`State::Open`], or if it's closed and requests will be queued
1867    /// through [`State::Closed`].
1868    ///
1869    /// Note that if you are connecting through a proxy the reported updates
1870    /// might be volatile. It is always best to send a message over the
1871    /// connection on the server side that once received allows the client to
1872    /// know that it is connected.
1873    ///
1874    /// Dropping the returned handle will cancel the listener.
1875    ///
1876    /// # Examples
1877    ///
1878    /// ```
1879    /// # extern crate yew021 as yew;
1880    /// use yew::prelude::*;
1881    /// use musli_web::web03::prelude::*;
1882    ///
1883    /// enum Msg {
1884    ///     StateChange(ws::State),
1885    /// }
1886    ///
1887    /// #[derive(Properties, PartialEq)]
1888    /// struct Props {
1889    ///     ws: ws::Handle,
1890    /// }
1891    ///
1892    /// struct App {
1893    ///     state: ws::State,
1894    ///     _listen: ws::StateListener,
1895    /// }
1896    ///
1897    /// impl Component for App {
1898    ///     type Message = Msg;
1899    ///     type Properties = Props;
1900    ///
1901    ///     fn create(ctx: &Context<Self>) -> Self {
1902    ///         let link = ctx.link().clone();
1903    ///
1904    ///         let (state, listen) = ctx.props().ws.on_state_change(move |state| {
1905    ///             link.send_message(Msg::StateChange(state));
1906    ///         });
1907    ///
1908    ///         Self {
1909    ///             state,
1910    ///             _listen: listen,
1911    ///         }
1912    ///     }
1913    ///
1914    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1915    ///         match msg {
1916    ///             Msg::StateChange(state) => {
1917    ///                 self.state = state;
1918    ///                 true
1919    ///             }
1920    ///         }
1921    ///     }
1922    ///
1923    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1924    ///         html! {
1925    ///             <div>
1926    ///                 <h1>{"WebSocket Example"}</h1>
1927    ///                 <p>{format!("State: {:?}", self.state)}</p>
1928    ///             </div>
1929    ///         }
1930    ///     }
1931    /// }
1932    pub fn on_state_change(&self, callback: impl Callback<State>) -> (State, StateListener) {
1933        let Some(shared) = self.shared.upgrade() else {
1934            return (
1935                State::Closed,
1936                StateListener {
1937                    index: 0,
1938                    g: Weak::new(),
1939                },
1940            );
1941        };
1942
1943        let (state, index) = {
1944            let index = shared
1945                .g
1946                .state_listeners
1947                .borrow_mut()
1948                .insert(Rc::new(callback));
1949            (shared.state.get(), index)
1950        };
1951
1952        let listener = StateListener {
1953            index,
1954            g: Rc::downgrade(&shared.g),
1955        };
1956
1957        (state, listener)
1958    }
1959}
1960
1961impl<H> PartialEq for Handle<H>
1962where
1963    H: WebImpl,
1964{
1965    #[inline]
1966    fn eq(&self, _: &Self) -> bool {
1967        true
1968    }
1969}
1970
1971struct Pending<C>
1972where
1973    C: ?Sized,
1974{
1975    kind: MessageId,
1976    serial: u32,
1977    callback: C,
1978}
1979
1980impl<C> fmt::Debug for Pending<C> {
1981    #[inline]
1982    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1983        f.debug_struct("Pending")
1984            .field("serial", &self.serial)
1985            .field("kind", &self.kind)
1986            .finish_non_exhaustive()
1987    }
1988}
1989struct ForcePrefix<'a>(&'a str, char);
1990
1991impl fmt::Display for ForcePrefix<'_> {
1992    #[inline]
1993    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1994        let Self(string, prefix) = *self;
1995        prefix.fmt(f)?;
1996        string.trim_start_matches(prefix).fmt(f)?;
1997        Ok(())
1998    }
1999}