musli_web/
web.rs

1//! The generic web implementation.
2//!
3//! This is specialized over the `H` parameter through modules such as:
4//!
5//! * [`web03`] for `web-sys` `0.3.x`.
6//!
7//! [`web03`]: crate::web03
8
9use core::cell::{Cell, RefCell};
10use core::fmt;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Deref;
14use core::ptr::NonNull;
15use std::collections::VecDeque;
16
17use alloc::boxed::Box;
18use alloc::format;
19use alloc::rc::Rc;
20use alloc::rc::Weak;
21use alloc::string::{String, ToString};
22use alloc::vec::Vec;
23
24use std::collections::hash_map::{Entry, HashMap};
25
26use musli::Decode;
27use musli::alloc::Global;
28use musli::mode::Binary;
29use musli::reader::SliceReader;
30use musli::storage;
31
32use slab::Slab;
33
34use crate::api::{self, Event, MessageId};
35
/// Capacity, in bytes, that the reusable output buffer is shrunk down to after
/// a request has been sent (see `Shared::send_client_request`).
const MAX_CAPACITY: usize = 1048576;
37
/// An empty request body.
///
/// Marker type carrying no data. NOTE(review): presumably the body type of a
/// [`RequestBuilder`] before [`RequestBuilder::body`] has been called; confirm
/// against the code constructing the builder.
#[non_exhaustive]
pub struct EmptyBody;
41
/// An empty callback.
///
/// Implements [`Callback`] for every input type by ignoring the input. This is
/// the error handler installed by [`connect`] until
/// [`ServiceBuilder::on_error`] replaces it.
#[non_exhaustive]
pub struct EmptyCallback;
45
/// Slab of state listeners.
///
/// Each listener is reference counted so that it can be weakly referenced
/// while it is queued for invocation.
type StateListeners = Slab<Rc<dyn Callback<State>>>;
/// Slab of broadcast listeners.
///
/// Listeners are grouped by the [`MessageId`] of the broadcast they listen to.
type Broadcasts = HashMap<MessageId, Slab<Rc<dyn Callback<Result<RawPacket>>>>>;
/// Queue of recycled buffers.
type Buffers = VecDeque<Box<BufData>>;
/// Pending requests.
///
/// Keyed by the serial number which was sent in the request header.
type Requests = HashMap<u32, Box<Pending<dyn Callback<Result<RawPacket>>>>>;
54
/// Location information for websocket implementation.
///
/// Produced by [`WindowImpl::location`] and used to derive the websocket URL
/// when connecting relative to the current location.
#[doc(hidden)]
pub struct Location {
    /// Protocol of the current location including the trailing colon, it is
    /// matched against `"http:"` and `"https:"` when building the URL.
    pub(crate) protocol: String,
    /// Host component inserted verbatim into the websocket URL.
    pub(crate) host: String,
    /// Port component inserted verbatim after the host, separated by `:`.
    pub(crate) port: String,
}
62
/// Crate-private module holding the sealing trait required by `SocketImpl`.
pub(crate) mod sealed_socket {
    /// Marker trait required by `SocketImpl`; it lives in a crate-private
    /// module.
    pub trait Sealed {}
}
66
/// Abstraction over the concrete websocket used by a [`WebImpl`].
///
/// Implementors must also implement the crate-private
/// `sealed_socket::Sealed` marker.
pub(crate) trait SocketImpl
where
    Self: Sized + self::sealed_socket::Sealed,
{
    /// Implementation specific handles which are passed by reference when the
    /// socket is constructed.
    #[doc(hidden)]
    type Handles;

    /// Construct a new socket for the given `url` using the provided handles.
    #[doc(hidden)]
    fn new(url: &str, handles: &Self::Handles) -> Result<Self, Error>;

    /// Send the given bytes over the socket.
    #[doc(hidden)]
    fn send(&self, data: &[u8]) -> Result<(), Error>;

    /// Close the socket, consuming it.
    #[doc(hidden)]
    fn close(self) -> Result<(), Error>;
}
83
/// Crate-private module holding the sealing trait required by
/// `PerformanceImpl`.
pub(crate) mod sealed_performance {
    /// Marker trait required by `PerformanceImpl`; it lives in a crate-private
    /// module.
    pub trait Sealed {}
}
87
/// Abstraction over a clock used to measure how long a connection has been
/// open.
///
/// Implementors must also implement the crate-private
/// `sealed_performance::Sealed` marker.
pub trait PerformanceImpl
where
    Self: Sized + self::sealed_performance::Sealed,
{
    /// Read the current timestamp.
    ///
    /// Two readings are subtracted from each other to measure elapsed time.
    /// NOTE(review): presumably expressed in milliseconds; confirm against
    /// the implementations.
    #[doc(hidden)]
    fn now(&self) -> f64;
}
95
/// Crate-private module holding the sealing trait required by `WindowImpl`.
pub(crate) mod sealed_window {
    /// Marker trait required by `WindowImpl`; it lives in a crate-private
    /// module.
    pub trait Sealed {}
}
99
/// Abstraction over the window-like environment the service runs in.
///
/// It provides access to a clock, the current location and timeouts.
/// Implementors must also implement the crate-private
/// `sealed_window::Sealed` marker.
pub(crate) trait WindowImpl
where
    Self: Sized + self::sealed_window::Sealed,
{
    /// The clock implementation associated with the window.
    #[doc(hidden)]
    type Performance: PerformanceImpl;

    /// Handle to a timeout scheduled through [`WindowImpl::set_timeout`].
    #[doc(hidden)]
    type Timeout;

    /// Construct a new window.
    #[doc(hidden)]
    fn new() -> Result<Self, Error>;

    /// Get the clock associated with the window.
    #[doc(hidden)]
    fn performance(&self) -> Result<Self::Performance, Error>;

    /// Get the current location of the window.
    #[doc(hidden)]
    fn location(&self) -> Result<Location, Error>;

    /// Schedule `callback` to be called once after `millis` milliseconds,
    /// returning a handle to the scheduled timeout.
    #[doc(hidden)]
    fn set_timeout(
        &self,
        millis: u32,
        callback: impl FnOnce() + 'static,
    ) -> Result<Self::Timeout, Error>;
}
126
/// Crate-private module holding the sealing trait required by `WebImpl`.
pub(crate) mod sealed_web {
    /// Marker trait required by `WebImpl`; it lives in a crate-private module.
    pub trait Sealed {}
}
130
/// Central trait for web integration.
///
/// Since web integration is currently unstable, this requires multiple
/// different implementations, each time an ecosystem breaking change is
/// released.
///
/// The crate in focus here is `web-sys`, and the corresponding modules provide
/// integrations:
///
/// * [web03] for `web-sys` `0.3.x`.
///
/// [web03]: crate::web03
pub trait WebImpl
where
    Self: 'static + Copy + Sized + self::sealed_web::Sealed,
{
    /// The window implementation used for clocks, locations and timeouts.
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Window: WindowImpl;

    /// Handles constructed once per service through [`WebImpl::handles`] and
    /// passed to every socket which is created.
    #[doc(hidden)]
    type Handles;

    /// The socket implementation, which must accept [`WebImpl::Handles`].
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Socket: SocketImpl<Handles = Self::Handles>;

    /// Construct the handles for a service from a weak reference to its shared
    /// state.
    #[doc(hidden)]
    #[allow(private_interfaces)]
    fn handles(shared: &Weak<Shared<Self>>) -> Self::Handles;

    /// Produce a random number used to fuzz the reconnect timeout.
    ///
    /// NOTE(review): presumably the result is bounded by `range`; confirm
    /// against the implementations.
    #[doc(hidden)]
    fn random(range: u32) -> u32;
}
165
166/// Construct a new [`ServiceBuilder`] associated with the given [`Connect`]
167/// strategy.
168pub fn connect<H>(connect: Connect) -> ServiceBuilder<H, EmptyCallback>
169where
170    H: WebImpl,
171{
172    ServiceBuilder {
173        connect,
174        on_error: EmptyCallback,
175        _marker: PhantomData,
176    }
177}
178
/// The state of the connection.
///
/// A listener for state changes can be set up through for example
/// [`Handle::on_state_change`].
///
/// A freshly built service starts out in the [`State::Closed`] state.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[non_exhaustive]
pub enum State {
    /// The connection is open.
    Open,
    /// The connection is closed.
    Closed,
}
191
/// Error type for the WebSocket service.
///
/// The cause is kept private; use methods such as [`Error::is_empty_packet`]
/// to inspect it, or the `Display` implementation to format it.
#[derive(Debug)]
pub struct Error {
    /// What caused the error.
    kind: ErrorKind,
}
197
198impl Error {
199    /// Check if the error is caused by an empty packet.
200    ///
201    /// # Examples
202    ///
203    /// ```
204    /// use musli_web::web::{Error, RawPacket};
205    ///
206    /// let packet = RawPacket::empty();
207    /// let e = packet.decode::<u32>().unwrap_err();
208    ///
209    /// assert!(e.is_empty_packet());
210    /// ```
211    pub fn is_empty_packet(&self) -> bool {
212        matches!(self.kind, ErrorKind::EmptyPacket)
213    }
214}
215
/// The private cause of an [`Error`].
#[derive(Debug)]
enum ErrorKind {
    /// A packet was empty; reported through [`Error::is_empty_packet`].
    EmptyPacket,
    /// A free-form error message.
    Message(String),
    /// An error raised by the storage encoding, exposed as the error source.
    Storage(storage::Error),
    /// An internal packet position (first field) fell outside of the range
    /// `0-len`, where `len` is the second field.
    Overflow(usize, usize),
}
223
224impl Error {
225    #[inline]
226    fn new(kind: ErrorKind) -> Self {
227        Self { kind }
228    }
229
230    #[inline]
231    pub(crate) fn msg(message: impl fmt::Display) -> Self {
232        Self::new(ErrorKind::Message(message.to_string()))
233    }
234}
235
236impl fmt::Display for Error {
237    #[inline]
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        match &self.kind {
240            ErrorKind::EmptyPacket => write!(f, "Packet is empty"),
241            ErrorKind::Message(message) => write!(f, "{message}"),
242            ErrorKind::Storage(error) => write!(f, "Encoding error: {error}"),
243            ErrorKind::Overflow(at, len) => {
244                write!(f, "Internal packet overflow, {at} not in range 0-{len}")
245            }
246        }
247    }
248}
249
250impl core::error::Error for Error {
251    #[inline]
252    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
253        match &self.kind {
254            ErrorKind::Storage(error) => Some(error),
255            _ => None,
256        }
257    }
258}
259
260impl From<storage::Error> for Error {
261    #[inline]
262    fn from(error: storage::Error) -> Self {
263        Self::new(ErrorKind::Storage(error))
264    }
265}
266
#[cfg(feature = "wasm_bindgen02")]
impl From<wasm_bindgen02::JsValue> for Error {
    /// Convert a JavaScript value into an error carrying its debug
    /// representation as the message.
    #[inline]
    fn from(error: wasm_bindgen02::JsValue) -> Self {
        let message = format!("{error:?}");
        Self::new(ErrorKind::Message(message))
    }
}
274
/// Result alias used in this module, the error type defaults to [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;

/// Reconnect delay in milliseconds which a service starts out with, and which
/// is restored once a connection has been open for a while before closing.
const INITIAL_TIMEOUT: u32 = 250;
/// Upper bound in milliseconds which the reconnect delay is clamped to.
const MAX_TIMEOUT: u32 = 4000;
279
/// The private strategy behind a [`Connect`].
enum ConnectKind {
    /// Connect to the current location using the given path.
    Location { path: String },
    /// Connect to the given URL as-is.
    Url { url: String },
}

/// A specification for how to connect a websocket.
pub struct Connect {
    kind: ConnectKind,
}

impl Connect {
    /// Connect to the same location with a custom path.
    ///
    /// Note that any number of `/` prefixes are ignored, the canonical
    /// representation always ignores them and the path is relative to the
    /// current location.
    #[inline]
    pub fn location(path: impl AsRef<str>) -> Self {
        let path: String = path.as_ref().into();
        let kind = ConnectKind::Location { path };
        Self { kind }
    }

    /// Connect to the specified URL.
    #[inline]
    pub fn url(url: String) -> Self {
        let kind = ConnectKind::Url { url };
        Self { kind }
    }
}
314
/// Generic but shared fields which do not depend on specialization over `H`.
///
/// This is reference counted separately from [`Shared`] so that handles such
/// as [`Request`] can refer to it weakly without naming `H`.
struct Generic {
    /// Registered listeners for connection state changes.
    state_listeners: RefCell<StateListeners>,
    /// Requests which have been sent but not yet responded to.
    requests: RefCell<Requests>,
    /// Registered broadcast listeners.
    broadcasts: RefCell<Broadcasts>,
    /// Buffers which are available for reuse.
    buffers: RefCell<Buffers>,
}
322
/// Shared implementation details for websocket implementations.
pub(crate) struct Shared<H>
where
    H: WebImpl,
{
    /// How the websocket URL is determined when connecting.
    connect: Connect,
    /// Handler which receives errors that cannot be reported to a specific
    /// request.
    pub(crate) on_error: Box<dyn Callback<Error>>,
    /// The window used for locations and timeouts.
    window: H::Window,
    /// The clock used to measure how long a connection has been open.
    performance: <H::Window as WindowImpl>::Performance,
    /// Handles passed to every socket which is constructed.
    handles: H::Handles,
    /// The state most recently emitted to state listeners.
    state: Cell<State>,
    /// Timestamp of when the connection was opened, `None` while it is closed.
    opened: Cell<Option<f64>>,
    /// The serial number to use for the next request.
    serial: Cell<u32>,
    /// Broadcast callbacks queued up to be called.
    defer_broadcasts: RefCell<VecDeque<Weak<dyn Callback<Result<RawPacket>>>>>,
    /// State listeners queued up to be called.
    defer_state_listeners: RefCell<VecDeque<Weak<dyn Callback<State>>>>,
    /// The current socket, if any.
    pub(crate) socket: RefCell<Option<H::Socket>>,
    /// Reusable buffer which outgoing requests are encoded into.
    output: RefCell<Vec<u8>>,
    /// The delay in milliseconds used for the next reconnect attempt.
    current_timeout: Cell<u32>,
    /// Handle to the most recently scheduled reconnect timeout.
    reconnect_timeout: RefCell<Option<<H::Window as WindowImpl>::Timeout>>,
    /// State which does not depend on `H`.
    g: Rc<Generic>,
}
344
345impl<H> Drop for Shared<H>
346where
347    H: WebImpl,
348{
349    fn drop(&mut self) {
350        if let Some(s) = self.socket.take()
351            && let Err(e) = s.close()
352        {
353            self.on_error.call(e);
354        }
355
356        // We don't need to worry about mutable borrows here, since we only have
357        // weak references to Shared and by virtue of this being dropped they
358        // are all invalid.
359        let state_listeners = mem::take(&mut *self.g.state_listeners.borrow_mut());
360        let mut requests = self.g.requests.borrow_mut();
361
362        for (_, listener) in state_listeners {
363            listener.call(State::Closed);
364        }
365
366        for (_, p) in requests.drain() {
367            p.callback.call(Err(Error::msg("Websocket service closed")));
368        }
369    }
370}
371
/// Builder of a service.
///
/// Constructed through [`connect`] and turned into a [`Service`] using
/// [`ServiceBuilder::build`].
pub struct ServiceBuilder<H, E>
where
    H: WebImpl,
{
    /// How the service connects.
    connect: Connect,
    /// The error handler the service will be built with.
    on_error: E,
    /// Ties the builder to the web implementation `H`.
    _marker: PhantomData<H>,
}
381
382impl<H, E> ServiceBuilder<H, E>
383where
384    H: WebImpl,
385    E: Callback<Error>,
386{
387    /// Set the error handler to use for the service.
388    pub fn on_error<U>(self, on_error: U) -> ServiceBuilder<H, U>
389    where
390        U: Callback<Error>,
391    {
392        ServiceBuilder {
393            connect: self.connect,
394            on_error,
395            _marker: self._marker,
396        }
397    }
398
399    /// Build a new service and handle.
400    pub fn build(self) -> Service<H> {
401        let window = match H::Window::new() {
402            Ok(window) => window,
403            Err(error) => {
404                panic!("{error}")
405            }
406        };
407
408        let performance = match WindowImpl::performance(&window) {
409            Ok(performance) => performance,
410            Err(error) => {
411                panic!("{error}")
412            }
413        };
414
415        let shared = Rc::<Shared<H>>::new_cyclic(move |shared| Shared {
416            connect: self.connect,
417            on_error: Box::new(self.on_error),
418            window,
419            performance,
420            handles: H::handles(shared),
421            state: Cell::new(State::Closed),
422            opened: Cell::new(None),
423            serial: Cell::new(0),
424            defer_broadcasts: RefCell::new(VecDeque::new()),
425            defer_state_listeners: RefCell::new(VecDeque::new()),
426            socket: RefCell::new(None),
427            output: RefCell::new(Vec::new()),
428            current_timeout: Cell::new(INITIAL_TIMEOUT),
429            reconnect_timeout: RefCell::new(None),
430            g: Rc::new(Generic {
431                state_listeners: RefCell::new(Slab::new()),
432                broadcasts: RefCell::new(HashMap::new()),
433                requests: RefCell::new(Requests::new()),
434                buffers: RefCell::new(VecDeque::new()),
435            }),
436        });
437
438        let handle = Handle {
439            shared: Rc::downgrade(&shared),
440        };
441
442        Service { shared, handle }
443    }
444}
445
/// The service handle.
///
/// Once dropped this will cause the service to be disconnected and all requests
/// to be cancelled.
pub struct Service<H>
where
    H: WebImpl,
{
    /// The strong reference keeping the shared state alive.
    shared: Rc<Shared<H>>,
    /// Handle which weakly refers to the shared state.
    handle: Handle<H>,
}
457
458impl<H> Service<H>
459where
460    H: WebImpl,
461{
462    /// Attempt to establish a websocket connection.
463    pub fn connect(&self) {
464        self.shared.connect()
465    }
466
467    /// Build a handle to the service.
468    pub fn handle(&self) -> &Handle<H> {
469        &self.handle
470    }
471}
472
473impl<H> Shared<H>
474where
475    H: WebImpl,
476{
477    /// Send a client message.
478    fn send_client_request<T>(&self, serial: u32, body: &T) -> Result<()>
479    where
480        T: api::Request,
481    {
482        let Some(ref socket) = *self.socket.borrow() else {
483            return Err(Error::msg("Socket is not connected"));
484        };
485
486        let header = api::RequestHeader {
487            serial,
488            id: <T::Endpoint as api::Endpoint>::ID.get(),
489        };
490
491        let out = &mut *self.output.borrow_mut();
492
493        storage::to_writer(&mut *out, &header)?;
494        storage::to_writer(&mut *out, &body)?;
495
496        tracing::debug!(
497            header.serial,
498            ?header.id,
499            len = out.len(),
500            "Sending request"
501        );
502
503        socket.send(out.as_slice())?;
504
505        out.clear();
506        out.shrink_to(MAX_CAPACITY);
507        Ok(())
508    }
509
510    pub(crate) fn next_buffer(self: &Rc<Self>, needed: usize) -> Box<BufData> {
511        match self.g.buffers.borrow_mut().pop_front() {
512            Some(mut buf) => {
513                if buf.data.capacity() < needed {
514                    buf.data.reserve(needed - buf.data.len());
515                }
516
517                buf
518            }
519            None => Box::new(BufData::with_capacity(Rc::downgrade(&self.g), needed)),
520        }
521    }
522
523    pub(crate) fn message(self: &Rc<Self>, buf: Box<BufData>) -> Result<()> {
524        // Wrap the buffer in a simple shared reference-counted container.
525        let buf = BufRc::new(buf);
526        let mut reader = SliceReader::new(&buf);
527
528        let header: api::ResponseHeader = storage::decode(&mut reader)?;
529
530        if let Some(broadcast) = MessageId::new(header.broadcast) {
531            tracing::debug!(?header.broadcast, "Got broadcast");
532
533            if !self.prepare_broadcast(broadcast) {
534                return Ok(());
535            };
536
537            if let Some(id) = MessageId::new(header.error) {
538                let error = if id == MessageId::ERROR_MESSAGE {
539                    storage::decode(&mut reader)?
540                } else {
541                    api::ErrorMessage {
542                        message: "Unknown error",
543                    }
544                };
545
546                while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
547                    if let Some(callback) = callback.upgrade() {
548                        callback.call(Err(Error::msg(error.message)));
549                    }
550                }
551
552                return Ok(());
553            }
554
555            let at = buf.len().saturating_sub(reader.remaining());
556
557            let packet = RawPacket {
558                buf: Some(buf.clone()),
559                at: Cell::new(at),
560                id: broadcast,
561            };
562
563            while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
564                if let Some(callback) = callback.upgrade() {
565                    callback.call(Ok(packet.clone()));
566                }
567            }
568        } else {
569            tracing::debug!(header.serial, "Got response");
570
571            let p = {
572                let mut requests = self.g.requests.borrow_mut();
573
574                let Some(p) = requests.remove(&header.serial) else {
575                    tracing::debug!(header.serial, "Missing request");
576                    return Ok(());
577                };
578
579                p
580            };
581
582            if let Some(id) = MessageId::new(header.error) {
583                let error = if id == MessageId::ERROR_MESSAGE {
584                    storage::decode(&mut reader)?
585                } else {
586                    api::ErrorMessage {
587                        message: "Unknown error",
588                    }
589                };
590
591                p.callback.call(Err(Error::msg(error.message)));
592                return Ok(());
593            }
594
595            let at = buf.len().saturating_sub(reader.remaining());
596
597            let packet = RawPacket {
598                id: p.kind,
599                buf: Some(buf),
600                at: Cell::new(at),
601            };
602
603            p.callback.call(Ok(packet));
604        }
605
606        Ok(())
607    }
608
609    fn prepare_broadcast(self: &Rc<Self>, kind: MessageId) -> bool {
610        // Note: We need to defer this, since the outcome of calling
611        // the broadcast callback might be that the broadcast
612        // listener is modified, which could require mutable access
613        // to broadcasts.
614        let mut defer = self.defer_broadcasts.borrow_mut();
615
616        let broadcasts = self.g.broadcasts.borrow();
617
618        let Some(broadcasts) = broadcasts.get(&kind) else {
619            return false;
620        };
621
622        for (_, callback) in broadcasts.iter() {
623            defer.push_back(Rc::downgrade(callback));
624        }
625
626        !defer.is_empty()
627    }
628
629    pub(crate) fn set_open(&self) {
630        tracing::debug!("Set open");
631        self.opened
632            .set(Some(PerformanceImpl::now(&self.performance)));
633        self.emit_state_change(State::Open);
634    }
635
636    fn is_open_for_a_while(&self) -> bool {
637        let Some(at) = self.opened.get() else {
638            return false;
639        };
640
641        let now = PerformanceImpl::now(&self.performance);
642        (now - at) >= 250.0
643    }
644
645    pub(crate) fn close(self: &Rc<Self>) -> Result<(), Error> {
646        tracing::debug!("Close connection");
647
648        // We need a weak reference back to shared state to handle the timeout.
649        let shared = Rc::downgrade(self);
650
651        tracing::debug!(
652            "Set closed timeout={}, opened={:?}",
653            self.current_timeout.get(),
654            self.opened.get(),
655        );
656
657        if !self.is_open_for_a_while() {
658            let current_timeout = self.current_timeout.get();
659
660            if current_timeout < MAX_TIMEOUT {
661                let fuzz = H::random(50);
662
663                self.current_timeout.set(
664                    current_timeout
665                        .saturating_mul(2)
666                        .saturating_add(fuzz)
667                        .min(MAX_TIMEOUT),
668                );
669            }
670        } else {
671            self.current_timeout.set(INITIAL_TIMEOUT);
672        }
673
674        self.opened.set(None);
675        self.emit_state_change(State::Closed);
676
677        if let Some(s) = self.socket.take() {
678            s.close()?;
679        }
680
681        self.close_pending();
682
683        let timeout = self
684            .window
685            .set_timeout(self.current_timeout.get(), move || {
686                if let Some(shared) = shared.upgrade() {
687                    Self::connect(&shared);
688                }
689            })?;
690
691        drop(self.reconnect_timeout.borrow_mut().replace(timeout));
692        Ok(())
693    }
694
695    /// Close an pending requests with an error, since there is no chance they
696    /// will be responded to any more.
697    fn close_pending(self: &Rc<Self>) {
698        loop {
699            let Some(serial) = self.g.requests.borrow().keys().next().copied() else {
700                break;
701            };
702
703            let p = {
704                let mut requests = self.g.requests.borrow_mut();
705
706                let Some(p) = requests.remove(&serial) else {
707                    break;
708                };
709
710                p
711            };
712
713            p.callback.call(Err(Error::msg("Connection closed")));
714        }
715    }
716
717    fn emit_state_change(&self, state: State) {
718        if self.state.get() == state {
719            return;
720        }
721
722        self.state.set(state);
723
724        {
725            // We need to collect callbacks to avoid the callback recursively
726            // borrowing state listeners, which it would if it modifies any
727            // existing state listeners.
728            let mut defer = self.defer_state_listeners.borrow_mut();
729
730            for (_, callback) in self.g.state_listeners.borrow().iter() {
731                defer.push_back(Rc::downgrade(callback));
732            }
733
734            if defer.is_empty() {
735                return;
736            }
737        }
738
739        while let Some(callback) = self.defer_state_listeners.borrow_mut().pop_front() {
740            if let Some(callback) = callback.upgrade() {
741                callback.call(state);
742            }
743        }
744    }
745
746    fn is_closed(&self) -> bool {
747        self.opened.get().is_none()
748    }
749
750    fn connect(self: &Rc<Self>) {
751        tracing::debug!("Connect");
752
753        if let Err(e) = self.build() {
754            self.on_error.call(e);
755        } else {
756            return;
757        }
758
759        if let Err(e) = self.close() {
760            self.on_error.call(e);
761        }
762    }
763
764    /// Build a websocket connection.
765    fn build(self: &Rc<Self>) -> Result<()> {
766        let url = match &self.connect.kind {
767            ConnectKind::Location { path } => {
768                let Location {
769                    protocol,
770                    host,
771                    port,
772                } = WindowImpl::location(&self.window)?;
773
774                let protocol = match protocol.as_str() {
775                    "https:" => "wss:",
776                    "http:" => "ws:",
777                    other => {
778                        return Err(Error::msg(format_args!(
779                            "Same host connection is not supported for protocol `{other}`"
780                        )));
781                    }
782                };
783
784                let path = ForcePrefix(path, '/');
785                format!("{protocol}//{host}:{port}{path}")
786            }
787            ConnectKind::Url { url } => url.clone(),
788        };
789
790        let ws = SocketImpl::new(&url, &self.handles)?;
791
792        let old = self.socket.borrow_mut().replace(ws);
793
794        if let Some(old) = old {
795            old.close()?;
796        }
797
798        Ok(())
799    }
800}
801
/// Trait governing how callbacks are called.
///
/// This is implemented for [`EmptyCallback`], which ignores its input, and for
/// any `'static` closure or function which takes the input as its only
/// argument.
pub trait Callback<I>
where
    Self: 'static,
{
    /// Call the callback.
    fn call(&self, input: I);
}
810
811impl<I> Callback<I> for EmptyCallback {
812    #[inline]
813    fn call(&self, _: I) {}
814}
815
816impl<F, I> Callback<I> for F
817where
818    F: 'static + Fn(I),
819{
820    #[inline]
821    fn call(&self, input: I) {
822        self(input)
823    }
824}
825
/// A request builder.
///
/// Associate the callback to be used by using either
/// [`RequestBuilder::on_packet`] or [`RequestBuilder::on_raw_packet`] depending
/// on your needs.
///
/// Send the request with [`RequestBuilder::send`].
pub struct RequestBuilder<'a, H, B, C>
where
    H: WebImpl,
{
    /// Weak reference to the shared state of the service.
    shared: &'a Weak<Shared<H>>,
    /// The body of the request.
    body: B,
    /// The callback which receives the response.
    callback: C,
}
841
842impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
843where
844    H: WebImpl,
845{
846    /// Set the body of the request.
847    #[inline]
848    pub fn body<U>(self, body: U) -> RequestBuilder<'a, H, U, C>
849    where
850        U: api::Request,
851    {
852        RequestBuilder {
853            shared: self.shared,
854            body,
855            callback: self.callback,
856        }
857    }
858
859    /// Handle the response using the specified callback.
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// # extern crate yew021 as yew;
865    /// use yew::prelude::*;
866    /// use musli_web::web03::prelude::*;
867    ///
868    /// mod api {
869    ///     use musli::{Decode, Encode};
870    ///     use musli_web::api;
871    ///
872    ///     #[derive(Encode, Decode)]
873    ///     pub struct HelloRequest<'de> {
874    ///         pub message: &'de str,
875    ///     }
876    ///
877    ///     #[derive(Encode, Decode)]
878    ///     pub struct HelloResponse<'de> {
879    ///         pub message: &'de str,
880    ///     }
881    ///
882    ///     api::define! {
883    ///         pub type Hello;
884    ///
885    ///         impl Endpoint for Hello {
886    ///             impl<'de> Request for HelloRequest<'de>;
887    ///             type Response<'de> = HelloResponse<'de>;
888    ///         }
889    ///     }
890    /// }
891    ///
892    /// enum Msg {
893    ///     OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
894    /// }
895    ///
896    /// #[derive(Properties, PartialEq)]
897    /// struct Props {
898    ///     ws: ws::Handle,
899    /// }
900    ///
901    /// struct App {
902    ///     message: String,
903    ///     _hello: ws::Request,
904    /// }
905    ///
906    /// impl Component for App {
907    ///     type Message = Msg;
908    ///     type Properties = Props;
909    ///
910    ///     fn create(ctx: &Context<Self>) -> Self {
911    ///         let hello = ctx.props().ws
912    ///             .request()
913    ///             .body(api::HelloRequest { message: "Hello!"})
914    ///             .on_packet(ctx.link().callback(Msg::OnHello))
915    ///             .send();
916    ///
917    ///         Self {
918    ///             message: String::from("No Message :("),
919    ///             _hello: hello,
920    ///         }
921    ///     }
922    ///
923    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
924    ///         match msg {
925    ///             Msg::OnHello(Err(error)) => {
926    ///                 tracing::error!("Request error: {:?}", error);
927    ///                 false
928    ///             }
929    ///             Msg::OnHello(Ok(packet)) => {
930    ///                 if let Ok(response) = packet.decode() {
931    ///                     self.message = response.message.to_owned();
932    ///                 }
933    ///
934    ///                 true
935    ///             }
936    ///         }
937    ///     }
938    ///
939    ///     fn view(&self, ctx: &Context<Self>) -> Html {
940    ///         html! {
941    ///             <div>
942    ///                 <h1>{"WebSocket Example"}</h1>
943    ///                 <p>{format!("Message: {}", self.message)}</p>
944    ///             </div>
945    ///         }
946    ///     }
947    /// }
948    /// ```
949    pub fn on_packet<E>(
950        self,
951        callback: impl Callback<Result<Packet<E>>>,
952    ) -> RequestBuilder<'a, H, B, impl Callback<Result<RawPacket>>>
953    where
954        E: api::Endpoint,
955    {
956        self.on_raw_packet(move |result: Result<RawPacket>| match result {
957            Ok(ok) => callback.call(Ok(Packet::new(ok))),
958            Err(err) => callback.call(Err(err)),
959        })
960    }
961
962    /// Handle the response using the specified callback.
963    ///
964    /// # Examples
965    ///
966    /// ```
967    /// # extern crate yew021 as yew;
968    /// use yew::prelude::*;
969    /// use musli_web::web03::prelude::*;
970    ///
971    /// mod api {
972    ///     use musli::{Decode, Encode};
973    ///     use musli_web::api;
974    ///
975    ///     #[derive(Encode, Decode)]
976    ///     pub struct HelloRequest<'de> {
977    ///         pub message: &'de str,
978    ///     }
979    ///
980    ///     #[derive(Encode, Decode)]
981    ///     pub struct HelloResponse<'de> {
982    ///         pub message: &'de str,
983    ///     }
984    ///
985    ///     api::define! {
986    ///         pub type Hello;
987    ///
988    ///         impl Endpoint for Hello {
989    ///             impl<'de> Request for HelloRequest<'de>;
990    ///             type Response<'de> = HelloResponse<'de>;
991    ///         }
992    ///     }
993    /// }
994    ///
995    /// enum Msg {
996    ///     OnHello(Result<ws::RawPacket, ws::Error>),
997    /// }
998    ///
999    /// #[derive(Properties, PartialEq)]
1000    /// struct Props {
1001    ///     ws: ws::Handle,
1002    /// }
1003    ///
1004    /// struct App {
1005    ///     message: String,
1006    ///     _hello: ws::Request,
1007    /// }
1008    ///
1009    /// impl Component for App {
1010    ///     type Message = Msg;
1011    ///     type Properties = Props;
1012    ///
1013    ///     fn create(ctx: &Context<Self>) -> Self {
1014    ///         let link = ctx.link().clone();
1015    ///
1016    ///         let hello = ctx.props().ws
1017    ///             .request()
1018    ///             .body(api::HelloRequest { message: "Hello!"})
1019    ///             .on_raw_packet(move |packet| link.send_message(Msg::OnHello(packet)))
1020    ///             .send();
1021    ///
1022    ///         Self {
1023    ///             message: String::from("No Message :("),
1024    ///             _hello: hello,
1025    ///         }
1026    ///     }
1027    ///
1028    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1029    ///         match msg {
1030    ///             Msg::OnHello(Err(error)) => {
1031    ///                 tracing::error!("Request error: {:?}", error);
1032    ///                 false
1033    ///             }
1034    ///             Msg::OnHello(Ok(packet)) => {
1035    ///                 if let Ok(response) = packet.decode::<api::HelloResponse>() {
1036    ///                     self.message = response.message.to_owned();
1037    ///                 }
1038    ///
1039    ///                 true
1040    ///             }
1041    ///         }
1042    ///     }
1043    ///
1044    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1045    ///         html! {
1046    ///             <div>
1047    ///                 <h1>{"WebSocket Example"}</h1>
1048    ///                 <p>{format!("Message: {}", self.message)}</p>
1049    ///             </div>
1050    ///         }
1051    ///     }
1052    /// }
1053    /// ```
1054    pub fn on_raw_packet<U>(self, callback: U) -> RequestBuilder<'a, H, B, U>
1055    where
1056        U: Callback<Result<RawPacket, Error>>,
1057    {
1058        RequestBuilder {
1059            shared: self.shared,
1060            body: self.body,
1061            callback,
1062        }
1063    }
1064}
1065
1066impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
1067where
1068    B: api::Request,
1069    C: Callback<Result<RawPacket>>,
1070    H: WebImpl,
1071{
1072    /// Send the request.
1073    ///
1074    /// This requires that a body has been set using [`RequestBuilder::body`].
1075    pub fn send(self) -> Request {
1076        let Some(shared) = self.shared.upgrade() else {
1077            self.callback
1078                .call(Err(Error::msg("WebSocket service is down")));
1079            return Request::new();
1080        };
1081
1082        if shared.is_closed() {
1083            self.callback
1084                .call(Err(Error::msg("WebSocket is not connected")));
1085            return Request::new();
1086        }
1087
1088        let serial = shared.serial.get();
1089
1090        if let Err(error) = shared.send_client_request(serial, &self.body) {
1091            shared.on_error.call(error);
1092            return Request::new();
1093        }
1094
1095        shared.serial.set(serial.wrapping_add(1));
1096
1097        let pending = Pending {
1098            kind: <B::Endpoint as api::Endpoint>::ID,
1099            serial,
1100            callback: self.callback,
1101        };
1102
1103        let existing = shared
1104            .g
1105            .requests
1106            .borrow_mut()
1107            .insert(serial, Box::new(pending));
1108
1109        if let Some(p) = existing {
1110            p.callback.call(Err(Error::msg("Request cancelled")));
1111        }
1112
1113        Request {
1114            serial,
1115            g: Rc::downgrade(&shared.g),
1116        }
1117    }
1118}
1119
1120/// The handle for a pending request.
1121///
1122/// Dropping or [`clear()`] this handle will cancel the request.
1123///
1124/// [`clear()`]: Self::clear
1125pub struct Request {
1126    serial: u32,
1127    g: Weak<Generic>,
1128}
1129
1130impl Request {
1131    /// An empty request handler.
1132    #[inline]
1133    pub const fn new() -> Self {
1134        Self {
1135            serial: 0,
1136            g: Weak::new(),
1137        }
1138    }
1139
1140    /// Clear the request handle without dropping it, cancelling any pending
1141    /// requests.
1142    pub fn clear(&mut self) {
1143        let removed = {
1144            let serial = mem::take(&mut self.serial);
1145
1146            let Some(g) = self.g.upgrade() else {
1147                return;
1148            };
1149
1150            self.g = Weak::new();
1151
1152            let Some(p) = g.requests.borrow_mut().remove(&serial) else {
1153                return;
1154            };
1155
1156            p
1157        };
1158
1159        drop(removed);
1160    }
1161}
1162
1163impl Default for Request {
1164    #[inline]
1165    fn default() -> Self {
1166        Self::new()
1167    }
1168}
1169
1170impl Drop for Request {
1171    #[inline]
1172    fn drop(&mut self) {
1173        self.clear();
1174    }
1175}
1176
1177/// The handle for a pending request.
1178///
1179/// Dropping or calling [`clear()`] on this handle remove the listener.
1180///
1181/// [`clear()`]: Self::clear
1182pub struct Listener {
1183    kind: Option<MessageId>,
1184    index: usize,
1185    g: Weak<Generic>,
1186}
1187
1188impl Listener {
1189    /// Construct an empty listener.
1190    #[inline]
1191    pub const fn new() -> Self {
1192        Self {
1193            kind: None,
1194            index: 0,
1195            g: Weak::new(),
1196        }
1197    }
1198
1199    /// Build up an empty listener with the specified kind.
1200    #[inline]
1201    pub(crate) const fn empty_with_kind(kind: MessageId) -> Self {
1202        Self {
1203            kind: Some(kind),
1204            index: 0,
1205            g: Weak::new(),
1206        }
1207    }
1208
1209    /// Clear the listener without dropping it.
1210    ///
1211    /// This will remove the associated broadcast listener from being notified.
1212    pub fn clear(&mut self) {
1213        // Gather values here to drop them outside of the upgrade block.
1214        let removed;
1215        let removed_value;
1216
1217        {
1218            let Some(g) = self.g.upgrade() else {
1219                return;
1220            };
1221
1222            self.g = Weak::new();
1223            let index = mem::take(&mut self.index);
1224
1225            let Some(kind) = self.kind.take() else {
1226                return;
1227            };
1228
1229            let mut broadcasts = g.broadcasts.borrow_mut();
1230
1231            let Entry::Occupied(mut e) = broadcasts.entry(kind) else {
1232                return;
1233            };
1234
1235            removed = e.get_mut().try_remove(index);
1236
1237            if e.get().is_empty() {
1238                removed_value = Some(e.remove());
1239            } else {
1240                removed_value = None;
1241            }
1242        }
1243
1244        // Drop here, to avoid invoking any destructors which might borrow
1245        // shared mutably earlier.
1246        drop(removed);
1247        drop(removed_value);
1248    }
1249}
1250
1251impl Default for Listener {
1252    #[inline]
1253    fn default() -> Self {
1254        Self::new()
1255    }
1256}
1257
1258impl Drop for Listener {
1259    #[inline]
1260    fn drop(&mut self) {
1261        self.clear();
1262    }
1263}
1264
1265/// The handle for state change listening.
1266///
1267/// Dropping or calling [`clear()`] on this handle will remove the associated
1268/// callback from being notified.
1269///
1270/// [`clear()`]: Self::clear
1271pub struct StateListener {
1272    index: usize,
1273    g: Weak<Generic>,
1274}
1275
1276impl StateListener {
1277    /// Construct an empty state listener.
1278    #[inline]
1279    pub const fn new() -> Self {
1280        Self {
1281            index: 0,
1282            g: Weak::new(),
1283        }
1284    }
1285
1286    /// Clear the state listener without dropping it.
1287    ///
1288    /// This will remove the associated callback from being notified.
1289    pub fn clear(&mut self) {
1290        let removed = {
1291            let Some(g) = self.g.upgrade() else {
1292                return;
1293            };
1294
1295            self.g = Weak::new();
1296
1297            g.state_listeners.borrow_mut().try_remove(self.index)
1298        };
1299
1300        drop(removed);
1301    }
1302}
1303
1304impl Default for StateListener {
1305    #[inline]
1306    fn default() -> Self {
1307        Self::new()
1308    }
1309}
1310
1311impl Drop for StateListener {
1312    #[inline]
1313    fn drop(&mut self) {
1314        self.clear();
1315    }
1316}
1317
1318pub(crate) struct BufData {
1319    /// Buffer being used.
1320    pub(crate) data: Vec<u8>,
1321    /// Number of strong references to this buffer.
1322    strong: Cell<usize>,
1323    /// Reference to shared state where the buffer will be recycled to.
1324    g: Weak<Generic>,
1325}
1326
1327impl BufData {
1328    fn with_capacity(g: Weak<Generic>, capacity: usize) -> Self {
1329        Self {
1330            data: Vec::with_capacity(capacity),
1331            strong: Cell::new(0),
1332            g,
1333        }
1334    }
1335
1336    unsafe fn dec(ptr: NonNull<BufData>) {
1337        unsafe {
1338            let count = ptr.as_ref().strong.get().wrapping_sub(1);
1339            ptr.as_ref().strong.set(count);
1340
1341            if count > 0 {
1342                return;
1343            }
1344
1345            let mut buf = Box::from_raw(ptr.as_ptr());
1346
1347            // Try to recycle the buffer if shared is available, else let it be
1348            // dropped and free here.
1349            let Some(g) = buf.as_ref().g.upgrade() else {
1350                return;
1351            };
1352
1353            let mut buffers = g.buffers.borrow_mut();
1354
1355            // Set the length of the recycled buffer.
1356            buf.data.set_len(buf.data.len().min(MAX_CAPACITY));
1357
1358            // We size our buffers to some max capacity to avod overuse in case
1359            // we infrequently need to handle some massive message. If we don't
1360            // shrink the allocation, then memory use can run away over time.
1361            buf.data.shrink_to(MAX_CAPACITY);
1362
1363            buffers.push_back(buf);
1364        }
1365    }
1366
1367    unsafe fn inc(ptr: NonNull<BufData>) {
1368        unsafe {
1369            let count = ptr.as_ref().strong.get().wrapping_add(1);
1370
1371            if count == 0 {
1372                std::process::abort();
1373            }
1374
1375            ptr.as_ref().strong.set(count);
1376        }
1377    }
1378}
1379
1380/// A shared buffer of data that is recycled when dropped.
1381struct BufRc {
1382    data: NonNull<BufData>,
1383}
1384
1385impl BufRc {
1386    fn new(data: Box<BufData>) -> Self {
1387        let data = NonNull::from(Box::leak(data));
1388
1389        unsafe {
1390            BufData::inc(data);
1391        }
1392
1393        Self { data }
1394    }
1395}
1396
1397impl Deref for BufRc {
1398    type Target = [u8];
1399
1400    fn deref(&self) -> &Self::Target {
1401        unsafe { &(*self.data.as_ptr()).data }
1402    }
1403}
1404
1405impl Clone for BufRc {
1406    fn clone(&self) -> Self {
1407        unsafe {
1408            BufData::inc(self.data);
1409        }
1410
1411        Self { data: self.data }
1412    }
1413}
1414
1415impl Drop for BufRc {
1416    fn drop(&mut self) {
1417        unsafe {
1418            BufData::dec(self.data);
1419        }
1420    }
1421}
1422
1423/// A raw packet of data.
1424#[derive(Clone)]
1425pub struct RawPacket {
1426    id: MessageId,
1427    buf: Option<BufRc>,
1428    at: Cell<usize>,
1429}
1430
1431impl RawPacket {
1432    /// Construct an empty raw packet.
1433    ///
1434    /// # Examples
1435    ///
1436    /// ```
1437    /// use musli_web::api::MessageId;
1438    /// use musli_web::web::RawPacket;
1439    ///
1440    /// let packet = RawPacket::empty();
1441    ///
1442    /// assert!(packet.is_empty());
1443    /// assert_eq!(packet.id(), MessageId::EMPTY);
1444    /// ```
1445    pub const fn empty() -> Self {
1446        Self {
1447            id: MessageId::EMPTY,
1448            buf: None,
1449            at: Cell::new(0),
1450        }
1451    }
1452
1453    /// Decode the contents of a raw packet.
1454    ///
1455    /// This can be called multiple times if there are multiple payloads in
1456    /// sequence of the response.
1457    ///
1458    /// You can check if the packet is empty using [`RawPacket::is_empty`].
1459    pub fn decode<'this, T>(&'this self) -> Result<T>
1460    where
1461        T: Decode<'this, Binary, Global>,
1462    {
1463        let at = self.at.get();
1464
1465        if self.id == MessageId::EMPTY {
1466            return Err(Error::new(ErrorKind::EmptyPacket));
1467        }
1468
1469        let Some(bytes) = self.as_slice().get(at..) else {
1470            return Err(Error::new(ErrorKind::Overflow(at, self.as_slice().len())));
1471        };
1472
1473        let mut reader = SliceReader::new(bytes);
1474
1475        match storage::decode(&mut reader) {
1476            Ok(value) => {
1477                self.at.set(at + bytes.len() - reader.remaining());
1478                Ok(value)
1479            }
1480            Err(error) => {
1481                self.at.set(self.len());
1482                Err(Error::from(error))
1483            }
1484        }
1485    }
1486
1487    /// Get the underlying byte slice of the packet.
1488    ///
1489    /// # Examples
1490    ///
1491    /// ```
1492    /// use musli_web::web::RawPacket;
1493    ///
1494    /// let packet = RawPacket::empty();
1495    /// assert_eq!(packet.as_slice(), &[]);
1496    /// ```
1497    pub fn as_slice(&self) -> &[u8] {
1498        match &self.buf {
1499            Some(buf) => buf.as_ref(),
1500            None => &[],
1501        }
1502    }
1503
1504    /// Get the length of the packet.
1505    ///
1506    /// # Examples
1507    ////
1508    /// ```
1509    /// use musli_web::web::RawPacket;
1510    ///
1511    /// let packet = RawPacket::empty();
1512    /// assert_eq!(packet.len(), 0);
1513    /// ```
1514    pub fn len(&self) -> usize {
1515        match &self.buf {
1516            Some(buf) => buf.len(),
1517            None => 0,
1518        }
1519    }
1520
1521    /// Check if the packet is empty.
1522    ///
1523    /// # Examples
1524    ///
1525    /// ```
1526    /// use musli_web::web::RawPacket;
1527    ///
1528    /// let packet = RawPacket::empty();
1529    /// assert!(packet.is_empty());
1530    /// ```
1531    pub fn is_empty(&self) -> bool {
1532        self.at.get() >= self.len()
1533    }
1534
1535    /// The id of the packet this is a response to as specified by
1536    /// [`Endpoint::ID`] or [`Broadcast::ID`].
1537    ///
1538    /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1539    /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1540    pub fn id(&self) -> MessageId {
1541        self.id
1542    }
1543}
1544
1545/// A typed packet of data.
1546#[derive(Clone)]
1547pub struct Packet<T> {
1548    raw: RawPacket,
1549    _marker: PhantomData<T>,
1550}
1551
1552impl<T> Packet<T> {
1553    /// Construct an empty package.
1554    ///
1555    /// # Examples
1556    ///
1557    /// ```
1558    /// use musli_web::api::MessageId;
1559    /// use musli_web::web::Packet;
1560    ///
1561    /// let packet = Packet::<()>::empty();
1562    ///
1563    /// assert!(packet.is_empty());
1564    /// assert_eq!(packet.id(), MessageId::EMPTY);
1565    /// ```
1566    pub const fn empty() -> Self {
1567        Self {
1568            raw: RawPacket::empty(),
1569            _marker: PhantomData,
1570        }
1571    }
1572
1573    /// Construct a new typed package from a raw one.
1574    ///
1575    /// Note that this does not guarantee that the typed package is correct, but
1576    /// the `T` parameter becomes associated with it allowing it to be used
1577    /// automatically with methods such as [`Packet::decode`].
1578    #[inline]
1579    pub fn new(raw: RawPacket) -> Self {
1580        Self {
1581            raw,
1582            _marker: PhantomData,
1583        }
1584    }
1585
1586    /// Convert a packet into a raw packet.
1587    ///
1588    /// To determine which endpoint or broadcast it belongs to the
1589    /// [`RawPacket::id`] method can be used.
1590    pub fn into_raw(self) -> RawPacket {
1591        self.raw
1592    }
1593
1594    /// Check if the packet is empty.
1595    ///
1596    /// # Examples
1597    ///
1598    /// ```
1599    /// use musli_web::web::Packet;
1600    ///
1601    /// let packet = Packet::<()>::empty();
1602    /// assert!(packet.is_empty());
1603    /// ```
1604    pub fn is_empty(&self) -> bool {
1605        self.raw.is_empty()
1606    }
1607
1608    /// The id of the packet this is a response to as specified by
1609    /// [`Endpoint::ID`] or [`Broadcast::ID`].
1610    ///
1611    /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1612    /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1613    pub fn id(&self) -> MessageId {
1614        self.raw.id()
1615    }
1616}
1617
1618impl<T> Packet<T>
1619where
1620    T: api::Decodable,
1621{
1622    /// Decode the contents of a packet.
1623    ///
1624    /// This can be called multiple times if there are multiple payloads in
1625    /// sequence of the response.
1626    ///
1627    /// You can check if the packet is empty using [`Packet::is_empty`].
1628    pub fn decode(&self) -> Result<T::Type<'_>> {
1629        self.decode_any()
1630    }
1631
1632    /// Decode any contents of a packet.
1633    ///
1634    /// This can be called multiple times if there are multiple payloads in
1635    /// sequence of the response.
1636    ///
1637    /// You can check if the packet is empty using [`Packet::is_empty`].
1638    pub fn decode_any<'de, R>(&'de self) -> Result<R>
1639    where
1640        R: Decode<'de, Binary, Global>,
1641    {
1642        self.raw.decode()
1643    }
1644}
1645
1646impl<T> Packet<T>
1647where
1648    T: api::Endpoint,
1649{
1650    /// Decode the contents of a packet.
1651    ///
1652    /// This can be called multiple times if there are multiple payloads in
1653    /// sequence of the response.
1654    ///
1655    /// You can check if the packet is empty using [`Packet::is_empty`].
1656    pub fn decode_response(&self) -> Result<T::Response<'_>> {
1657        self.decode_any_response()
1658    }
1659
1660    /// Decode any contents of a packet.
1661    ///
1662    /// This can be called multiple times if there are multiple payloads in
1663    /// sequence of the response.
1664    ///
1665    /// You can check if the packet is empty using [`Packet::is_empty`].
1666    pub fn decode_any_response<'de, R>(&'de self) -> Result<R>
1667    where
1668        R: Decode<'de, Binary, Global>,
1669    {
1670        self.raw.decode()
1671    }
1672}
1673
1674impl<T> Packet<T>
1675where
1676    T: api::Broadcast,
1677{
1678    /// Decode the primary event related to a broadcast.
1679    pub fn decode_event<'de>(&'de self) -> Result<T::Event<'de>>
1680    where
1681        T: api::BroadcastWithEvent,
1682    {
1683        self.decode_event_any()
1684    }
1685
1686    /// Decode any event related to a broadcast.
1687    pub fn decode_event_any<'de, E>(&'de self) -> Result<E>
1688    where
1689        E: Event<Broadcast = T> + Decode<'de, Binary, Global>,
1690    {
1691        self.raw.decode()
1692    }
1693}
1694
1695/// A handle to the WebSocket service.
1696#[derive(Clone)]
1697#[repr(transparent)]
1698pub struct Handle<H>
1699where
1700    H: WebImpl,
1701{
1702    shared: Weak<Shared<H>>,
1703}
1704
1705impl<H> Handle<H>
1706where
1707    H: WebImpl,
1708{
1709    /// Send a request of type `T`.
1710    ///
1711    /// Returns a handle for the request.
1712    ///
1713    /// If the handle is dropped, the request is cancelled.
1714    ///
1715    /// # Examples
1716    ///
1717    /// ```
1718    /// # extern crate yew021 as yew;
1719    /// use yew::prelude::*;
1720    /// use musli_web::web03::prelude::*;
1721    ///
1722    /// mod api {
1723    ///     use musli::{Decode, Encode};
1724    ///     use musli_web::api;
1725    ///
1726    ///     #[derive(Encode, Decode)]
1727    ///     pub struct HelloRequest<'de> {
1728    ///         pub message: &'de str,
1729    ///     }
1730    ///
1731    ///     #[derive(Encode, Decode)]
1732    ///     pub struct HelloResponse<'de> {
1733    ///         pub message: &'de str,
1734    ///     }
1735    ///
1736    ///     api::define! {
1737    ///         pub type Hello;
1738    ///
1739    ///         impl Endpoint for Hello {
1740    ///             impl<'de> Request for HelloRequest<'de>;
1741    ///             type Response<'de> = HelloResponse<'de>;
1742    ///         }
1743    ///     }
1744    /// }
1745    ///
1746    /// enum Msg {
1747    ///     OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
1748    /// }
1749    ///
1750    /// #[derive(Properties, PartialEq)]
1751    /// struct Props {
1752    ///     ws: ws::Handle,
1753    /// }
1754    ///
1755    /// struct App {
1756    ///     message: String,
1757    ///     _hello: ws::Request,
1758    /// }
1759    ///
1760    /// impl Component for App {
1761    ///     type Message = Msg;
1762    ///     type Properties = Props;
1763    ///
1764    ///     fn create(ctx: &Context<Self>) -> Self {
1765    ///         let hello = ctx.props().ws
1766    ///             .request()
1767    ///             .body(api::HelloRequest { message: "Hello!"})
1768    ///             .on_packet(ctx.link().callback(Msg::OnHello))
1769    ///             .send();
1770    ///
1771    ///         Self {
1772    ///             message: String::from("No Message :("),
1773    ///             _hello: hello,
1774    ///         }
1775    ///     }
1776    ///
1777    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1778    ///         match msg {
1779    ///             Msg::OnHello(Err(error)) => {
1780    ///                 tracing::error!("Request error: {:?}", error);
1781    ///                 false
1782    ///             }
1783    ///             Msg::OnHello(Ok(packet)) => {
1784    ///                 if let Ok(response) = packet.decode() {
1785    ///                     self.message = response.message.to_owned();
1786    ///                 }
1787    ///
1788    ///                 true
1789    ///             }
1790    ///         }
1791    ///     }
1792    ///
1793    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1794    ///         html! {
1795    ///             <div>
1796    ///                 <h1>{"WebSocket Example"}</h1>
1797    ///                 <p>{format!("Message: {}", self.message)}</p>
1798    ///             </div>
1799    ///         }
1800    ///     }
1801    /// }
1802    /// ```
1803    pub fn request(&self) -> RequestBuilder<'_, H, EmptyBody, EmptyCallback> {
1804        RequestBuilder {
1805            shared: &self.shared,
1806            body: EmptyBody,
1807            callback: EmptyCallback,
1808        }
1809    }
1810
1811    /// Listen for broadcasts of type `T`.
1812    ///
1813    /// Returns a handle for the listener that will cancel the listener if
1814    /// dropped.
1815    ///
1816    /// # Examples
1817    ///
1818    /// ```
1819    /// # extern crate yew021 as yew;
1820    /// use yew::prelude::*;
1821    /// use musli_web::web03::prelude::*;
1822    ///
1823    /// mod api {
1824    ///     use musli::{Decode, Encode};
1825    ///     use musli_web::api;
1826    ///
1827    ///     #[derive(Encode, Decode)]
1828    ///     pub struct TickEvent<'de> {
1829    ///         pub message: &'de str,
1830    ///         pub tick: u32,
1831    ///     }
1832    ///
1833    ///     api::define! {
1834    ///         pub type Tick;
1835    ///
1836    ///         impl Broadcast for Tick {
1837    ///             impl<'de> Event for TickEvent<'de>;
1838    ///         }
1839    ///     }
1840    /// }
1841    ///
1842    /// enum Msg {
1843    ///     Tick(Result<ws::Packet<api::Tick>, ws::Error>),
1844    /// }
1845    ///
1846    /// #[derive(Properties, PartialEq)]
1847    /// struct Props {
1848    ///     ws: ws::Handle,
1849    /// }
1850    ///
1851    /// struct App {
1852    ///     tick: u32,
1853    ///     _listen: ws::Listener,
1854    /// }
1855    ///
1856    /// impl Component for App {
1857    ///     type Message = Msg;
1858    ///     type Properties = Props;
1859    ///
1860    ///     fn create(ctx: &Context<Self>) -> Self {
1861    ///         let listen = ctx.props().ws.on_broadcast(ctx.link().callback(Msg::Tick));
1862    ///
1863    ///         Self {
1864    ///             tick: 0,
1865    ///             _listen: listen,
1866    ///         }
1867    ///     }
1868    ///
1869    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1870    ///         match msg {
1871    ///             Msg::Tick(Err(error)) => {
1872    ///                 tracing::error!("Tick error: {error}");
1873    ///                 false
1874    ///             }
1875    ///             Msg::Tick(Ok(packet)) => {
1876    ///                 if let Ok(tick) = packet.decode_event() {
1877    ///                     self.tick = tick.tick;
1878    ///                 }
1879    ///
1880    ///                 true
1881    ///             }
1882    ///         }
1883    ///     }
1884    ///
1885    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1886    ///         html! {
1887    ///             <div>
1888    ///                 <h1>{"WebSocket Example"}</h1>
1889    ///                 <p>{format!("Tick: {}", self.tick)}</p>
1890    ///             </div>
1891    ///         }
1892    ///     }
1893    /// }
1894    /// ```
1895    pub fn on_broadcast<T>(&self, callback: impl Callback<Result<Packet<T>>>) -> Listener
1896    where
1897        T: api::Broadcast,
1898    {
1899        self.on_raw_broadcast::<T>(move |result| match result {
1900            Ok(packet) => callback.call(Ok(Packet::new(packet))),
1901            Err(error) => callback.call(Err(error)),
1902        })
1903    }
1904
1905    /// Listen for broadcasts of type `T`.
1906    ///
1907    /// Returns a handle for the listener that will cancel the listener if
1908    /// dropped.
1909    ///
1910    /// # Examples
1911    ///
1912    /// ```
1913    /// # extern crate yew021 as yew;
1914    /// use yew::prelude::*;
1915    /// use musli_web::web03::prelude::*;
1916    ///
1917    /// mod api {
1918    ///     use musli::{Decode, Encode};
1919    ///     use musli_web::api;
1920    ///
1921    ///     #[derive(Encode, Decode)]
1922    ///     pub struct TickEvent<'de> {
1923    ///         pub message: &'de str,
1924    ///         pub tick: u32,
1925    ///     }
1926    ///
1927    ///     api::define! {
1928    ///         pub type Tick;
1929    ///
1930    ///         impl Broadcast for Tick {
1931    ///             impl<'de> Event for TickEvent<'de>;
1932    ///         }
1933    ///     }
1934    /// }
1935    ///
1936    /// enum Msg {
1937    ///     Tick(Result<ws::RawPacket, ws::Error>),
1938    /// }
1939    ///
1940    /// #[derive(Properties, PartialEq)]
1941    /// struct Props {
1942    ///     ws: ws::Handle,
1943    /// }
1944    ///
1945    /// struct App {
1946    ///     tick: u32,
1947    ///     _listen: ws::Listener,
1948    /// }
1949    ///
1950    /// impl Component for App {
1951    ///     type Message = Msg;
1952    ///     type Properties = Props;
1953    ///
1954    ///     fn create(ctx: &Context<Self>) -> Self {
1955    ///         let link = ctx.link().clone();
1956    ///         let listen = ctx.props().ws.on_raw_broadcast::<api::Tick>(ctx.link().callback(Msg::Tick));
1957    ///
1958    ///         Self {
1959    ///             tick: 0,
1960    ///             _listen: listen,
1961    ///         }
1962    ///     }
1963    ///
1964    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1965    ///         match msg {
1966    ///             Msg::Tick(Err(error)) => {
1967    ///                 tracing::error!("Tick error: {error}");
1968    ///                 false
1969    ///             }
1970    ///             Msg::Tick(Ok(packet)) => {
1971    ///                 if let Ok(tick) = packet.decode::<api::TickEvent>() {
1972    ///                     self.tick = tick.tick;
1973    ///                 }
1974    ///
1975    ///                 true
1976    ///             }
1977    ///         }
1978    ///     }
1979    ///
1980    ///     fn view(&self, ctx: &Context<Self>) -> Html {
1981    ///         html! {
1982    ///             <div>
1983    ///                 <h1>{"WebSocket Example"}</h1>
1984    ///                 <p>{format!("Tick: {}", self.tick)}</p>
1985    ///             </div>
1986    ///         }
1987    ///     }
1988    /// }
1989    /// ```
1990    pub fn on_raw_broadcast<T>(&self, callback: impl Callback<Result<RawPacket>>) -> Listener
1991    where
1992        T: api::Broadcast,
1993    {
1994        let Some(shared) = self.shared.upgrade() else {
1995            return Listener::empty_with_kind(T::ID);
1996        };
1997
1998        let index = {
1999            let mut broadcasts = shared.g.broadcasts.borrow_mut();
2000            let slots = broadcasts.entry(T::ID).or_default();
2001            slots.insert(Rc::new(callback))
2002        };
2003
2004        Listener {
2005            kind: Some(T::ID),
2006            index,
2007            g: Rc::downgrade(&shared.g),
2008        }
2009    }
2010
2011    /// Listen for state changes to the underlying connection.
2012    ///
2013    /// This indicates when the connection is open and ready to receive requests
2014    /// through [`State::Open`], or if it's closed and requests will be queued
2015    /// through [`State::Closed`].
2016    ///
2017    /// Note that if you are connecting through a proxy the reported updates
2018    /// might be volatile. It is always best to send a message over the
2019    /// connection on the server side that once received allows the client to
2020    /// know that it is connected.
2021    ///
2022    /// Dropping the returned handle will cancel the listener.
2023    ///
2024    /// # Examples
2025    ///
2026    /// ```
2027    /// # extern crate yew021 as yew;
2028    /// use yew::prelude::*;
2029    /// use musli_web::web03::prelude::*;
2030    ///
2031    /// enum Msg {
2032    ///     StateChange(ws::State),
2033    /// }
2034    ///
2035    /// #[derive(Properties, PartialEq)]
2036    /// struct Props {
2037    ///     ws: ws::Handle,
2038    /// }
2039    ///
2040    /// struct App {
2041    ///     state: ws::State,
2042    ///     _listen: ws::StateListener,
2043    /// }
2044    ///
2045    /// impl Component for App {
2046    ///     type Message = Msg;
2047    ///     type Properties = Props;
2048    ///
2049    ///     fn create(ctx: &Context<Self>) -> Self {
2050    ///         let link = ctx.link().clone();
2051    ///
2052    ///         let (state, listen) = ctx.props().ws.on_state_change(move |state| {
2053    ///             link.send_message(Msg::StateChange(state));
2054    ///         });
2055    ///
2056    ///         Self {
2057    ///             state,
2058    ///             _listen: listen,
2059    ///         }
2060    ///     }
2061    ///
2062    ///     fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
2063    ///         match msg {
2064    ///             Msg::StateChange(state) => {
2065    ///                 self.state = state;
2066    ///                 true
2067    ///             }
2068    ///         }
2069    ///     }
2070    ///
2071    ///     fn view(&self, ctx: &Context<Self>) -> Html {
2072    ///         html! {
2073    ///             <div>
2074    ///                 <h1>{"WebSocket Example"}</h1>
2075    ///                 <p>{format!("State: {:?}", self.state)}</p>
2076    ///             </div>
2077    ///         }
2078    ///     }
2079    /// }
2080    pub fn on_state_change(&self, callback: impl Callback<State>) -> (State, StateListener) {
2081        let Some(shared) = self.shared.upgrade() else {
2082            return (
2083                State::Closed,
2084                StateListener {
2085                    index: 0,
2086                    g: Weak::new(),
2087                },
2088            );
2089        };
2090
2091        let (state, index) = {
2092            let index = shared
2093                .g
2094                .state_listeners
2095                .borrow_mut()
2096                .insert(Rc::new(callback));
2097            (shared.state.get(), index)
2098        };
2099
2100        let listener = StateListener {
2101            index,
2102            g: Rc::downgrade(&shared.g),
2103        };
2104
2105        (state, listener)
2106    }
2107}
2108
2109impl<H> PartialEq for Handle<H>
2110where
2111    H: WebImpl,
2112{
2113    #[inline]
2114    fn eq(&self, _: &Self) -> bool {
2115        true
2116    }
2117}
2118
2119struct Pending<C>
2120where
2121    C: ?Sized,
2122{
2123    kind: MessageId,
2124    serial: u32,
2125    callback: C,
2126}
2127
2128impl<C> fmt::Debug for Pending<C> {
2129    #[inline]
2130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2131        f.debug_struct("Pending")
2132            .field("serial", &self.serial)
2133            .field("kind", &self.kind)
2134            .finish_non_exhaustive()
2135    }
2136}
/// Displays a string so that it starts with exactly one occurrence of the
/// given prefix character.
///
/// Any leading occurrences of the prefix in the string are stripped, after
/// which the prefix is written once followed by the remainder.
struct ForcePrefix<'a>(&'a str, char);

impl fmt::Display for ForcePrefix<'_> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let &ForcePrefix(text, prefix) = self;
        let rest = text.trim_start_matches(prefix);

        fmt::Display::fmt(&prefix, f)?;
        fmt::Display::fmt(rest, f)
    }
}