Skip to main content

musli_web/
web.rs

1//! The generic web implementation.
2//!
3//! This is specialized over the `H` parameter through modules such as:
4//!
5//! * [`web03`] for `web-sys` `0.3.x`.
6//!
7//! [`web03`]: crate::web03
8
9use core::cell::{Cell, RefCell};
10use core::fmt;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Deref;
14use core::ptr::NonNull;
15use std::collections::VecDeque;
16
17use alloc::boxed::Box;
18use alloc::format;
19use alloc::rc::Rc;
20use alloc::rc::Weak;
21use alloc::string::{String, ToString};
22use alloc::vec::Vec;
23
24use std::collections::hash_map::{Entry, HashMap};
25
26use musli::Decode;
27use musli::alloc::Global;
28use musli::mode::Binary;
29use musli::reader::SliceReader;
30use musli::storage;
31
32use slab::Slab;
33
34use crate::api::{self, Event, MessageId};
35
/// Upper bound, in bytes, on the capacity which the reusable output buffer is
/// shrunk to after a request has been sent.
const MAX_CAPACITY: usize = 1048576;
37
/// An empty request body.
///
/// Marker type without any fields.
// NOTE(review): presumably the placeholder body type of a `RequestBuilder`
// before `RequestBuilder::body` has been called - confirm against the code
// constructing the builder.
#[non_exhaustive]
pub struct EmptyBody;
41
/// An empty callback.
///
/// Implements [`Callback`] for any input by doing nothing. This is the error
/// handler a [`ServiceBuilder`] starts out with when constructed through
/// [`connect`].
#[non_exhaustive]
pub struct EmptyCallback;
45
/// Slab of listeners which are notified when the connection [`State`] changes.
type StateListeners = Slab<Rc<dyn Callback<State>>>;
/// Broadcast listeners, grouped in one slab per broadcast [`MessageId`].
type Broadcasts = HashMap<MessageId, Slab<Rc<dyn Callback<Result<RawPacket>>>>>;
/// Queue of recycled buffers which can be reused for incoming messages.
type Buffers = VecDeque<Box<BufData>>;
/// Pending requests, keyed by the serial number they were sent with.
type Requests = HashMap<u32, Box<Pending<dyn Callback<Result<RawPacket>>>>>;
54
/// Location information for websocket implementation.
///
/// This is used to build the websocket URL when connecting relative to the
/// current location.
#[doc(hidden)]
pub struct Location {
    /// The protocol of the location including the trailing colon, such as
    /// `http:` or `https:`.
    pub(crate) protocol: String,
    /// The host component of the location.
    pub(crate) host: String,
    /// The port component of the location, as a string.
    pub(crate) port: String,
}
62
/// Module holding the supertrait which `SocketImpl` requires of its
/// implementors.
pub(crate) mod sealed_socket {
    pub trait Sealed {}
}
66
/// The operations required from a websocket implementation.
///
/// Implementors must also implement `sealed_socket::Sealed`.
pub(crate) trait SocketImpl
where
    Self: Sized + self::sealed_socket::Sealed,
{
    /// The handles which are passed by reference when the socket is
    /// constructed.
    #[doc(hidden)]
    type Handles;

    /// Construct a new socket for the given `url` using the given `handles`.
    #[doc(hidden)]
    fn new(url: &str, handles: &Self::Handles) -> Result<Self, Error>;

    /// Send the given bytes over the socket.
    #[doc(hidden)]
    fn send(&self, data: &[u8]) -> Result<(), Error>;

    /// Close the socket, consuming it.
    #[doc(hidden)]
    fn close(self) -> Result<(), Error>;
}
83
/// Module holding the supertrait which `PerformanceImpl` requires of its
/// implementors.
pub(crate) mod sealed_performance {
    pub trait Sealed {}
}
87
/// Source of timestamps used to measure how long a connection has been open.
///
/// Implementors must also implement `sealed_performance::Sealed`.
pub trait PerformanceImpl
where
    Self: Sized + self::sealed_performance::Sealed,
{
    /// Get the current timestamp.
    // NOTE(review): presumably milliseconds, mirroring `performance.now()` in
    // browsers - confirm against the implementations.
    #[doc(hidden)]
    fn now(&self) -> f64;
}
95
/// Module holding the supertrait which `WindowImpl` requires of its
/// implementors.
pub(crate) mod sealed_window {
    pub trait Sealed {}
}
99
/// The operations required from a window implementation.
///
/// Implementors must also implement `sealed_window::Sealed`.
pub(crate) trait WindowImpl
where
    Self: Sized + self::sealed_window::Sealed,
{
    /// The performance implementation associated with the window.
    #[doc(hidden)]
    type Performance: PerformanceImpl;

    /// The handle returned when a timeout is scheduled.
    // NOTE(review): presumably dropping the handle cancels the timeout -
    // confirm against the implementations.
    #[doc(hidden)]
    type Timeout;

    /// Construct a new window.
    #[doc(hidden)]
    fn new() -> Result<Self, Error>;

    /// Get the performance implementation of the window.
    #[doc(hidden)]
    fn performance(&self) -> Result<Self::Performance, Error>;

    /// Get the current location of the window.
    #[doc(hidden)]
    fn location(&self) -> Result<Location, Error>;

    /// Schedule `callback` to be called once after `millis` milliseconds,
    /// returning a handle to the scheduled timeout.
    #[doc(hidden)]
    fn set_timeout(
        &self,
        millis: u32,
        callback: impl FnOnce() + 'static,
    ) -> Result<Self::Timeout, Error>;
}
126
/// Module holding the supertrait which `WebImpl` requires of its implementors.
pub(crate) mod sealed_web {
    pub trait Sealed {}
}
130
/// Central trait for web integration.
///
/// Since web integration is currently unstable, this requires multiple
/// different implementations, each time an ecosystem breaking change is
/// released.
///
/// The crate in focus here is `web-sys`, and the corresponding modules provide
/// integrations:
///
/// * [web03] for `web-sys` `0.3.x`.
///
/// [web03]: crate::web03
pub trait WebImpl
where
    Self: 'static + Copy + Sized + self::sealed_web::Sealed,
{
    /// The window implementation.
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Window: WindowImpl;

    /// The handles which are handed to the socket when it is constructed.
    #[doc(hidden)]
    type Handles;

    /// The socket implementation, which is constructed from [`Self::Handles`].
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Socket: SocketImpl<Handles = Self::Handles>;

    /// Construct the handles from a weak reference to the shared state.
    #[doc(hidden)]
    #[allow(private_interfaces)]
    fn handles(shared: &Weak<Shared<Self>>) -> Self::Handles;

    /// Generate a random number based on `range`.
    // NOTE(review): presumably the result is in the range `0..range`; it is
    // used as fuzz for the reconnect timeout - confirm against the
    // implementations.
    #[doc(hidden)]
    fn random(range: u32) -> u32;
}
165
166/// Construct a new [`ServiceBuilder`] associated with the given [`Connect`]
167/// strategy.
168pub fn connect<H>(connect: Connect) -> ServiceBuilder<H, EmptyCallback>
169where
170    H: WebImpl,
171{
172    ServiceBuilder {
173        connect,
174        on_error: EmptyCallback,
175        _marker: PhantomData,
176    }
177}
178
/// The state of the connection.
///
/// A listener for state changes can be set up through for example
/// [`Handle::on_state_change`].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[non_exhaustive]
pub enum State {
    /// The connection is open.
    Open,
    /// The connection is closed.
    ///
    /// This is the state a newly built service starts out in.
    Closed,
}
191
/// Error type for the WebSocket service.
#[derive(Debug)]
pub struct Error {
    /// The kind of the error, which determines how it is displayed.
    kind: ErrorKind,
}
197
198impl Error {
199    /// Check if the error is caused by an empty packet.
200    ///
201    /// # Examples
202    ///
203    /// ```
204    /// use musli_web::web::{Error, RawPacket};
205    ///
206    /// let packet = RawPacket::empty();
207    /// let e = packet.decode::<u32>().unwrap_err();
208    ///
209    /// assert!(e.is_empty_packet());
210    /// ```
211    pub fn is_empty_packet(&self) -> bool {
212        matches!(self.kind, ErrorKind::EmptyPacket)
213    }
214}
215
/// The different kinds of errors which can be raised by the service.
#[derive(Debug)]
enum ErrorKind {
    /// A packet was empty.
    EmptyPacket,
    /// A free-form error message.
    Message(String),
    /// An error raised by the storage encoding.
    Storage(storage::Error),
    /// An internal packet overflow: the first field is the offending position
    /// and the second field is the length it was expected to be within.
    Overflow(usize, usize),
}
223
224impl Error {
225    #[inline]
226    fn new(kind: ErrorKind) -> Self {
227        Self { kind }
228    }
229
230    #[inline]
231    pub(crate) fn msg(message: impl fmt::Display) -> Self {
232        Self::new(ErrorKind::Message(message.to_string()))
233    }
234}
235
236impl fmt::Display for Error {
237    #[inline]
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        match &self.kind {
240            ErrorKind::EmptyPacket => write!(f, "Packet is empty"),
241            ErrorKind::Message(message) => write!(f, "{message}"),
242            ErrorKind::Storage(error) => write!(f, "Encoding error: {error}"),
243            ErrorKind::Overflow(at, len) => {
244                write!(f, "Internal packet overflow, {at} not in range 0-{len}")
245            }
246        }
247    }
248}
249
250impl core::error::Error for Error {
251    #[inline]
252    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
253        match &self.kind {
254            ErrorKind::Storage(error) => Some(error),
255            _ => None,
256        }
257    }
258}
259
260impl From<storage::Error> for Error {
261    #[inline]
262    fn from(error: storage::Error) -> Self {
263        Self::new(ErrorKind::Storage(error))
264    }
265}
266
#[cfg(feature = "wasm_bindgen02")]
impl From<wasm_bindgen02::JsValue> for Error {
    /// Convert a JavaScript value into a message error using its debug
    /// representation.
    #[inline]
    fn from(error: wasm_bindgen02::JsValue) -> Self {
        Self::msg(format_args!("{error:?}"))
    }
}
274
/// Result alias used throughout this module, with the error defaulting to
/// [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
276
/// The reconnect timeout in milliseconds which a service starts out with, and
/// which it is reset to when a connection that has been open for a while is
/// closed.
const INITIAL_TIMEOUT: u32 = 250;
/// The largest reconnect timeout in milliseconds which the backoff can reach.
const MAX_TIMEOUT: u32 = 4000;
279
/// How to connect to the websocket.
enum ConnectKind {
    /// Connect to `path` at the protocol, host and port of the current
    /// location of the window.
    Location { path: String },
    /// Connect to the given URL as-is.
    Url { url: String },
}
285
/// A specification for how to connect a websocket.
///
/// Constructed through [`Connect::location`] or [`Connect::url`].
pub struct Connect {
    /// The strategy used to determine the URL to connect to.
    kind: ConnectKind,
}
290
291impl Connect {
292    /// Connect to the same location with a custom path.
293    ///
294    /// Note that any number of `/` prefixes are ignored, the canonical
295    /// representation always ignores them and the path is relative to the
296    /// current location.
297    #[inline]
298    pub fn location(path: impl AsRef<str>) -> Self {
299        Self {
300            kind: ConnectKind::Location {
301                path: String::from(path.as_ref()),
302            },
303        }
304    }
305
306    /// Connect to the specified URL.
307    #[inline]
308    pub fn url(url: String) -> Self {
309        Self {
310            kind: ConnectKind::Url { url },
311        }
312    }
313}
314
/// Generic but shared fields which do not depend on specialization over `H`.
///
/// This is reference counted separately from [`Shared`], so that for example a
/// [`Request`] can hold a weak reference to it.
struct Generic {
    /// Listeners for connection state changes.
    state_listeners: RefCell<StateListeners>,
    /// Requests which have been sent but not yet been responded to.
    requests: RefCell<Requests>,
    /// Listeners for broadcasts.
    broadcasts: RefCell<Broadcasts>,
    /// Recycled buffers.
    buffers: RefCell<Buffers>,
}
322
/// Shared implementation details for websocket implementations.
pub(crate) struct Shared<H>
where
    H: WebImpl,
{
    /// How the websocket URL is determined when connecting.
    connect: Connect,
    /// Callback which errors are reported to.
    pub(crate) on_error: Box<dyn Callback<Error>>,
    /// The window, used to look up the location and to schedule timeouts.
    window: H::Window,
    /// Source of the timestamps stored in `opened`.
    performance: <H::Window as WindowImpl>::Performance,
    /// Handles which are passed to every socket that is constructed.
    handles: H::Handles,
    /// The state most recently emitted to state listeners.
    state: Cell<State>,
    /// Timestamp of when the connection was opened, `None` while it is closed.
    opened: Cell<Option<f64>>,
    /// The serial number which is used for the next request.
    serial: Cell<u32>,
    /// Broadcast callbacks which have been collected but not yet been called.
    defer_broadcasts: RefCell<VecDeque<Weak<dyn Callback<Result<RawPacket>>>>>,
    /// State callbacks which have been collected but not yet been called.
    defer_state_listeners: RefCell<VecDeque<Weak<dyn Callback<State>>>>,
    /// The current socket, if any.
    pub(crate) socket: RefCell<Option<H::Socket>>,
    /// Reusable buffer which outgoing requests are encoded into.
    output: RefCell<Vec<u8>>,
    /// The current reconnect timeout in milliseconds.
    current_timeout: Cell<u32>,
    /// Handle to the most recently scheduled reconnect timeout.
    reconnect_timeout: RefCell<Option<<H::Window as WindowImpl>::Timeout>>,
    /// State which does not depend on `H`.
    g: Rc<Generic>,
}
344
345impl<H> Drop for Shared<H>
346where
347    H: WebImpl,
348{
349    fn drop(&mut self) {
350        if let Some(s) = self.socket.take()
351            && let Err(e) = s.close()
352        {
353            self.on_error.call(e);
354        }
355
356        // We don't need to worry about mutable borrows here, since we only have
357        // weak references to Shared and by virtue of this being dropped they
358        // are all invalid.
359        let state_listeners = mem::take(&mut *self.g.state_listeners.borrow_mut());
360        let mut requests = self.g.requests.borrow_mut();
361
362        for (_, listener) in state_listeners {
363            listener.call(State::Closed);
364        }
365
366        for (_, p) in requests.drain() {
367            p.callback.call(Err(Error::msg("Websocket service closed")));
368        }
369    }
370}
371
/// Builder of a service.
///
/// Constructed through [`connect`]. The `E` parameter is the type of the error
/// handler, which can be replaced using [`ServiceBuilder::on_error`].
pub struct ServiceBuilder<H, E>
where
    H: WebImpl,
{
    /// How the service connects.
    connect: Connect,
    /// The error handler of the service.
    on_error: E,
    /// Ties the builder to the web implementation `H`.
    _marker: PhantomData<H>,
}
381
382impl<H, E> ServiceBuilder<H, E>
383where
384    H: WebImpl,
385    E: Callback<Error>,
386{
387    /// Set the error handler to use for the service.
388    pub fn on_error<U>(self, on_error: U) -> ServiceBuilder<H, U>
389    where
390        U: Callback<Error>,
391    {
392        ServiceBuilder {
393            connect: self.connect,
394            on_error,
395            _marker: self._marker,
396        }
397    }
398
399    /// Build a new service and handle.
400    pub fn build(self) -> Service<H> {
401        let window = match H::Window::new() {
402            Ok(window) => window,
403            Err(error) => {
404                panic!("{error}")
405            }
406        };
407
408        let performance = match WindowImpl::performance(&window) {
409            Ok(performance) => performance,
410            Err(error) => {
411                panic!("{error}")
412            }
413        };
414
415        let shared = Rc::<Shared<H>>::new_cyclic(move |shared| Shared {
416            connect: self.connect,
417            on_error: Box::new(self.on_error),
418            window,
419            performance,
420            handles: H::handles(shared),
421            state: Cell::new(State::Closed),
422            opened: Cell::new(None),
423            serial: Cell::new(0),
424            defer_broadcasts: RefCell::new(VecDeque::new()),
425            defer_state_listeners: RefCell::new(VecDeque::new()),
426            socket: RefCell::new(None),
427            output: RefCell::new(Vec::new()),
428            current_timeout: Cell::new(INITIAL_TIMEOUT),
429            reconnect_timeout: RefCell::new(None),
430            g: Rc::new(Generic {
431                state_listeners: RefCell::new(Slab::new()),
432                broadcasts: RefCell::new(HashMap::new()),
433                requests: RefCell::new(Requests::new()),
434                buffers: RefCell::new(VecDeque::new()),
435            }),
436        });
437
438        let handle = Handle {
439            shared: Rc::downgrade(&shared),
440        };
441
442        Service { shared, handle }
443    }
444}
445
/// The service handle.
///
/// Once dropped this will cause the service to be disconnected and all requests
/// to be cancelled.
pub struct Service<H>
where
    H: WebImpl,
{
    /// The strong reference which keeps the shared state alive.
    shared: Rc<Shared<H>>,
    /// A handle holding a weak reference to the shared state.
    handle: Handle<H>,
}
457
458impl<H> Service<H>
459where
460    H: WebImpl,
461{
462    /// Attempt to establish a websocket connection.
463    pub fn connect(&self) {
464        self.shared.connect()
465    }
466
467    /// Build a handle to the service.
468    pub fn handle(&self) -> &Handle<H> {
469        &self.handle
470    }
471}
472
473impl<H> Shared<H>
474where
475    H: WebImpl,
476{
477    /// Send a client message.
478    fn send_client_request<T>(&self, serial: u32, body: &T) -> Result<()>
479    where
480        T: api::Request,
481    {
482        let Some(ref socket) = *self.socket.borrow() else {
483            return Err(Error::msg("Socket is not connected"));
484        };
485
486        let header = api::RequestHeader {
487            serial,
488            id: <T::Endpoint as api::Endpoint>::ID.get(),
489        };
490
491        let out = &mut *self.output.borrow_mut();
492
493        storage::to_writer(&mut *out, &header)?;
494        storage::to_writer(&mut *out, &body)?;
495
496        tracing::debug!(
497            header.serial,
498            ?header.id,
499            len = out.len(),
500            "Sending request"
501        );
502
503        socket.send(out.as_slice())?;
504
505        out.clear();
506        out.shrink_to(MAX_CAPACITY);
507        Ok(())
508    }
509
510    pub(crate) fn next_buffer(self: &Rc<Self>, needed: usize) -> Box<BufData> {
511        match self.g.buffers.borrow_mut().pop_front() {
512            Some(mut buf) => {
513                if buf.data.capacity() < needed {
514                    buf.data.reserve(needed - buf.data.len());
515                }
516
517                buf
518            }
519            None => Box::new(BufData::with_capacity(Rc::downgrade(&self.g), needed)),
520        }
521    }
522
523    pub(crate) fn message(self: &Rc<Self>, buf: Box<BufData>) -> Result<()> {
524        // Wrap the buffer in a simple shared reference-counted container.
525        let buf = BufRc::new(buf);
526        let mut reader = SliceReader::new(&buf);
527
528        let header: api::ResponseHeader = storage::decode(&mut reader)?;
529
530        if let Some(broadcast) = MessageId::new(header.broadcast) {
531            tracing::debug!(?header, "Got broadcast");
532
533            if !self.prepare_broadcast(broadcast) {
534                return Ok(());
535            };
536
537            if let Some(id) = MessageId::new(header.error) {
538                let error = if id == MessageId::ERROR_MESSAGE {
539                    storage::decode(&mut reader)?
540                } else {
541                    api::ErrorMessage {
542                        message: "Unknown error",
543                    }
544                };
545
546                while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
547                    if let Some(callback) = callback.upgrade() {
548                        callback.call(Err(Error::msg(error.message)));
549                    }
550                }
551
552                return Ok(());
553            }
554
555            let at = buf.len().saturating_sub(reader.remaining());
556
557            let packet = RawPacket {
558                buf: Some(buf.clone()),
559                at: Cell::new(at),
560                id: broadcast,
561            };
562
563            while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
564                if let Some(callback) = callback.upgrade() {
565                    callback.call(Ok(packet.clone()));
566                }
567            }
568        } else {
569            tracing::debug!(?header, "Got response");
570
571            let p = {
572                let mut requests = self.g.requests.borrow_mut();
573
574                let Some(p) = requests.remove(&header.serial) else {
575                    tracing::debug!(?header, "Missing request");
576                    return Ok(());
577                };
578
579                p
580            };
581
582            if let Some(id) = MessageId::new(header.error) {
583                let error = if id == MessageId::ERROR_MESSAGE {
584                    storage::decode(&mut reader)?
585                } else {
586                    api::ErrorMessage {
587                        message: "Unknown error",
588                    }
589                };
590
591                p.callback.call(Err(Error::msg(error.message)));
592                return Ok(());
593            }
594
595            let at = buf.len().saturating_sub(reader.remaining());
596
597            let packet = RawPacket {
598                id: p.kind,
599                buf: Some(buf),
600                at: Cell::new(at),
601            };
602
603            p.callback.call(Ok(packet));
604        }
605
606        Ok(())
607    }
608
609    fn prepare_broadcast(self: &Rc<Self>, kind: MessageId) -> bool {
610        // Note: We need to defer this, since the outcome of calling
611        // the broadcast callback might be that the broadcast
612        // listener is modified, which could require mutable access
613        // to broadcasts.
614        let mut defer = self.defer_broadcasts.borrow_mut();
615
616        let broadcasts = self.g.broadcasts.borrow();
617
618        let Some(broadcasts) = broadcasts.get(&kind) else {
619            return false;
620        };
621
622        for (_, callback) in broadcasts.iter() {
623            defer.push_back(Rc::downgrade(callback));
624        }
625
626        !defer.is_empty()
627    }
628
629    pub(crate) fn set_open(&self) {
630        tracing::debug!("Set open");
631        self.opened
632            .set(Some(PerformanceImpl::now(&self.performance)));
633        self.emit_state_change(State::Open);
634    }
635
636    fn is_open_for_a_while(&self) -> bool {
637        let Some(at) = self.opened.get() else {
638            return false;
639        };
640
641        let now = PerformanceImpl::now(&self.performance);
642        (now - at) >= 250.0
643    }
644
645    pub(crate) fn close(self: &Rc<Self>) -> Result<(), Error> {
646        tracing::debug!("Close connection");
647
648        // We need a weak reference back to shared state to handle the timeout.
649        let shared = Rc::downgrade(self);
650
651        tracing::debug!(
652            "Set closed timeout={}, opened={:?}",
653            self.current_timeout.get(),
654            self.opened.get(),
655        );
656
657        if !self.is_open_for_a_while() {
658            let current_timeout = self.current_timeout.get();
659
660            if current_timeout < MAX_TIMEOUT {
661                let fuzz = H::random(50);
662
663                self.current_timeout.set(
664                    current_timeout
665                        .saturating_mul(2)
666                        .saturating_add(fuzz)
667                        .min(MAX_TIMEOUT),
668                );
669            }
670        } else {
671            self.current_timeout.set(INITIAL_TIMEOUT);
672        }
673
674        self.opened.set(None);
675        self.emit_state_change(State::Closed);
676
677        if let Some(s) = self.socket.take() {
678            s.close()?;
679        }
680
681        self.close_pending();
682
683        let timeout = self
684            .window
685            .set_timeout(self.current_timeout.get(), move || {
686                if let Some(shared) = shared.upgrade() {
687                    Self::connect(&shared);
688                }
689            })?;
690
691        drop(self.reconnect_timeout.borrow_mut().replace(timeout));
692        Ok(())
693    }
694
695    /// Close an pending requests with an error, since there is no chance they
696    /// will be responded to any more.
697    fn close_pending(self: &Rc<Self>) {
698        loop {
699            let Some(serial) = self.g.requests.borrow().keys().next().copied() else {
700                break;
701            };
702
703            let p = {
704                let mut requests = self.g.requests.borrow_mut();
705
706                let Some(p) = requests.remove(&serial) else {
707                    break;
708                };
709
710                p
711            };
712
713            p.callback.call(Err(Error::msg("Connection closed")));
714        }
715    }
716
717    fn emit_state_change(&self, state: State) {
718        if self.state.get() == state {
719            return;
720        }
721
722        self.state.set(state);
723
724        {
725            // We need to collect callbacks to avoid the callback recursively
726            // borrowing state listeners, which it would if it modifies any
727            // existing state listeners.
728            let mut defer = self.defer_state_listeners.borrow_mut();
729
730            for (_, callback) in self.g.state_listeners.borrow().iter() {
731                defer.push_back(Rc::downgrade(callback));
732            }
733
734            if defer.is_empty() {
735                return;
736            }
737        }
738
739        while let Some(callback) = self.defer_state_listeners.borrow_mut().pop_front() {
740            if let Some(callback) = callback.upgrade() {
741                callback.call(state);
742            }
743        }
744    }
745
746    fn is_closed(&self) -> bool {
747        self.opened.get().is_none()
748    }
749
750    fn connect(self: &Rc<Self>) {
751        tracing::debug!("Connect");
752
753        if let Err(e) = self.build() {
754            self.on_error.call(e);
755        } else {
756            return;
757        }
758
759        if let Err(e) = self.close() {
760            self.on_error.call(e);
761        }
762    }
763
764    /// Build a websocket connection.
765    fn build(self: &Rc<Self>) -> Result<()> {
766        let url = match &self.connect.kind {
767            ConnectKind::Location { path } => {
768                let Location {
769                    protocol,
770                    host,
771                    port,
772                } = WindowImpl::location(&self.window)?;
773
774                let protocol = match protocol.as_str() {
775                    "https:" => "wss:",
776                    "http:" => "ws:",
777                    other => {
778                        return Err(Error::msg(format_args!(
779                            "Same host connection is not supported for protocol `{other}`"
780                        )));
781                    }
782                };
783
784                let path = ForcePrefix(path, '/');
785                format!("{protocol}//{host}:{port}{path}")
786            }
787            ConnectKind::Url { url } => url.clone(),
788        };
789
790        let ws = SocketImpl::new(&url, &self.handles)?;
791
792        let old = self.socket.borrow_mut().replace(ws);
793
794        if let Some(old) = old {
795            old.close()?;
796        }
797
798        Ok(())
799    }
800}
801
/// Trait governing how callbacks are called.
///
/// This is implemented for any `'static` closure which implements `Fn(I)`, and
/// for [`EmptyCallback`] which ignores its input.
pub trait Callback<I>
where
    Self: 'static,
{
    /// Call the callback with the given `input`.
    fn call(&self, input: I);
}
810
811impl<I> Callback<I> for EmptyCallback {
812    #[inline]
813    fn call(&self, _: I) {}
814}
815
816impl<F, I> Callback<I> for F
817where
818    F: 'static + Fn(I),
819{
820    #[inline]
821    fn call(&self, input: I) {
822        self(input)
823    }
824}
825
/// A request builder.
///
/// Associate the callback to be used by using either
/// [`RequestBuilder::on_packet`] or [`RequestBuilder::on_raw_packet`] depending
/// on your needs.
///
/// Send the request with [`RequestBuilder::send`].
pub struct RequestBuilder<'a, H, B, C>
where
    H: WebImpl,
{
    /// Weak reference to the shared state of the service.
    shared: &'a Weak<Shared<H>>,
    /// The body of the request.
    body: B,
    /// The callback which is called with the outcome of the request.
    callback: C,
}
841
842impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
843where
844    H: WebImpl,
845{
846    /// Set the body of the request.
847    #[inline]
848    pub fn body<U>(self, body: U) -> RequestBuilder<'a, H, U, C>
849    where
850        U: api::Request,
851    {
852        RequestBuilder {
853            shared: self.shared,
854            body,
855            callback: self.callback,
856        }
857    }
858
859    /// Handle the response using the specified callback.
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// # extern crate yew021 as yew;
865    /// use yew::prelude::*;
866    /// use musli_web::web03::prelude::*;
867    ///
868    /// mod api {
869    ///     use musli::{Decode, Encode};
870    ///     use musli_web::api;
871    ///
872    ///     #[derive(Encode, Decode)]
873    ///     pub struct HelloRequest<'de> {
874    ///         pub message: &'de str,
875    ///     }
876    ///
877    ///     #[derive(Encode, Decode)]
878    ///     pub struct HelloResponse<'de> {
879    ///         pub message: &'de str,
880    ///     }
881    ///
882    ///     api::define! {
883    ///         pub type Hello;
884    ///
885    ///         impl Endpoint for Hello {
886    ///             impl<'de> Request for HelloRequest<'de>;
887    ///             type Response<'de> = HelloResponse<'de>;
888    ///         }
889    ///     }
890    /// }
891    ///
892    /// enum Msg {
893    ///     OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
894    /// }
895    ///
896    /// #[derive(Properties, PartialEq)]
897    /// struct Props {
898    ///     ws: ws::Handle,
899    /// }
900    ///
901    /// struct App {
902    ///     message: String,
903    ///     _hello: ws::Request,
904    /// }
905    ///
906    /// impl Component for App {
907    ///     type Message = Msg;
908    ///     type Properties = Props;
909    ///
910    ///     fn create(ctx: &Context<Self>) -> Self {
911    ///         let hello = ctx.props().ws
912    ///             .request()
913    ///             .body(api::HelloRequest { message: "Hello!"})
914    ///             .on_packet(ctx.link().callback(Msg::OnHello))
915    ///             .send();
916    ///
917    ///         Self {
918    ///             message: String::from("No Message :("),
919    ///             _hello: hello,
920    ///         }
921    ///     }
922    ///
923    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
924    ///         match msg {
925    ///             Msg::OnHello(Err(error)) => {
926    ///                 tracing::error!("Request error: {:?}", error);
927    ///                 false
928    ///             }
929    ///             Msg::OnHello(Ok(packet)) => {
930    ///                 if let Ok(response) = packet.decode() {
931    ///                     self.message = response.message.to_owned();
932    ///                 }
933    ///
934    ///                 true
935    ///             }
936    ///         }
937    ///     }
938    ///
939    ///     fn view(&self, ctx: &Context<Self>) -> Html {
940    ///         html! {
941    ///             <div>
942    ///                 <h1>{"WebSocket Example"}</h1>
943    ///                 <p>{format!("Message: {}", self.message)}</p>
944    ///             </div>
945    ///         }
946    ///     }
947    /// }
948    /// ```
949    pub fn on_packet<E>(
950        self,
951        callback: impl Callback<Result<Packet<E>>>,
952    ) -> RequestBuilder<'a, H, B, impl Callback<Result<RawPacket>>>
953    where
954        E: api::Endpoint,
955    {
956        self.on_raw_packet(move |result: Result<RawPacket>| match result {
957            Ok(ok) => callback.call(Ok(Packet::new(ok))),
958            Err(err) => callback.call(Err(err)),
959        })
960    }
961
962    /// Handle the response using the specified callback.
963    ///
964    /// # Examples
965    ///
966    /// ```
967    /// # extern crate yew021 as yew;
968    /// use yew::prelude::*;
969    /// use musli_web::web03::prelude::*;
970    ///
971    /// mod api {
972    ///     use musli::{Decode, Encode};
973    ///     use musli_web::api;
974    ///
975    ///     #[derive(Encode, Decode)]
976    ///     pub struct HelloRequest<'de> {
977    ///         pub message: &'de str,
978    ///     }
979    ///
980    ///     #[derive(Encode, Decode)]
981    ///     pub struct HelloResponse<'de> {
982    ///         pub message: &'de str,
983    ///     }
984    ///
985    ///     api::define! {
986    ///         pub type Hello;
987    ///
988    ///         impl Endpoint for Hello {
989    ///             impl<'de> Request for HelloRequest<'de>;
990    ///             type Response<'de> = HelloResponse<'de>;
991    ///         }
992    ///     }
993    /// }
994    ///
995    /// enum Msg {
996    ///     OnHello(Result<ws::RawPacket, ws::Error>),
997    /// }
998    ///
999    /// #[derive(Properties, PartialEq)]
1000    /// struct Props {
1001    ///     ws: ws::Handle,
1002    /// }
1003    ///
1004    /// struct App {
1005    ///     message: String,
1006    ///     _hello: ws::Request,
1007    /// }
1008    ///
1009    /// impl Component for App {
1010    ///     type Message = Msg;
1011    ///     type Properties = Props;
1012    ///
1013    ///     fn create(ctx: &Context<Self>) -> Self {
1014    ///         let link = ctx.link().clone();
1015    ///
1016    ///         let hello = ctx.props().ws
1017    ///             .request()
1018    ///             .body(api::HelloRequest { message: "Hello!"})
1019    ///             .on_raw_packet(move |packet| link.send_message(Msg::OnHello(packet)))
1020    ///             .send();
1021    ///
1022    ///         Self {
1023    ///             message: String::from("No Message :("),
1024    ///             _hello: hello,
1025    ///         }
1026    ///     }
1027    ///
1028    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1029    ///         match msg {
1030    ///             Msg::OnHello(Err(error)) => {
1031    ///                 tracing::error!("Request error: {:?}", error);
1032    ///                 false
1033    ///             }
1034    ///             Msg::OnHello(Ok(packet)) => {
1035    ///                 if let Ok(response) = packet.decode::<api::HelloResponse>() {
1036    ///                     self.message = response.message.to_owned();
1037    ///                 }
1038    ///
1039    ///                 true
1040    ///             }
1041    ///         }
1042    ///     }
1043    ///
1044    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1045    ///         html! {
1046    ///             <div>
1047    ///                 <h1>{"WebSocket Example"}</h1>
1048    ///                 <p>{format!("Message: {}", self.message)}</p>
1049    ///             </div>
1050    ///         }
1051    ///     }
1052    /// }
1053    /// ```
1054    pub fn on_raw_packet<U>(self, callback: U) -> RequestBuilder<'a, H, B, U>
1055    where
1056        U: Callback<Result<RawPacket, Error>>,
1057    {
1058        RequestBuilder {
1059            shared: self.shared,
1060            body: self.body,
1061            callback,
1062        }
1063    }
1064}
1065
1066impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
1067where
1068    B: api::Request,
1069    C: Callback<Result<RawPacket>>,
1070    H: WebImpl,
1071{
1072    /// Send the request.
1073    ///
1074    /// This requires that a body has been set using [`RequestBuilder::body`].
1075    pub fn send(self) -> Request {
1076        let Some(shared) = self.shared.upgrade() else {
1077            self.callback
1078                .call(Err(Error::msg("WebSocket service is down")));
1079            return Request::new();
1080        };
1081
1082        if shared.is_closed() {
1083            self.callback
1084                .call(Err(Error::msg("WebSocket is not connected")));
1085            return Request::new();
1086        }
1087
1088        let serial = shared.serial.get();
1089
1090        if let Err(error) = shared.send_client_request(serial, &self.body) {
1091            shared.on_error.call(error);
1092            return Request::new();
1093        }
1094
1095        shared.serial.set(serial.wrapping_add(1));
1096
1097        let pending = Pending {
1098            kind: <B::Endpoint as api::Endpoint>::ID,
1099            serial,
1100            callback: self.callback,
1101        };
1102
1103        let existing = shared
1104            .g
1105            .requests
1106            .borrow_mut()
1107            .insert(serial, Box::new(pending));
1108
1109        if let Some(p) = existing {
1110            p.callback.call(Err(Error::msg("Request cancelled")));
1111        }
1112
1113        Request {
1114            serial,
1115            g: Rc::downgrade(&shared.g),
1116        }
1117    }
1118}
1119
1120/// The handle for a pending request.
1121///
1122/// Dropping or [`clear()`] this handle will cancel the request.
1123///
1124/// [`clear()`]: Self::clear
1125pub struct Request {
1126    serial: u32,
1127    g: Weak<Generic>,
1128}
1129
1130impl Request {
1131    /// An empty request handler.
1132    #[inline]
1133    pub const fn new() -> Self {
1134        Self {
1135            serial: 0,
1136            g: Weak::new(),
1137        }
1138    }
1139
1140    /// Clear the request handle without dropping it, cancelling any pending
1141    /// requests.
1142    pub fn clear(&mut self) {
1143        let removed = {
1144            let serial = mem::take(&mut self.serial);
1145
1146            let Some(g) = self.g.upgrade() else {
1147                return;
1148            };
1149
1150            self.g = Weak::new();
1151
1152            let Some(p) = g.requests.borrow_mut().remove(&serial) else {
1153                return;
1154            };
1155
1156            p
1157        };
1158
1159        drop(removed);
1160    }
1161
1162    /// Indicate if the update request is pending.
1163    pub fn is_pending(&self) -> bool {
1164        let Some(g) = self.g.upgrade() else {
1165            return false;
1166        };
1167
1168        g.requests.borrow().contains_key(&self.serial)
1169    }
1170}
1171
1172impl Default for Request {
1173    #[inline]
1174    fn default() -> Self {
1175        Self::new()
1176    }
1177}
1178
1179impl Drop for Request {
1180    #[inline]
1181    fn drop(&mut self) {
1182        self.clear();
1183    }
1184}
1185
1186/// The handle for a pending request.
1187///
1188/// Dropping or calling [`clear()`] on this handle remove the listener.
1189///
1190/// [`clear()`]: Self::clear
1191pub struct Listener {
1192    kind: Option<MessageId>,
1193    index: usize,
1194    g: Weak<Generic>,
1195}
1196
1197impl Listener {
1198    /// Construct an empty listener.
1199    #[inline]
1200    pub const fn new() -> Self {
1201        Self {
1202            kind: None,
1203            index: 0,
1204            g: Weak::new(),
1205        }
1206    }
1207
1208    /// Build up an empty listener with the specified kind.
1209    #[inline]
1210    pub(crate) const fn empty_with_kind(kind: MessageId) -> Self {
1211        Self {
1212            kind: Some(kind),
1213            index: 0,
1214            g: Weak::new(),
1215        }
1216    }
1217
1218    /// Clear the listener without dropping it.
1219    ///
1220    /// This will remove the associated broadcast listener from being notified.
1221    pub fn clear(&mut self) {
1222        // Gather values here to drop them outside of the upgrade block.
1223        let removed;
1224        let removed_value;
1225
1226        {
1227            let Some(g) = self.g.upgrade() else {
1228                return;
1229            };
1230
1231            self.g = Weak::new();
1232            let index = mem::take(&mut self.index);
1233
1234            let Some(kind) = self.kind.take() else {
1235                return;
1236            };
1237
1238            let mut broadcasts = g.broadcasts.borrow_mut();
1239
1240            let Entry::Occupied(mut e) = broadcasts.entry(kind) else {
1241                return;
1242            };
1243
1244            removed = e.get_mut().try_remove(index);
1245
1246            if e.get().is_empty() {
1247                removed_value = Some(e.remove());
1248            } else {
1249                removed_value = None;
1250            }
1251        }
1252
1253        // Drop here, to avoid invoking any destructors which might borrow
1254        // shared mutably earlier.
1255        drop(removed);
1256        drop(removed_value);
1257    }
1258}
1259
1260impl Default for Listener {
1261    #[inline]
1262    fn default() -> Self {
1263        Self::new()
1264    }
1265}
1266
1267impl Drop for Listener {
1268    #[inline]
1269    fn drop(&mut self) {
1270        self.clear();
1271    }
1272}
1273
1274/// The handle for state change listening.
1275///
1276/// Dropping or calling [`clear()`] on this handle will remove the associated
1277/// callback from being notified.
1278///
1279/// [`clear()`]: Self::clear
1280pub struct StateListener {
1281    index: usize,
1282    g: Weak<Generic>,
1283}
1284
1285impl StateListener {
1286    /// Construct an empty state listener.
1287    #[inline]
1288    pub const fn new() -> Self {
1289        Self {
1290            index: 0,
1291            g: Weak::new(),
1292        }
1293    }
1294
1295    /// Clear the state listener without dropping it.
1296    ///
1297    /// This will remove the associated callback from being notified.
1298    pub fn clear(&mut self) {
1299        let removed = {
1300            let Some(g) = self.g.upgrade() else {
1301                return;
1302            };
1303
1304            self.g = Weak::new();
1305
1306            g.state_listeners.borrow_mut().try_remove(self.index)
1307        };
1308
1309        drop(removed);
1310    }
1311}
1312
1313impl Default for StateListener {
1314    #[inline]
1315    fn default() -> Self {
1316        Self::new()
1317    }
1318}
1319
1320impl Drop for StateListener {
1321    #[inline]
1322    fn drop(&mut self) {
1323        self.clear();
1324    }
1325}
1326
1327pub(crate) struct BufData {
1328    /// Buffer being used.
1329    pub(crate) data: Vec<u8>,
1330    /// Number of strong references to this buffer.
1331    strong: Cell<usize>,
1332    /// Reference to shared state where the buffer will be recycled to.
1333    g: Weak<Generic>,
1334}
1335
1336impl BufData {
1337    fn with_capacity(g: Weak<Generic>, capacity: usize) -> Self {
1338        Self {
1339            data: Vec::with_capacity(capacity),
1340            strong: Cell::new(0),
1341            g,
1342        }
1343    }
1344
1345    unsafe fn dec(ptr: NonNull<BufData>) {
1346        unsafe {
1347            let count = ptr.as_ref().strong.get().wrapping_sub(1);
1348            ptr.as_ref().strong.set(count);
1349
1350            if count > 0 {
1351                return;
1352            }
1353
1354            let mut buf = Box::from_raw(ptr.as_ptr());
1355
1356            // Try to recycle the buffer if shared is available, else let it be
1357            // dropped and free here.
1358            let Some(g) = buf.as_ref().g.upgrade() else {
1359                return;
1360            };
1361
1362            let mut buffers = g.buffers.borrow_mut();
1363
1364            // Set the length of the recycled buffer.
1365            buf.data.set_len(buf.data.len().min(MAX_CAPACITY));
1366
1367            // We size our buffers to some max capacity to avod overuse in case
1368            // we infrequently need to handle some massive message. If we don't
1369            // shrink the allocation, then memory use can run away over time.
1370            buf.data.shrink_to(MAX_CAPACITY);
1371
1372            buffers.push_back(buf);
1373        }
1374    }
1375
1376    unsafe fn inc(ptr: NonNull<BufData>) {
1377        unsafe {
1378            let count = ptr.as_ref().strong.get().wrapping_add(1);
1379
1380            if count == 0 {
1381                std::process::abort();
1382            }
1383
1384            ptr.as_ref().strong.set(count);
1385        }
1386    }
1387}
1388
1389/// A shared buffer of data that is recycled when dropped.
1390struct BufRc {
1391    data: NonNull<BufData>,
1392}
1393
1394impl BufRc {
1395    fn new(data: Box<BufData>) -> Self {
1396        let data = NonNull::from(Box::leak(data));
1397
1398        unsafe {
1399            BufData::inc(data);
1400        }
1401
1402        Self { data }
1403    }
1404}
1405
1406impl Deref for BufRc {
1407    type Target = [u8];
1408
1409    fn deref(&self) -> &Self::Target {
1410        unsafe { &(*self.data.as_ptr()).data }
1411    }
1412}
1413
1414impl Clone for BufRc {
1415    fn clone(&self) -> Self {
1416        unsafe {
1417            BufData::inc(self.data);
1418        }
1419
1420        Self { data: self.data }
1421    }
1422}
1423
1424impl Drop for BufRc {
1425    fn drop(&mut self) {
1426        unsafe {
1427            BufData::dec(self.data);
1428        }
1429    }
1430}
1431
1432/// A raw packet of data.
1433#[derive(Clone)]
1434pub struct RawPacket {
1435    id: MessageId,
1436    buf: Option<BufRc>,
1437    at: Cell<usize>,
1438}
1439
1440impl RawPacket {
1441    /// Construct an empty raw packet.
1442    ///
1443    /// # Examples
1444    ///
1445    /// ```
1446    /// use musli_web::api::MessageId;
1447    /// use musli_web::web::RawPacket;
1448    ///
1449    /// let packet = RawPacket::empty();
1450    ///
1451    /// assert!(packet.is_empty());
1452    /// assert_eq!(packet.id(), MessageId::EMPTY);
1453    /// ```
1454    pub const fn empty() -> Self {
1455        Self {
1456            id: MessageId::EMPTY,
1457            buf: None,
1458            at: Cell::new(0),
1459        }
1460    }
1461
1462    /// Decode the contents of a raw packet.
1463    ///
1464    /// This can be called multiple times if there are multiple payloads in
1465    /// sequence of the response.
1466    ///
1467    /// You can check if the packet is empty using [`RawPacket::is_empty`].
1468    pub fn decode<'this, T>(&'this self) -> Result<T>
1469    where
1470        T: Decode<'this, Binary, Global>,
1471    {
1472        let at = self.at.get();
1473
1474        if self.id == MessageId::EMPTY {
1475            return Err(Error::new(ErrorKind::EmptyPacket));
1476        }
1477
1478        let Some(bytes) = self.as_slice().get(at..) else {
1479            return Err(Error::new(ErrorKind::Overflow(at, self.as_slice().len())));
1480        };
1481
1482        let mut reader = SliceReader::new(bytes);
1483
1484        match storage::decode(&mut reader) {
1485            Ok(value) => {
1486                self.at.set(at + bytes.len() - reader.remaining());
1487                Ok(value)
1488            }
1489            Err(error) => {
1490                self.at.set(self.len());
1491                Err(Error::from(error))
1492            }
1493        }
1494    }
1495
1496    /// Get the underlying byte slice of the packet.
1497    ///
1498    /// # Examples
1499    ///
1500    /// ```
1501    /// use musli_web::web::RawPacket;
1502    ///
1503    /// let packet = RawPacket::empty();
1504    /// assert_eq!(packet.as_slice(), &[]);
1505    /// ```
1506    pub fn as_slice(&self) -> &[u8] {
1507        match &self.buf {
1508            Some(buf) => buf.as_ref(),
1509            None => &[],
1510        }
1511    }
1512
1513    /// Get the length of the packet.
1514    ///
1515    /// # Examples
1516    ////
1517    /// ```
1518    /// use musli_web::web::RawPacket;
1519    ///
1520    /// let packet = RawPacket::empty();
1521    /// assert_eq!(packet.len(), 0);
1522    /// ```
1523    pub fn len(&self) -> usize {
1524        match &self.buf {
1525            Some(buf) => buf.len(),
1526            None => 0,
1527        }
1528    }
1529
1530    /// Check if the packet is empty.
1531    ///
1532    /// # Examples
1533    ///
1534    /// ```
1535    /// use musli_web::web::RawPacket;
1536    ///
1537    /// let packet = RawPacket::empty();
1538    /// assert!(packet.is_empty());
1539    /// ```
1540    pub fn is_empty(&self) -> bool {
1541        self.at.get() >= self.len()
1542    }
1543
1544    /// The id of the packet this is a response to as specified by
1545    /// [`Endpoint::ID`] or [`Broadcast::ID`].
1546    ///
1547    /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1548    /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1549    pub fn id(&self) -> MessageId {
1550        self.id
1551    }
1552}
1553
1554/// A typed packet of data.
1555#[derive(Clone)]
1556pub struct Packet<T> {
1557    raw: RawPacket,
1558    _marker: PhantomData<T>,
1559}
1560
1561impl<T> Packet<T> {
1562    /// Construct an empty package.
1563    ///
1564    /// # Examples
1565    ///
1566    /// ```
1567    /// use musli_web::api::MessageId;
1568    /// use musli_web::web::Packet;
1569    ///
1570    /// let packet = Packet::<()>::empty();
1571    ///
1572    /// assert!(packet.is_empty());
1573    /// assert_eq!(packet.id(), MessageId::EMPTY);
1574    /// ```
1575    pub const fn empty() -> Self {
1576        Self {
1577            raw: RawPacket::empty(),
1578            _marker: PhantomData,
1579        }
1580    }
1581
1582    /// Construct a new typed package from a raw one.
1583    ///
1584    /// Note that this does not guarantee that the typed package is correct, but
1585    /// the `T` parameter becomes associated with it allowing it to be used
1586    /// automatically with methods such as [`Packet::decode`].
1587    #[inline]
1588    pub fn new(raw: RawPacket) -> Self {
1589        Self {
1590            raw,
1591            _marker: PhantomData,
1592        }
1593    }
1594
1595    /// Convert a packet into a raw packet.
1596    ///
1597    /// To determine which endpoint or broadcast it belongs to the
1598    /// [`RawPacket::id`] method can be used.
1599    pub fn into_raw(self) -> RawPacket {
1600        self.raw
1601    }
1602
1603    /// Check if the packet is empty.
1604    ///
1605    /// # Examples
1606    ///
1607    /// ```
1608    /// use musli_web::web::Packet;
1609    ///
1610    /// let packet = Packet::<()>::empty();
1611    /// assert!(packet.is_empty());
1612    /// ```
1613    pub fn is_empty(&self) -> bool {
1614        self.raw.is_empty()
1615    }
1616
1617    /// The id of the packet this is a response to as specified by
1618    /// [`Endpoint::ID`] or [`Broadcast::ID`].
1619    ///
1620    /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1621    /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1622    pub fn id(&self) -> MessageId {
1623        self.raw.id()
1624    }
1625}
1626
1627impl<T> Packet<T>
1628where
1629    T: api::Decodable,
1630{
1631    /// Decode the contents of a packet.
1632    ///
1633    /// This can be called multiple times if there are multiple payloads in
1634    /// sequence of the response.
1635    ///
1636    /// You can check if the packet is empty using [`Packet::is_empty`].
1637    pub fn decode(&self) -> Result<T::Type<'_>> {
1638        self.decode_any()
1639    }
1640
1641    /// Decode any contents of a packet.
1642    ///
1643    /// This can be called multiple times if there are multiple payloads in
1644    /// sequence of the response.
1645    ///
1646    /// You can check if the packet is empty using [`Packet::is_empty`].
1647    pub fn decode_any<'de, R>(&'de self) -> Result<R>
1648    where
1649        R: Decode<'de, Binary, Global>,
1650    {
1651        self.raw.decode()
1652    }
1653}
1654
1655impl<T> Packet<T>
1656where
1657    T: api::Endpoint,
1658{
1659    /// Decode the contents of a packet.
1660    ///
1661    /// This can be called multiple times if there are multiple payloads in
1662    /// sequence of the response.
1663    ///
1664    /// You can check if the packet is empty using [`Packet::is_empty`].
1665    pub fn decode_response(&self) -> Result<T::Response<'_>> {
1666        self.decode_any_response()
1667    }
1668
1669    /// Decode any contents of a packet.
1670    ///
1671    /// This can be called multiple times if there are multiple payloads in
1672    /// sequence of the response.
1673    ///
1674    /// You can check if the packet is empty using [`Packet::is_empty`].
1675    pub fn decode_any_response<'de, R>(&'de self) -> Result<R>
1676    where
1677        R: Decode<'de, Binary, Global>,
1678    {
1679        self.raw.decode()
1680    }
1681}
1682
1683impl<T> Packet<T>
1684where
1685    T: api::Broadcast,
1686{
1687    /// Decode the primary event related to a broadcast.
1688    pub fn decode_event<'de>(&'de self) -> Result<T::Event<'de>>
1689    where
1690        T: api::BroadcastWithEvent,
1691    {
1692        self.decode_event_any()
1693    }
1694
1695    /// Decode any event related to a broadcast.
1696    pub fn decode_event_any<'de, E>(&'de self) -> Result<E>
1697    where
1698        E: Event<Broadcast = T> + Decode<'de, Binary, Global>,
1699    {
1700        self.raw.decode()
1701    }
1702}
1703
1704/// A handle to the WebSocket service.
1705#[derive(Clone)]
1706#[repr(transparent)]
1707pub struct Handle<H>
1708where
1709    H: WebImpl,
1710{
1711    shared: Weak<Shared<H>>,
1712}
1713
1714impl<H> Handle<H>
1715where
1716    H: WebImpl,
1717{
1718    /// Send a request of type `T`.
1719    ///
1720    /// Returns a handle for the request.
1721    ///
1722    /// If the handle is dropped, the request is cancelled.
1723    ///
1724    /// # Examples
1725    ///
1726    /// ```
1727    /// # extern crate yew021 as yew;
1728    /// use yew::prelude::*;
1729    /// use musli_web::web03::prelude::*;
1730    ///
1731    /// mod api {
1732    ///     use musli::{Decode, Encode};
1733    ///     use musli_web::api;
1734    ///
1735    ///     #[derive(Encode, Decode)]
1736    ///     pub struct HelloRequest<'de> {
1737    ///         pub message: &'de str,
1738    ///     }
1739    ///
1740    ///     #[derive(Encode, Decode)]
1741    ///     pub struct HelloResponse<'de> {
1742    ///         pub message: &'de str,
1743    ///     }
1744    ///
1745    ///     api::define! {
1746    ///         pub type Hello;
1747    ///
1748    ///         impl Endpoint for Hello {
1749    ///             impl<'de> Request for HelloRequest<'de>;
1750    ///             type Response<'de> = HelloResponse<'de>;
1751    ///         }
1752    ///     }
1753    /// }
1754    ///
1755    /// enum Msg {
1756    ///     OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
1757    /// }
1758    ///
1759    /// #[derive(Properties, PartialEq)]
1760    /// struct Props {
1761    ///     ws: ws::Handle,
1762    /// }
1763    ///
1764    /// struct App {
1765    ///     message: String,
1766    ///     _hello: ws::Request,
1767    /// }
1768    ///
1769    /// impl Component for App {
1770    ///     type Message = Msg;
1771    ///     type Properties = Props;
1772    ///
1773    ///     fn create(ctx: &Context<Self>) -> Self {
1774    ///         let hello = ctx.props().ws
1775    ///             .request()
1776    ///             .body(api::HelloRequest { message: "Hello!"})
1777    ///             .on_packet(ctx.link().callback(Msg::OnHello))
1778    ///             .send();
1779    ///
1780    ///         Self {
1781    ///             message: String::from("No Message :("),
1782    ///             _hello: hello,
1783    ///         }
1784    ///     }
1785    ///
1786    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1787    ///         match msg {
1788    ///             Msg::OnHello(Err(error)) => {
1789    ///                 tracing::error!("Request error: {:?}", error);
1790    ///                 false
1791    ///             }
1792    ///             Msg::OnHello(Ok(packet)) => {
1793    ///                 if let Ok(response) = packet.decode() {
1794    ///                     self.message = response.message.to_owned();
1795    ///                 }
1796    ///
1797    ///                 true
1798    ///             }
1799    ///         }
1800    ///     }
1801    ///
1802    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1803    ///         html! {
1804    ///             <div>
1805    ///                 <h1>{"WebSocket Example"}</h1>
1806    ///                 <p>{format!("Message: {}", self.message)}</p>
1807    ///             </div>
1808    ///         }
1809    ///     }
1810    /// }
1811    /// ```
1812    pub fn request(&self) -> RequestBuilder<'_, H, EmptyBody, EmptyCallback> {
1813        RequestBuilder {
1814            shared: &self.shared,
1815            body: EmptyBody,
1816            callback: EmptyCallback,
1817        }
1818    }
1819
1820    /// Listen for broadcasts of type `T`.
1821    ///
1822    /// Returns a handle for the listener that will cancel the listener if
1823    /// dropped.
1824    ///
1825    /// # Examples
1826    ///
1827    /// ```
1828    /// # extern crate yew021 as yew;
1829    /// use yew::prelude::*;
1830    /// use musli_web::web03::prelude::*;
1831    ///
1832    /// mod api {
1833    ///     use musli::{Decode, Encode};
1834    ///     use musli_web::api;
1835    ///
1836    ///     #[derive(Encode, Decode)]
1837    ///     pub struct TickEvent<'de> {
1838    ///         pub message: &'de str,
1839    ///         pub tick: u32,
1840    ///     }
1841    ///
1842    ///     api::define! {
1843    ///         pub type Tick;
1844    ///
1845    ///         impl Broadcast for Tick {
1846    ///             impl<'de> Event for TickEvent<'de>;
1847    ///         }
1848    ///     }
1849    /// }
1850    ///
1851    /// enum Msg {
1852    ///     Tick(Result<ws::Packet<api::Tick>, ws::Error>),
1853    /// }
1854    ///
1855    /// #[derive(Properties, PartialEq)]
1856    /// struct Props {
1857    ///     ws: ws::Handle,
1858    /// }
1859    ///
1860    /// struct App {
1861    ///     tick: u32,
1862    ///     _listen: ws::Listener,
1863    /// }
1864    ///
1865    /// impl Component for App {
1866    ///     type Message = Msg;
1867    ///     type Properties = Props;
1868    ///
1869    ///     fn create(ctx: &Context<Self>) -> Self {
1870    ///         let listen = ctx.props().ws.on_broadcast(ctx.link().callback(Msg::Tick));
1871    ///
1872    ///         Self {
1873    ///             tick: 0,
1874    ///             _listen: listen,
1875    ///         }
1876    ///     }
1877    ///
1878    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1879    ///         match msg {
1880    ///             Msg::Tick(Err(error)) => {
1881    ///                 tracing::error!("Tick error: {error}");
1882    ///                 false
1883    ///             }
1884    ///             Msg::Tick(Ok(packet)) => {
1885    ///                 if let Ok(tick) = packet.decode_event() {
1886    ///                     self.tick = tick.tick;
1887    ///                 }
1888    ///
1889    ///                 true
1890    ///             }
1891    ///         }
1892    ///     }
1893    ///
1894    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1895    ///         html! {
1896    ///             <div>
1897    ///                 <h1>{"WebSocket Example"}</h1>
1898    ///                 <p>{format!("Tick: {}", self.tick)}</p>
1899    ///             </div>
1900    ///         }
1901    ///     }
1902    /// }
1903    /// ```
1904    pub fn on_broadcast<T>(&self, callback: impl Callback<Result<Packet<T>>>) -> Listener
1905    where
1906        T: api::Broadcast,
1907    {
1908        self.on_raw_broadcast::<T>(move |result| match result {
1909            Ok(packet) => callback.call(Ok(Packet::new(packet))),
1910            Err(error) => callback.call(Err(error)),
1911        })
1912    }
1913
1914    /// Listen for broadcasts of type `T`.
1915    ///
1916    /// Returns a handle for the listener that will cancel the listener if
1917    /// dropped.
1918    ///
1919    /// # Examples
1920    ///
1921    /// ```
1922    /// # extern crate yew021 as yew;
1923    /// use yew::prelude::*;
1924    /// use musli_web::web03::prelude::*;
1925    ///
1926    /// mod api {
1927    ///     use musli::{Decode, Encode};
1928    ///     use musli_web::api;
1929    ///
1930    ///     #[derive(Encode, Decode)]
1931    ///     pub struct TickEvent<'de> {
1932    ///         pub message: &'de str,
1933    ///         pub tick: u32,
1934    ///     }
1935    ///
1936    ///     api::define! {
1937    ///         pub type Tick;
1938    ///
1939    ///         impl Broadcast for Tick {
1940    ///             impl<'de> Event for TickEvent<'de>;
1941    ///         }
1942    ///     }
1943    /// }
1944    ///
1945    /// enum Msg {
1946    ///     Tick(Result<ws::RawPacket, ws::Error>),
1947    /// }
1948    ///
1949    /// #[derive(Properties, PartialEq)]
1950    /// struct Props {
1951    ///     ws: ws::Handle,
1952    /// }
1953    ///
1954    /// struct App {
1955    ///     tick: u32,
1956    ///     _listen: ws::Listener,
1957    /// }
1958    ///
1959    /// impl Component for App {
1960    ///     type Message = Msg;
1961    ///     type Properties = Props;
1962    ///
1963    ///     fn create(ctx: &Context<Self>) -> Self {
1964    ///         let link = ctx.link().clone();
1965    ///         let listen = ctx.props().ws.on_raw_broadcast::<api::Tick>(ctx.link().callback(Msg::Tick));
1966    ///
1967    ///         Self {
1968    ///             tick: 0,
1969    ///             _listen: listen,
1970    ///         }
1971    ///     }
1972    ///
1973    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1974    ///         match msg {
1975    ///             Msg::Tick(Err(error)) => {
1976    ///                 tracing::error!("Tick error: {error}");
1977    ///                 false
1978    ///             }
1979    ///             Msg::Tick(Ok(packet)) => {
1980    ///                 if let Ok(tick) = packet.decode::<api::TickEvent>() {
1981    ///                     self.tick = tick.tick;
1982    ///                 }
1983    ///
1984    ///                 true
1985    ///             }
1986    ///         }
1987    ///     }
1988    ///
1989    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1990    ///         html! {
1991    ///             <div>
1992    ///                 <h1>{"WebSocket Example"}</h1>
1993    ///                 <p>{format!("Tick: {}", self.tick)}</p>
1994    ///             </div>
1995    ///         }
1996    ///     }
1997    /// }
1998    /// ```
1999    pub fn on_raw_broadcast<T>(&self, callback: impl Callback<Result<RawPacket>>) -> Listener
2000    where
2001        T: api::Broadcast,
2002    {
2003        let Some(shared) = self.shared.upgrade() else {
2004            return Listener::empty_with_kind(T::ID);
2005        };
2006
2007        let index = {
2008            let mut broadcasts = shared.g.broadcasts.borrow_mut();
2009            let slots = broadcasts.entry(T::ID).or_default();
2010            slots.insert(Rc::new(callback))
2011        };
2012
2013        Listener {
2014            kind: Some(T::ID),
2015            index,
2016            g: Rc::downgrade(&shared.g),
2017        }
2018    }
2019
2020    /// Listen for state changes to the underlying connection.
2021    ///
2022    /// This indicates when the connection is open and ready to receive requests
2023    /// through [`State::Open`], or if it's closed and requests will be queued
2024    /// through [`State::Closed`].
2025    ///
2026    /// Note that if you are connecting through a proxy the reported updates
2027    /// might be volatile. It is always best to send a message over the
2028    /// connection on the server side that once received allows the client to
2029    /// know that it is connected.
2030    ///
2031    /// Dropping the returned handle will cancel the listener.
2032    ///
2033    /// # Examples
2034    ///
2035    /// ```
2036    /// # extern crate yew021 as yew;
2037    /// use yew::prelude::*;
2038    /// use musli_web::web03::prelude::*;
2039    ///
2040    /// enum Msg {
2041    ///     StateChange(ws::State),
2042    /// }
2043    ///
2044    /// #[derive(Properties, PartialEq)]
2045    /// struct Props {
2046    ///     ws: ws::Handle,
2047    /// }
2048    ///
2049    /// struct App {
2050    ///     state: ws::State,
2051    ///     _listen: ws::StateListener,
2052    /// }
2053    ///
2054    /// impl Component for App {
2055    ///     type Message = Msg;
2056    ///     type Properties = Props;
2057    ///
2058    ///     fn create(ctx: &Context<Self>) -> Self {
2059    ///         let link = ctx.link().clone();
2060    ///
2061    ///         let (state, listen) = ctx.props().ws.on_state_change(move |state| {
2062    ///             link.send_message(Msg::StateChange(state));
2063    ///         });
2064    ///
2065    ///         Self {
2066    ///             state,
2067    ///             _listen: listen,
2068    ///         }
2069    ///     }
2070    ///
2071    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
2072    ///         match msg {
2073    ///             Msg::StateChange(state) => {
2074    ///                 self.state = state;
2075    ///                 true
2076    ///             }
2077    ///         }
2078    ///     }
2079    ///
2080    ///     fn view(&self, ctx: &Context<Self>) -> Html {
2081    ///         html! {
2082    ///             <div>
2083    ///                 <h1>{"WebSocket Example"}</h1>
2084    ///                 <p>{format!("State: {:?}", self.state)}</p>
2085    ///             </div>
2086    ///         }
2087    ///     }
    /// }
    /// ```
2089    pub fn on_state_change(&self, callback: impl Callback<State>) -> (State, StateListener) {
2090        let Some(shared) = self.shared.upgrade() else {
2091            return (
2092                State::Closed,
2093                StateListener {
2094                    index: 0,
2095                    g: Weak::new(),
2096                },
2097            );
2098        };
2099
2100        let (state, index) = {
2101            let index = shared
2102                .g
2103                .state_listeners
2104                .borrow_mut()
2105                .insert(Rc::new(callback));
2106            (shared.state.get(), index)
2107        };
2108
2109        let listener = StateListener {
2110            index,
2111            g: Rc::downgrade(&shared.g),
2112        };
2113
2114        (state, listener)
2115    }
2116}
2117
2118impl<H> PartialEq for Handle<H>
2119where
2120    H: WebImpl,
2121{
2122    #[inline]
2123    fn eq(&self, _: &Self) -> bool {
2124        true
2125    }
2126}
2127
2128struct Pending<C>
2129where
2130    C: ?Sized,
2131{
2132    kind: MessageId,
2133    serial: u32,
2134    callback: C,
2135}
2136
2137impl<C> fmt::Debug for Pending<C> {
2138    #[inline]
2139    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2140        f.debug_struct("Pending")
2141            .field("serial", &self.serial)
2142            .field("kind", &self.kind)
2143            .finish_non_exhaustive()
2144    }
2145}
/// Display adapter which writes the prefix character exactly once, followed
/// by the string with every leading occurrence of that character removed.
struct ForcePrefix<'a>(&'a str, char);

impl fmt::Display for ForcePrefix<'_> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rest = self.0.trim_start_matches(self.1);

        // Both parts are formatted through the caller's formatter, so any
        // formatting options it carries apply to each part individually.
        fmt::Display::fmt(&self.1, f)?;
        fmt::Display::fmt(rest, f)
    }
}