musli_web/web.rs
1//! The generic web implementation.
2//!
3//! This is specialized over the `H` parameter through modules such as:
4//!
5//! * [`web03`] for `web-sys` `0.3.x`.
6//!
7//! [`web03`]: crate::web03
8
9use core::cell::{Cell, RefCell};
10use core::fmt;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Deref;
14use core::ptr::NonNull;
15use std::collections::VecDeque;
16
17use alloc::boxed::Box;
18use alloc::format;
19use alloc::rc::Rc;
20use alloc::rc::Weak;
21use alloc::string::{String, ToString};
22use alloc::vec::Vec;
23
24use std::collections::hash_map::{Entry, HashMap};
25
26use musli::Decode;
27use musli::alloc::Global;
28use musli::mode::Binary;
29use musli::reader::SliceReader;
30use musli::storage;
31
32use slab::Slab;
33
34use crate::api::{self, Event, MessageId};
35
/// Upper bound in bytes (1 MiB) that reusable buffers are shrunk back to, so
/// that one unusually large message does not pin a large allocation.
const MAX_CAPACITY: usize = 1048576;
37
/// An empty request body.
///
/// NOTE(review): presumably the placeholder body type of a freshly created
/// [`RequestBuilder`] before [`RequestBuilder::body`] is called — confirm
/// against the code constructing the builder.
#[non_exhaustive]
pub struct EmptyBody;
41
/// An empty callback.
///
/// Implements [`Callback`] for every input type by doing nothing. This is the
/// error handler a [`ServiceBuilder`] starts out with when created through
/// [`connect`].
#[non_exhaustive]
pub struct EmptyCallback;
45
/// Slab of state listeners, indexed by the slot stored in each
/// [`StateListener`] handle.
type StateListeners = Slab<Rc<dyn Callback<State>>>;
/// Broadcast listeners, grouped by the message id they are registered for.
type Broadcasts = HashMap<MessageId, Slab<Rc<dyn Callback<Result<RawPacket>>>>>;
/// Queue of recycled buffers.
type Buffers = VecDeque<Box<BufData>>;
/// Pending requests, keyed by the serial number the request was sent with.
type Requests = HashMap<u32, Box<Pending<dyn Callback<Result<RawPacket>>>>>;
54
/// Location information for websocket implementation.
///
/// Used to derive the websocket URL when connecting relative to the current
/// location.
#[doc(hidden)]
pub struct Location {
    /// Protocol of the current location including the trailing colon, such as
    /// `http:` or `https:`.
    pub(crate) protocol: String,
    /// Host component inserted verbatim into the websocket URL.
    pub(crate) host: String,
    /// Port component inserted verbatim into the websocket URL.
    pub(crate) port: String,
}
62
/// Sealing module for [`SocketImpl`].
pub(crate) mod sealed_socket {
    /// Marker trait required by `SocketImpl`.
    pub trait Sealed {}
}
66
/// Abstraction over a concrete websocket.
pub(crate) trait SocketImpl
where
    Self: Sized + self::sealed_socket::Sealed,
{
    /// Implementation specific handles passed in when the socket is
    /// constructed.
    #[doc(hidden)]
    type Handles;

    /// Construct a new socket for the given `url` using the given handles.
    #[doc(hidden)]
    fn new(url: &str, handles: &Self::Handles) -> Result<Self, Error>;

    /// Send the given bytes over the socket.
    #[doc(hidden)]
    fn send(&self, data: &[u8]) -> Result<(), Error>;

    /// Close the socket, consuming it.
    #[doc(hidden)]
    fn close(self) -> Result<(), Error>;
}
83
/// Sealing module for [`PerformanceImpl`].
pub(crate) mod sealed_performance {
    /// Marker trait required by `PerformanceImpl`.
    pub trait Sealed {}
}
87
/// Abstraction over a clock used to measure how long a connection has been
/// open.
///
/// This trait is sealed and cannot be implemented outside of this crate.
pub trait PerformanceImpl
where
    Self: Sized + self::sealed_performance::Sealed,
{
    /// Get the current timestamp.
    ///
    /// NOTE(review): presumably in milliseconds, since differences are
    /// compared against `250.0` next to the millisecond timeouts — confirm
    /// against the implementation.
    #[doc(hidden)]
    fn now(&self) -> f64;
}
95
/// Sealing module for [`WindowImpl`].
pub(crate) mod sealed_window {
    /// Marker trait required by `WindowImpl`.
    pub trait Sealed {}
}
99
/// Abstraction over the window the service runs in.
pub(crate) trait WindowImpl
where
    Self: Sized + self::sealed_window::Sealed,
{
    /// The clock associated with the window.
    #[doc(hidden)]
    type Performance: PerformanceImpl;

    /// Handle to a scheduled timeout, as returned by
    /// [`WindowImpl::set_timeout`].
    #[doc(hidden)]
    type Timeout;

    /// Access the window.
    #[doc(hidden)]
    fn new() -> Result<Self, Error>;

    /// Access the clock of the window.
    #[doc(hidden)]
    fn performance(&self) -> Result<Self::Performance, Error>;

    /// Get the current location of the window.
    #[doc(hidden)]
    fn location(&self) -> Result<Location, Error>;

    /// Schedule `callback` to be called after `millis` milliseconds.
    ///
    /// NOTE(review): presumably dropping the returned handle cancels the
    /// timeout — confirm against the implementation.
    #[doc(hidden)]
    fn set_timeout(
        &self,
        millis: u32,
        callback: impl FnOnce() + 'static,
    ) -> Result<Self::Timeout, Error>;
}
126
/// Sealing module for [`WebImpl`].
pub(crate) mod sealed_web {
    /// Marker trait required by `WebImpl`.
    pub trait Sealed {}
}
130
/// Central trait for web integration.
///
/// Since web integration is currently unstable, this requires multiple
/// different implementations, each time an ecosystem breaking change is
/// released.
///
/// The crate in focus here is `web-sys`, and the corresponding modules provide
/// integrations:
///
/// * [web03] for `web-sys` `0.3.x`.
///
/// This trait is sealed and cannot be implemented outside of this crate.
///
/// [web03]: crate::web03
pub trait WebImpl
where
    Self: 'static + Copy + Sized + self::sealed_web::Sealed,
{
    /// The window implementation.
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Window: WindowImpl;

    /// Handles handed to the socket when it is constructed.
    #[doc(hidden)]
    type Handles;

    /// The socket implementation, which must accept [`WebImpl::Handles`].
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Socket: SocketImpl<Handles = Self::Handles>;

    /// Construct the handles from a weak reference to the shared state.
    #[doc(hidden)]
    #[allow(private_interfaces)]
    fn handles(shared: &Weak<Shared<Self>>) -> Self::Handles;

    /// Generate a random number, used to fuzz the reconnect timeout.
    ///
    /// NOTE(review): presumably the result lies in `0..range` — confirm
    /// against the implementation.
    #[doc(hidden)]
    fn random(range: u32) -> u32;
}
165
/// Construct a new [`ServiceBuilder`] associated with the given [`Connect`]
/// strategy.
///
/// The builder starts out with [`EmptyCallback`] as its error handler, which
/// discards errors. Use [`ServiceBuilder::on_error`] to install a handler.
pub fn connect<H>(connect: Connect) -> ServiceBuilder<H, EmptyCallback>
where
    H: WebImpl,
{
    ServiceBuilder {
        connect,
        on_error: EmptyCallback,
        _marker: PhantomData,
    }
}
178
/// The state of the connection.
///
/// A listener for state changes can be set up through for example
/// [`Handle::on_state_change`].
///
/// Listeners are only notified when the state actually changes.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[non_exhaustive]
pub enum State {
    /// The connection is open.
    Open,
    /// The connection is closed.
    Closed,
}
191
/// Error type for the WebSocket service.
///
/// The kind of error is private; use the [`fmt::Display`] implementation to
/// report it.
#[derive(Debug)]
pub struct Error {
    /// What went wrong.
    kind: ErrorKind,
}
197
/// The internal kind of an [`Error`].
#[derive(Debug)]
enum ErrorKind {
    /// A free-form error message.
    Message(String),
    /// An error raised while encoding or decoding.
    Storage(storage::Error),
    /// A packet cursor (first field) which is outside of the packet buffer of
    /// the given length (second field).
    Overflow(usize, usize),
}
204
impl Error {
    /// Construct an error of the given kind.
    #[inline]
    fn new(kind: ErrorKind) -> Self {
        Self { kind }
    }

    /// Construct an error carrying the rendered `message`.
    #[inline]
    pub(crate) fn msg(message: impl fmt::Display) -> Self {
        Self::new(ErrorKind::Message(message.to_string()))
    }
}
216
217impl fmt::Display for Error {
218 #[inline]
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 match &self.kind {
221 ErrorKind::Message(message) => write!(f, "{message}"),
222 ErrorKind::Storage(error) => write!(f, "Encoding error: {error}"),
223 ErrorKind::Overflow(at, len) => {
224 write!(f, "Internal packet overflow, {at} not in range 0-{len}")
225 }
226 }
227 }
228}
229
230impl core::error::Error for Error {
231 #[inline]
232 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
233 match &self.kind {
234 ErrorKind::Storage(error) => Some(error),
235 _ => None,
236 }
237 }
238}
239
/// Wrap a storage error so that it can be propagated with `?`.
impl From<storage::Error> for Error {
    #[inline]
    fn from(error: storage::Error) -> Self {
        Self::new(ErrorKind::Storage(error))
    }
}
246
/// Convert a JavaScript value into an error message using its debug
/// representation.
#[cfg(feature = "wasm_bindgen02")]
impl From<wasm_bindgen02::JsValue> for Error {
    #[inline]
    fn from(error: wasm_bindgen02::JsValue) -> Self {
        Self::new(ErrorKind::Message(format!("{error:?}")))
    }
}
254
/// Result alias defaulting to the [`Error`] type of this module.
type Result<T, E = Error> = core::result::Result<T, E>;

/// The reconnect timeout in milliseconds that the service starts out with, and
/// resets to once a connection has stayed open for a while.
const INITIAL_TIMEOUT: u32 = 250;
/// The largest reconnect timeout in milliseconds the backoff grows to.
const MAX_TIMEOUT: u32 = 4000;
259
/// How to connect to the websocket.
enum ConnectKind {
    /// Connect to `path` on the host of the current location.
    Location { path: String },
    /// Connect to the given URL as-is.
    Url { url: String },
}
265
/// A specification for how to connect a websocket.
///
/// Constructed through [`Connect::location`] or [`Connect::url`].
pub struct Connect {
    /// The connection strategy.
    kind: ConnectKind,
}
270
271impl Connect {
272 /// Connect to the same location with a custom path.
273 ///
274 /// Note that any number of `/` prefixes are ignored, the canonical
275 /// representation always ignores them and the path is relative to the
276 /// current location.
277 #[inline]
278 pub fn location(path: impl AsRef<str>) -> Self {
279 Self {
280 kind: ConnectKind::Location {
281 path: String::from(path.as_ref()),
282 },
283 }
284 }
285
286 /// Connect to the specified URL.
287 #[inline]
288 pub fn url(url: String) -> Self {
289 Self {
290 kind: ConnectKind::Url { url },
291 }
292 }
293}
294
/// Generic but shared fields which do not depend on specialization over `H`.
///
/// Handles such as [`Request`], [`Listener`] and [`StateListener`] hold a weak
/// reference to this so that they can unregister themselves.
struct Generic {
    /// Registered state change listeners.
    state_listeners: RefCell<StateListeners>,
    /// Requests which have been sent but not yet responded to.
    requests: RefCell<Requests>,
    /// Registered broadcast listeners.
    broadcasts: RefCell<Broadcasts>,
    /// Buffers waiting to be reused.
    buffers: RefCell<Buffers>,
}
302
/// Shared implementation details for websocket implementations.
pub(crate) struct Shared<H>
where
    H: WebImpl,
{
    /// How the websocket URL is determined.
    connect: Connect,
    /// Handler which errors are reported to.
    pub(crate) on_error: Box<dyn Callback<Error>>,
    /// The window, used for the location and for scheduling timeouts.
    window: H::Window,
    /// Clock used to timestamp when the connection was opened.
    performance: <H::Window as WindowImpl>::Performance,
    /// Handles passed to each socket which is constructed.
    handles: H::Handles,
    /// The state most recently emitted to state listeners.
    state: Cell<State>,
    /// Timestamp of when the connection was opened, `None` while closed.
    opened: Cell<Option<f64>>,
    /// Serial number to use for the next request.
    serial: Cell<u32>,
    /// Scratch queue of broadcast callbacks waiting to be called.
    defer_broadcasts: RefCell<VecDeque<Weak<dyn Callback<Result<RawPacket>>>>>,
    /// Scratch queue of state callbacks waiting to be called.
    defer_state_listeners: RefCell<VecDeque<Weak<dyn Callback<State>>>>,
    /// The current socket, if any.
    pub(crate) socket: RefCell<Option<H::Socket>>,
    /// Scratch buffer which outgoing requests are encoded into.
    output: RefCell<Vec<u8>>,
    /// Delay in milliseconds before the next reconnect attempt.
    current_timeout: Cell<u32>,
    /// Handle to the currently scheduled reconnect timeout.
    reconnect_timeout: RefCell<Option<<H::Window as WindowImpl>::Timeout>>,
    /// State shared with request and listener handles.
    g: Rc<Generic>,
}
324
325impl<H> Drop for Shared<H>
326where
327 H: WebImpl,
328{
329 fn drop(&mut self) {
330 if let Some(s) = self.socket.take()
331 && let Err(e) = s.close()
332 {
333 self.on_error.call(e);
334 }
335
336 // We don't need to worry about mutable borrows here, since we only have
337 // weak references to Shared and by virtue of this being dropped they
338 // are all invalid.
339 let state_listeners = mem::take(&mut *self.g.state_listeners.borrow_mut());
340 let mut requests = self.g.requests.borrow_mut();
341
342 for (_, listener) in state_listeners {
343 listener.call(State::Closed);
344 }
345
346 for (_, p) in requests.drain() {
347 p.callback.call(Err(Error::msg("Websocket service closed")));
348 }
349 }
350}
351
/// Builder of a service.
///
/// Constructed through [`connect`].
pub struct ServiceBuilder<H, E>
where
    H: WebImpl,
{
    /// How the service should connect.
    connect: Connect,
    /// Handler which errors are reported to.
    on_error: E,
    /// Ties the builder to a web implementation without storing one.
    _marker: PhantomData<H>,
}
361
362impl<H, E> ServiceBuilder<H, E>
363where
364 H: WebImpl,
365 E: Callback<Error>,
366{
367 /// Set the error handler to use for the service.
368 pub fn on_error<U>(self, on_error: U) -> ServiceBuilder<H, U>
369 where
370 U: Callback<Error>,
371 {
372 ServiceBuilder {
373 connect: self.connect,
374 on_error,
375 _marker: self._marker,
376 }
377 }
378
379 /// Build a new service and handle.
380 pub fn build(self) -> Service<H> {
381 let window = match H::Window::new() {
382 Ok(window) => window,
383 Err(error) => {
384 panic!("{error}")
385 }
386 };
387
388 let performance = match WindowImpl::performance(&window) {
389 Ok(performance) => performance,
390 Err(error) => {
391 panic!("{error}")
392 }
393 };
394
395 let shared = Rc::<Shared<H>>::new_cyclic(move |shared| Shared {
396 connect: self.connect,
397 on_error: Box::new(self.on_error),
398 window,
399 performance,
400 handles: H::handles(shared),
401 state: Cell::new(State::Closed),
402 opened: Cell::new(None),
403 serial: Cell::new(0),
404 defer_broadcasts: RefCell::new(VecDeque::new()),
405 defer_state_listeners: RefCell::new(VecDeque::new()),
406 socket: RefCell::new(None),
407 output: RefCell::new(Vec::new()),
408 current_timeout: Cell::new(INITIAL_TIMEOUT),
409 reconnect_timeout: RefCell::new(None),
410 g: Rc::new(Generic {
411 state_listeners: RefCell::new(Slab::new()),
412 broadcasts: RefCell::new(HashMap::new()),
413 requests: RefCell::new(Requests::new()),
414 buffers: RefCell::new(VecDeque::new()),
415 }),
416 });
417
418 let handle = Handle {
419 shared: Rc::downgrade(&shared),
420 };
421
422 Service { shared, handle }
423 }
424}
425
/// The service handle.
///
/// Once dropped this will cause the service to be disconnected and all requests
/// to be cancelled.
pub struct Service<H>
where
    H: WebImpl,
{
    /// The only strong reference to the shared state.
    shared: Rc<Shared<H>>,
    /// Handle holding a weak reference to the shared state.
    handle: Handle<H>,
}
437
impl<H> Service<H>
where
    H: WebImpl,
{
    /// Attempt to establish a websocket connection.
    ///
    /// Failures are reported to the error handler of the service, after which
    /// the connection is closed and a reconnect is scheduled.
    pub fn connect(&self) {
        self.shared.connect()
    }

    /// Build a handle to the service.
    ///
    /// The handle only holds a weak reference, so it does not keep the service
    /// alive.
    pub fn handle(&self) -> &Handle<H> {
        &self.handle
    }
}
452
453impl<H> Shared<H>
454where
455 H: WebImpl,
456{
457 /// Send a client message.
458 fn send_client_request<T>(&self, serial: u32, body: &T) -> Result<()>
459 where
460 T: api::Request,
461 {
462 let Some(ref socket) = *self.socket.borrow() else {
463 return Err(Error::msg("Socket is not connected"));
464 };
465
466 let header = api::RequestHeader {
467 serial,
468 id: <T::Endpoint as api::Endpoint>::ID.get(),
469 };
470
471 let out = &mut *self.output.borrow_mut();
472
473 storage::to_writer(&mut *out, &header)?;
474 storage::to_writer(&mut *out, &body)?;
475
476 tracing::debug!(
477 header.serial,
478 ?header.id,
479 len = out.len(),
480 "Sending request"
481 );
482
483 socket.send(out.as_slice())?;
484
485 out.clear();
486 out.shrink_to(MAX_CAPACITY);
487 Ok(())
488 }
489
490 pub(crate) fn next_buffer(self: &Rc<Self>, needed: usize) -> Box<BufData> {
491 match self.g.buffers.borrow_mut().pop_front() {
492 Some(mut buf) => {
493 if buf.data.capacity() < needed {
494 buf.data.reserve(needed - buf.data.len());
495 }
496
497 buf
498 }
499 None => Box::new(BufData::with_capacity(Rc::downgrade(&self.g), needed)),
500 }
501 }
502
503 pub(crate) fn message(self: &Rc<Self>, buf: Box<BufData>) -> Result<()> {
504 // Wrap the buffer in a simple shared reference-counted container.
505 let buf = BufRc::new(buf);
506 let mut reader = SliceReader::new(&buf);
507
508 let header: api::ResponseHeader = storage::decode(&mut reader)?;
509
510 if let Some(broadcast) = MessageId::new(header.broadcast) {
511 tracing::debug!(?header.broadcast, "Got broadcast");
512
513 if !self.prepare_broadcast(broadcast) {
514 return Ok(());
515 };
516
517 if let Some(id) = MessageId::new(header.error) {
518 let error = if id == MessageId::ERROR_MESSAGE {
519 storage::decode(&mut reader)?
520 } else {
521 api::ErrorMessage {
522 message: "Unknown error",
523 }
524 };
525
526 while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
527 if let Some(callback) = callback.upgrade() {
528 callback.call(Err(Error::msg(error.message)));
529 }
530 }
531
532 return Ok(());
533 }
534
535 let at = buf.len().saturating_sub(reader.remaining());
536
537 let packet = RawPacket {
538 buf: buf.clone(),
539 at: Cell::new(at),
540 id: broadcast,
541 };
542
543 while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
544 if let Some(callback) = callback.upgrade() {
545 callback.call(Ok(packet.clone()));
546 }
547 }
548 } else {
549 tracing::debug!(header.serial, "Got response");
550
551 let p = {
552 let mut requests = self.g.requests.borrow_mut();
553
554 let Some(p) = requests.remove(&header.serial) else {
555 tracing::debug!(header.serial, "Missing request");
556 return Ok(());
557 };
558
559 p
560 };
561
562 if let Some(id) = MessageId::new(header.error) {
563 let error = if id == MessageId::ERROR_MESSAGE {
564 storage::decode(&mut reader)?
565 } else {
566 api::ErrorMessage {
567 message: "Unknown error",
568 }
569 };
570
571 p.callback.call(Err(Error::msg(error.message)));
572 return Ok(());
573 }
574
575 let at = buf.len().saturating_sub(reader.remaining());
576
577 let packet = RawPacket {
578 id: p.kind,
579 buf,
580 at: Cell::new(at),
581 };
582
583 p.callback.call(Ok(packet));
584 }
585
586 Ok(())
587 }
588
589 fn prepare_broadcast(self: &Rc<Self>, kind: MessageId) -> bool {
590 // Note: We need to defer this, since the outcome of calling
591 // the broadcast callback might be that the broadcast
592 // listener is modified, which could require mutable access
593 // to broadcasts.
594 let mut defer = self.defer_broadcasts.borrow_mut();
595
596 let broadcasts = self.g.broadcasts.borrow();
597
598 let Some(broadcasts) = broadcasts.get(&kind) else {
599 return false;
600 };
601
602 for (_, callback) in broadcasts.iter() {
603 defer.push_back(Rc::downgrade(callback));
604 }
605
606 !defer.is_empty()
607 }
608
    /// Mark the connection as open.
    ///
    /// Records the current timestamp as the time the connection was opened and
    /// notifies state listeners if the state changed.
    pub(crate) fn set_open(&self) {
        tracing::debug!("Set open");
        self.opened
            .set(Some(PerformanceImpl::now(&self.performance)));
        self.emit_state_change(State::Open);
    }
615
616 fn is_open_for_a_while(&self) -> bool {
617 let Some(at) = self.opened.get() else {
618 return false;
619 };
620
621 let now = PerformanceImpl::now(&self.performance);
622 (now - at) >= 250.0
623 }
624
625 pub(crate) fn close(self: &Rc<Self>) -> Result<(), Error> {
626 tracing::debug!("Close connection");
627
628 // We need a weak reference back to shared state to handle the timeout.
629 let shared = Rc::downgrade(self);
630
631 tracing::debug!(
632 "Set closed timeout={}, opened={:?}",
633 self.current_timeout.get(),
634 self.opened.get(),
635 );
636
637 if !self.is_open_for_a_while() {
638 let current_timeout = self.current_timeout.get();
639
640 if current_timeout < MAX_TIMEOUT {
641 let fuzz = H::random(50);
642
643 self.current_timeout.set(
644 current_timeout
645 .saturating_mul(2)
646 .saturating_add(fuzz)
647 .min(MAX_TIMEOUT),
648 );
649 }
650 } else {
651 self.current_timeout.set(INITIAL_TIMEOUT);
652 }
653
654 self.opened.set(None);
655 self.emit_state_change(State::Closed);
656
657 if let Some(s) = self.socket.take() {
658 s.close()?;
659 }
660
661 self.close_pending();
662
663 let timeout = self
664 .window
665 .set_timeout(self.current_timeout.get(), move || {
666 if let Some(shared) = shared.upgrade() {
667 Self::connect(&shared);
668 }
669 })?;
670
671 drop(self.reconnect_timeout.borrow_mut().replace(timeout));
672 Ok(())
673 }
674
675 /// Close an pending requests with an error, since there is no chance they
676 /// will be responded to any more.
677 fn close_pending(self: &Rc<Self>) {
678 loop {
679 let Some(serial) = self.g.requests.borrow().keys().next().copied() else {
680 break;
681 };
682
683 let p = {
684 let mut requests = self.g.requests.borrow_mut();
685
686 let Some(p) = requests.remove(&serial) else {
687 break;
688 };
689
690 p
691 };
692
693 p.callback.call(Err(Error::msg("Connection closed")));
694 }
695 }
696
697 fn emit_state_change(&self, state: State) {
698 if self.state.get() == state {
699 return;
700 }
701
702 self.state.set(state);
703
704 {
705 // We need to collect callbacks to avoid the callback recursively
706 // borrowing state listeners, which it would if it modifies any
707 // existing state listeners.
708 let mut defer = self.defer_state_listeners.borrow_mut();
709
710 for (_, callback) in self.g.state_listeners.borrow().iter() {
711 defer.push_back(Rc::downgrade(callback));
712 }
713
714 if defer.is_empty() {
715 return;
716 }
717 }
718
719 while let Some(callback) = self.defer_state_listeners.borrow_mut().pop_front() {
720 if let Some(callback) = callback.upgrade() {
721 callback.call(state);
722 }
723 }
724 }
725
    /// Test if the connection is closed, which is the case whenever there is
    /// no recorded time at which it was opened.
    fn is_closed(&self) -> bool {
        self.opened.get().is_none()
    }
729
730 fn connect(self: &Rc<Self>) {
731 tracing::debug!("Connect");
732
733 if let Err(e) = self.build() {
734 self.on_error.call(e);
735 } else {
736 return;
737 }
738
739 if let Err(e) = self.close() {
740 self.on_error.call(e);
741 }
742 }
743
744 /// Build a websocket connection.
745 fn build(self: &Rc<Self>) -> Result<()> {
746 let url = match &self.connect.kind {
747 ConnectKind::Location { path } => {
748 let Location {
749 protocol,
750 host,
751 port,
752 } = WindowImpl::location(&self.window)?;
753
754 let protocol = match protocol.as_str() {
755 "https:" => "wss:",
756 "http:" => "ws:",
757 other => {
758 return Err(Error::msg(format_args!(
759 "Same host connection is not supported for protocol `{other}`"
760 )));
761 }
762 };
763
764 let path = ForcePrefix(path, '/');
765 format!("{protocol}//{host}:{port}{path}")
766 }
767 ConnectKind::Url { url } => url.clone(),
768 };
769
770 let ws = SocketImpl::new(&url, &self.handles)?;
771
772 let old = self.socket.borrow_mut().replace(ws);
773
774 if let Some(old) = old {
775 old.close()?;
776 }
777
778 Ok(())
779 }
780}
781
/// Trait governing how callbacks are called.
///
/// This is implemented for every `'static` closure or function which takes the
/// input as its only argument, and for [`EmptyCallback`] which ignores it.
pub trait Callback<I>
where
    Self: 'static,
{
    /// Call the callback with the given input.
    fn call(&self, input: I);
}
790
/// The empty callback accepts any input and discards it.
impl<I> Callback<I> for EmptyCallback {
    #[inline]
    fn call(&self, _: I) {}
}
795
/// Any `'static` function or closure taking the input is a callback.
impl<F, I> Callback<I> for F
where
    F: 'static + Fn(I),
{
    #[inline]
    fn call(&self, input: I) {
        self(input)
    }
}
805
/// A request builder.
///
/// Associate the callback to be used by using either
/// [`RequestBuilder::on_packet`] or [`RequestBuilder::on_raw_packet`] depending
/// on your needs.
///
/// Send the request with [`RequestBuilder::send`].
pub struct RequestBuilder<'a, H, B, C>
where
    H: WebImpl,
{
    /// Weak reference to the shared state of the service.
    shared: &'a Weak<Shared<H>>,
    /// The body of the request.
    body: B,
    /// The callback which receives the response.
    callback: C,
}
821
822impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
823where
824 H: WebImpl,
825{
    /// Set the body of the request.
    ///
    /// This replaces any body which was previously set, the callback is kept
    /// as-is.
    #[inline]
    pub fn body<U>(self, body: U) -> RequestBuilder<'a, H, U, C>
    where
        U: api::Request,
    {
        RequestBuilder {
            shared: self.shared,
            body,
            callback: self.callback,
        }
    }
838
    /// Handle the response as a typed [`Packet`] using the specified callback.
840 ///
841 /// # Examples
842 ///
843 /// ```
844 /// # extern crate yew021 as yew;
845 /// use yew::prelude::*;
846 /// use musli_web::web03::prelude::*;
847 ///
848 /// mod api {
849 /// use musli::{Decode, Encode};
850 /// use musli_web::api;
851 ///
852 /// #[derive(Encode, Decode)]
853 /// pub struct HelloRequest<'de> {
854 /// pub message: &'de str,
855 /// }
856 ///
857 /// #[derive(Encode, Decode)]
858 /// pub struct HelloResponse<'de> {
859 /// pub message: &'de str,
860 /// }
861 ///
862 /// api::define! {
863 /// pub type Hello;
864 ///
865 /// impl Endpoint for Hello {
866 /// impl<'de> Request for HelloRequest<'de>;
867 /// type Response<'de> = HelloResponse<'de>;
868 /// }
869 /// }
870 /// }
871 ///
872 /// enum Msg {
873 /// OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
874 /// }
875 ///
876 /// #[derive(Properties, PartialEq)]
877 /// struct Props {
878 /// ws: ws::Handle,
879 /// }
880 ///
881 /// struct App {
882 /// message: String,
883 /// _hello: ws::Request,
884 /// }
885 ///
886 /// impl Component for App {
887 /// type Message = Msg;
888 /// type Properties = Props;
889 ///
890 /// fn create(ctx: &Context<Self>) -> Self {
891 /// let hello = ctx.props().ws
892 /// .request()
893 /// .body(api::HelloRequest { message: "Hello!"})
894 /// .on_packet(ctx.link().callback(Msg::OnHello))
895 /// .send();
896 ///
897 /// Self {
898 /// message: String::from("No Message :("),
899 /// _hello: hello,
900 /// }
901 /// }
902 ///
903 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
904 /// match msg {
905 /// Msg::OnHello(Err(error)) => {
906 /// tracing::error!("Request error: {:?}", error);
907 /// false
908 /// }
909 /// Msg::OnHello(Ok(packet)) => {
910 /// if let Ok(response) = packet.decode() {
911 /// self.message = response.message.to_owned();
912 /// }
913 ///
914 /// true
915 /// }
916 /// }
917 /// }
918 ///
919 /// fn view(&self, ctx: &Context<Self>) -> Html {
920 /// html! {
921 /// <div>
922 /// <h1>{"WebSocket Example"}</h1>
923 /// <p>{format!("Message: {}", self.message)}</p>
924 /// </div>
925 /// }
926 /// }
927 /// }
928 /// ```
929 pub fn on_packet<E>(
930 self,
931 callback: impl Callback<Result<Packet<E>>>,
932 ) -> RequestBuilder<'a, H, B, impl Callback<Result<RawPacket>>>
933 where
934 E: api::Endpoint,
935 {
936 self.on_raw_packet(move |result: Result<RawPacket>| match result {
937 Ok(ok) => callback.call(Ok(Packet::new(ok))),
938 Err(err) => callback.call(Err(err)),
939 })
940 }
941
    /// Handle the response as a [`RawPacket`] using the specified callback.
943 ///
944 /// # Examples
945 ///
946 /// ```
947 /// # extern crate yew021 as yew;
948 /// use yew::prelude::*;
949 /// use musli_web::web03::prelude::*;
950 ///
951 /// mod api {
952 /// use musli::{Decode, Encode};
953 /// use musli_web::api;
954 ///
955 /// #[derive(Encode, Decode)]
956 /// pub struct HelloRequest<'de> {
957 /// pub message: &'de str,
958 /// }
959 ///
960 /// #[derive(Encode, Decode)]
961 /// pub struct HelloResponse<'de> {
962 /// pub message: &'de str,
963 /// }
964 ///
965 /// api::define! {
966 /// pub type Hello;
967 ///
968 /// impl Endpoint for Hello {
969 /// impl<'de> Request for HelloRequest<'de>;
970 /// type Response<'de> = HelloResponse<'de>;
971 /// }
972 /// }
973 /// }
974 ///
975 /// enum Msg {
976 /// OnHello(Result<ws::RawPacket, ws::Error>),
977 /// }
978 ///
979 /// #[derive(Properties, PartialEq)]
980 /// struct Props {
981 /// ws: ws::Handle,
982 /// }
983 ///
984 /// struct App {
985 /// message: String,
986 /// _hello: ws::Request,
987 /// }
988 ///
989 /// impl Component for App {
990 /// type Message = Msg;
991 /// type Properties = Props;
992 ///
993 /// fn create(ctx: &Context<Self>) -> Self {
994 /// let link = ctx.link().clone();
995 ///
996 /// let hello = ctx.props().ws
997 /// .request()
998 /// .body(api::HelloRequest { message: "Hello!"})
999 /// .on_raw_packet(move |packet| link.send_message(Msg::OnHello(packet)))
1000 /// .send();
1001 ///
1002 /// Self {
1003 /// message: String::from("No Message :("),
1004 /// _hello: hello,
1005 /// }
1006 /// }
1007 ///
1008 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1009 /// match msg {
1010 /// Msg::OnHello(Err(error)) => {
1011 /// tracing::error!("Request error: {:?}", error);
1012 /// false
1013 /// }
1014 /// Msg::OnHello(Ok(packet)) => {
1015 /// if let Ok(response) = packet.decode::<api::HelloResponse>() {
1016 /// self.message = response.message.to_owned();
1017 /// }
1018 ///
1019 /// true
1020 /// }
1021 /// }
1022 /// }
1023 ///
1024 /// fn view(&self, ctx: &Context<Self>) -> Html {
1025 /// html! {
1026 /// <div>
1027 /// <h1>{"WebSocket Example"}</h1>
1028 /// <p>{format!("Message: {}", self.message)}</p>
1029 /// </div>
1030 /// }
1031 /// }
1032 /// }
1033 /// ```
    pub fn on_raw_packet<U>(self, callback: U) -> RequestBuilder<'a, H, B, U>
    where
        U: Callback<Result<RawPacket, Error>>,
    {
        // Replaces any previously set callback, the body is kept as-is.
        RequestBuilder {
            shared: self.shared,
            body: self.body,
            callback,
        }
    }
1044}
1045
1046impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
1047where
1048 B: api::Request,
1049 C: Callback<Result<RawPacket>>,
1050 H: WebImpl,
1051{
1052 /// Send the request.
1053 ///
1054 /// This requires that a body has been set using [`RequestBuilder::body`].
1055 pub fn send(self) -> Request {
1056 let Some(shared) = self.shared.upgrade() else {
1057 self.callback
1058 .call(Err(Error::msg("WebSocket service is down")));
1059 return Request::new();
1060 };
1061
1062 if shared.is_closed() {
1063 self.callback
1064 .call(Err(Error::msg("WebSocket is not connected")));
1065 return Request::new();
1066 }
1067
1068 let serial = shared.serial.get();
1069
1070 if let Err(error) = shared.send_client_request(serial, &self.body) {
1071 shared.on_error.call(error);
1072 return Request::new();
1073 }
1074
1075 shared.serial.set(serial.wrapping_add(1));
1076
1077 let pending = Pending {
1078 kind: <B::Endpoint as api::Endpoint>::ID,
1079 serial,
1080 callback: self.callback,
1081 };
1082
1083 let existing = shared
1084 .g
1085 .requests
1086 .borrow_mut()
1087 .insert(serial, Box::new(pending));
1088
1089 if let Some(p) = existing {
1090 p.callback.call(Err(Error::msg("Request cancelled")));
1091 }
1092
1093 Request {
1094 serial,
1095 g: Rc::downgrade(&shared.g),
1096 }
1097 }
1098}
1099
/// The handle for a pending request.
///
/// Dropping or calling [`clear()`] on this handle will cancel the request.
///
/// [`clear()`]: Self::clear
pub struct Request {
    /// The serial the request was sent with.
    serial: u32,
    /// Weak reference to the state holding the pending request, which is
    /// dangling for an empty handle.
    g: Weak<Generic>,
}
1109
1110impl Request {
1111 /// An empty request handler.
1112 #[inline]
1113 pub const fn new() -> Self {
1114 Self {
1115 serial: 0,
1116 g: Weak::new(),
1117 }
1118 }
1119
1120 /// Clear the request handle without dropping it, cancelling any pending
1121 /// requests.
1122 pub fn clear(&mut self) {
1123 let removed = {
1124 let serial = mem::take(&mut self.serial);
1125
1126 let Some(g) = self.g.upgrade() else {
1127 return;
1128 };
1129
1130 self.g = Weak::new();
1131
1132 let Some(p) = g.requests.borrow_mut().remove(&serial) else {
1133 return;
1134 };
1135
1136 p
1137 };
1138
1139 drop(removed);
1140 }
1141}
1142
/// The default request handle is the empty one.
impl Default for Request {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
1149
/// Dropping the handle cancels the request.
impl Drop for Request {
    #[inline]
    fn drop(&mut self) {
        self.clear();
    }
}
1156
/// The handle for a broadcast listener.
///
/// Dropping or calling [`clear()`] on this handle will remove the listener.
///
/// [`clear()`]: Self::clear
pub struct Listener {
    /// The kind of broadcast being listened for, if any.
    kind: Option<MessageId>,
    /// Slot of the listener among the listeners of the same kind.
    index: usize,
    /// Weak reference to the state holding the listener, which is dangling
    /// for an empty handle.
    g: Weak<Generic>,
}
1167
impl Listener {
    /// Construct an empty listener.
    #[inline]
    pub const fn new() -> Self {
        Self {
            kind: None,
            index: 0,
            g: Weak::new(),
        }
    }

    /// Build up an empty listener with the specified kind.
    ///
    /// The listener is not registered anywhere, so clearing it does nothing.
    #[inline]
    pub(crate) const fn empty_with_kind(kind: MessageId) -> Self {
        Self {
            kind: Some(kind),
            index: 0,
            g: Weak::new(),
        }
    }

    /// Clear the listener without dropping it.
    ///
    /// This will remove the associated broadcast listener from being notified.
    /// If it was the last listener of its kind, the collection of listeners
    /// for that kind is removed as well.
    pub fn clear(&mut self) {
        // Gather values here to drop them outside of the upgrade block.
        let removed;
        let removed_value;

        {
            let Some(g) = self.g.upgrade() else {
                return;
            };

            // Reset the handle, so that clearing it again does nothing.
            self.g = Weak::new();
            let index = mem::take(&mut self.index);

            let Some(kind) = self.kind.take() else {
                return;
            };

            let mut broadcasts = g.broadcasts.borrow_mut();

            let Entry::Occupied(mut e) = broadcasts.entry(kind) else {
                return;
            };

            removed = e.get_mut().try_remove(index);

            // Don't leave empty collections of listeners behind.
            if e.get().is_empty() {
                removed_value = Some(e.remove());
            } else {
                removed_value = None;
            }
        }

        // Drop here, to avoid invoking any destructors which might borrow
        // shared mutably earlier.
        drop(removed);
        drop(removed_value);
    }
}
1230
/// The default listener is the empty one.
impl Default for Listener {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
1237
/// Dropping the handle removes the listener.
impl Drop for Listener {
    #[inline]
    fn drop(&mut self) {
        self.clear();
    }
}
1244
/// The handle for state change listening.
///
/// Dropping or calling [`clear()`] on this handle will remove the associated
/// callback from being notified.
///
/// [`clear()`]: Self::clear
pub struct StateListener {
    /// Slot of the listener among the state listeners.
    index: usize,
    /// Weak reference to the state holding the listener, which is dangling
    /// for an empty handle.
    g: Weak<Generic>,
}
1255
1256impl StateListener {
1257 /// Construct an empty state listener.
1258 #[inline]
1259 pub const fn new() -> Self {
1260 Self {
1261 index: 0,
1262 g: Weak::new(),
1263 }
1264 }
1265
1266 /// Clear the state listener without dropping it.
1267 ///
1268 /// This will remove the associated callback from being notified.
1269 pub fn clear(&mut self) {
1270 let removed = {
1271 let Some(g) = self.g.upgrade() else {
1272 return;
1273 };
1274
1275 self.g = Weak::new();
1276
1277 g.state_listeners.borrow_mut().try_remove(self.index)
1278 };
1279
1280 drop(removed);
1281 }
1282}
1283
/// The default state listener is the empty one.
impl Default for StateListener {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
1290
/// Dropping the handle removes the state listener.
impl Drop for StateListener {
    #[inline]
    fn drop(&mut self) {
        self.clear();
    }
}
1297
/// A reusable buffer together with the bookkeeping needed to share it through
/// [`BufRc`] and recycle it once it is no longer used.
pub(crate) struct BufData {
    /// Buffer being used.
    pub(crate) data: Vec<u8>,
    /// Number of strong references to this buffer.
    strong: Cell<usize>,
    /// Reference to shared state where the buffer will be recycled to.
    g: Weak<Generic>,
}
1306
impl BufData {
    /// Construct an empty buffer with the given capacity, which will be
    /// recycled to `g` if it is still alive when the buffer is released.
    fn with_capacity(g: Weak<Generic>, capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
            strong: Cell::new(0),
            g,
        }
    }

    /// Decrement the number of strong references to the buffer.
    ///
    /// Once there are no references left, the buffer is either recycled or
    /// freed if the shared state is gone.
    ///
    /// # Safety
    ///
    /// NOTE(review): as used by [`BufRc`], `ptr` must originate from a leaked
    /// `Box<BufData>` and the caller must own one of its strong references,
    /// which is given up by this call — confirm that there are no other
    /// callers.
    unsafe fn dec(ptr: NonNull<BufData>) {
        unsafe {
            let count = ptr.as_ref().strong.get().wrapping_sub(1);
            ptr.as_ref().strong.set(count);

            if count > 0 {
                return;
            }

            // This was the last reference, so take back ownership of the
            // allocation.
            let mut buf = Box::from_raw(ptr.as_ptr());

            // Try to recycle the buffer if shared is available, else let it be
            // dropped and free here.
            let Some(g) = buf.as_ref().g.upgrade() else {
                return;
            };

            let mut buffers = g.buffers.borrow_mut();

            // Set the length of the recycled buffer.
            //
            // NOTE(review): the contents are kept and only capped in length;
            // presumably whoever fills a buffer from `next_buffer` resets it
            // first — confirm against the callers.
            buf.data.set_len(buf.data.len().min(MAX_CAPACITY));

            // We size our buffers to some max capacity to avoid overuse in
            // case we infrequently need to handle some massive message. If we
            // don't shrink the allocation, then memory use can run away over
            // time.
            buf.data.shrink_to(MAX_CAPACITY);

            buffers.push_back(buf);
        }
    }

    /// Increment the number of strong references to the buffer.
    ///
    /// Aborts the process if the number of references overflows.
    ///
    /// # Safety
    ///
    /// NOTE(review): as used by [`BufRc`], `ptr` must point to a live
    /// `BufData` which originates from a leaked `Box<BufData>` — confirm that
    /// there are no other callers.
    unsafe fn inc(ptr: NonNull<BufData>) {
        unsafe {
            let count = ptr.as_ref().strong.get().wrapping_add(1);

            // The count wrapped around.
            if count == 0 {
                std::process::abort();
            }

            ptr.as_ref().strong.set(count);
        }
    }
}
1359
/// A shared buffer of data that is recycled when dropped.
///
/// Cloning a handle increments, and dropping one decrements, the reference
/// count stored in the pointed-to [`BufData`].
struct BufRc {
    /// Pointer to the reference-counted buffer, leaked from a `Box` in
    /// [`BufRc::new`].
    data: NonNull<BufData>,
}
1364
1365impl BufRc {
1366 fn new(data: Box<BufData>) -> Self {
1367 let data = NonNull::from(Box::leak(data));
1368
1369 unsafe {
1370 BufData::inc(data);
1371 }
1372
1373 Self { data }
1374 }
1375}
1376
1377impl Deref for BufRc {
1378 type Target = [u8];
1379
1380 fn deref(&self) -> &Self::Target {
1381 unsafe { &(*self.data.as_ptr()).data }
1382 }
1383}
1384
1385impl Clone for BufRc {
1386 fn clone(&self) -> Self {
1387 unsafe {
1388 BufData::inc(self.data);
1389 }
1390
1391 Self { data: self.data }
1392 }
1393}
1394
1395impl Drop for BufRc {
1396 fn drop(&mut self) {
1397 unsafe {
1398 BufData::dec(self.data);
1399 }
1400 }
1401}
1402
/// A raw packet of data.
///
/// The packet keeps a cursor into its buffer which is advanced by
/// [`RawPacket::decode`], so several values can be decoded in sequence.
#[derive(Clone)]
pub struct RawPacket {
    /// Identifier returned by [`RawPacket::id`].
    id: MessageId,
    /// Shared buffer holding the bytes of the packet.
    buf: BufRc,
    /// Offset into `buf` at which the next call to `decode` starts reading.
    at: Cell<usize>,
}
1410
1411impl RawPacket {
1412 /// Decode the contents of a raw packet.
1413 ///
1414 /// This can be called multiple times if there are multiple payloads in
1415 /// sequence of the response.
1416 ///
1417 /// You can check if the packet is empty using [`RawPacket::is_empty`].
1418 pub fn decode<'this, T>(&'this self) -> Result<T>
1419 where
1420 T: Decode<'this, Binary, Global>,
1421 {
1422 let at = self.at.get();
1423
1424 let Some(bytes) = self.buf.get(at..) else {
1425 return Err(Error::new(ErrorKind::Overflow(at, self.buf.len())));
1426 };
1427
1428 let mut reader = SliceReader::new(bytes);
1429
1430 match storage::decode(&mut reader) {
1431 Ok(value) => {
1432 self.at.set(at + bytes.len() - reader.remaining());
1433 Ok(value)
1434 }
1435 Err(error) => {
1436 self.at.set(self.buf.len());
1437 Err(Error::from(error))
1438 }
1439 }
1440 }
1441
1442 /// Check if the packet is empty.
1443 pub fn is_empty(&self) -> bool {
1444 self.at.get() >= self.buf.len()
1445 }
1446
1447 /// The id of the packet this is a response to as specified by
1448 /// [`Endpoint::ID`] or [`Broadcast::ID`].
1449 ///
1450 /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1451 /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1452 pub fn id(&self) -> MessageId {
1453 self.id
1454 }
1455}
1456
1457/// A typed packet of data.
1458#[derive(Clone)]
1459pub struct Packet<T> {
1460 raw: RawPacket,
1461 _marker: PhantomData<T>,
1462}
1463
1464impl<T> Packet<T> {
1465 /// Construct a new typed package from a raw one.
1466 ///
1467 /// Note that this does not guarantee that the typed package is correct, but
1468 /// the `T` parameter becomes associated with it allowing it to be used
1469 /// automatically with methods such as [`Packet::decode`].
1470 #[inline]
1471 pub fn new(raw: RawPacket) -> Self {
1472 Self {
1473 raw,
1474 _marker: PhantomData,
1475 }
1476 }
1477
1478 /// Convert a packet into a raw packet.
1479 ///
1480 /// To determine which endpoint or broadcast it belongs to the
1481 /// [`RawPacket::id`] method can be used.
1482 pub fn into_raw(self) -> RawPacket {
1483 self.raw
1484 }
1485
1486 /// Check if the packet is empty.
1487 pub fn is_empty(&self) -> bool {
1488 self.raw.is_empty()
1489 }
1490
1491 /// The id of the packet this is a response to as specified by
1492 /// [`Endpoint::ID`] or [`Broadcast::ID`].
1493 ///
1494 /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1495 /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1496 pub fn id(&self) -> MessageId {
1497 self.raw.id()
1498 }
1499}
1500
1501impl<T> Packet<T>
1502where
1503 T: api::Endpoint,
1504{
1505 /// Decode the contents of a packet.
1506 ///
1507 /// This can be called multiple times if there are multiple payloads in
1508 /// sequence of the response.
1509 ///
1510 /// You can check if the packet is empty using [`Packet::is_empty`].
1511 pub fn decode(&self) -> Result<T::Response<'_>> {
1512 self.decode_any()
1513 }
1514
1515 /// Decode any contents of a packet.
1516 ///
1517 /// This can be called multiple times if there are multiple payloads in
1518 /// sequence of the response.
1519 ///
1520 /// You can check if the packet is empty using [`Packet::is_empty`].
1521 pub fn decode_any<'de, R>(&'de self) -> Result<R>
1522 where
1523 R: Decode<'de, Binary, Global>,
1524 {
1525 self.raw.decode()
1526 }
1527}
1528
1529impl<T> Packet<T>
1530where
1531 T: api::Broadcast,
1532{
1533 /// Decode the primary event related to a broadcast.
1534 pub fn decode_event<'de>(&'de self) -> Result<T::Event<'de>> {
1535 self.decode_event_any()
1536 }
1537
1538 /// Decode any event related to a broadcast.
1539 pub fn decode_event_any<'de, E>(&'de self) -> Result<E>
1540 where
1541 E: Event<Broadcast = T> + Decode<'de, Binary, Global>,
1542 {
1543 self.raw.decode()
1544 }
1545}
1546
1547/// A handle to the WebSocket service.
1548#[derive(Clone)]
1549#[repr(transparent)]
1550pub struct Handle<H>
1551where
1552 H: WebImpl,
1553{
1554 shared: Weak<Shared<H>>,
1555}
1556
1557impl<H> Handle<H>
1558where
1559 H: WebImpl,
1560{
1561 /// Send a request of type `T`.
1562 ///
1563 /// Returns a handle for the request.
1564 ///
1565 /// If the handle is dropped, the request is cancelled.
1566 ///
1567 /// # Examples
1568 ///
1569 /// ```
1570 /// # extern crate yew021 as yew;
1571 /// use yew::prelude::*;
1572 /// use musli_web::web03::prelude::*;
1573 ///
1574 /// mod api {
1575 /// use musli::{Decode, Encode};
1576 /// use musli_web::api;
1577 ///
1578 /// #[derive(Encode, Decode)]
1579 /// pub struct HelloRequest<'de> {
1580 /// pub message: &'de str,
1581 /// }
1582 ///
1583 /// #[derive(Encode, Decode)]
1584 /// pub struct HelloResponse<'de> {
1585 /// pub message: &'de str,
1586 /// }
1587 ///
1588 /// api::define! {
1589 /// pub type Hello;
1590 ///
1591 /// impl Endpoint for Hello {
1592 /// impl<'de> Request for HelloRequest<'de>;
1593 /// type Response<'de> = HelloResponse<'de>;
1594 /// }
1595 /// }
1596 /// }
1597 ///
1598 /// enum Msg {
1599 /// OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
1600 /// }
1601 ///
1602 /// #[derive(Properties, PartialEq)]
1603 /// struct Props {
1604 /// ws: ws::Handle,
1605 /// }
1606 ///
1607 /// struct App {
1608 /// message: String,
1609 /// _hello: ws::Request,
1610 /// }
1611 ///
1612 /// impl Component for App {
1613 /// type Message = Msg;
1614 /// type Properties = Props;
1615 ///
1616 /// fn create(ctx: &Context<Self>) -> Self {
1617 /// let hello = ctx.props().ws
1618 /// .request()
1619 /// .body(api::HelloRequest { message: "Hello!"})
1620 /// .on_packet(ctx.link().callback(Msg::OnHello))
1621 /// .send();
1622 ///
1623 /// Self {
1624 /// message: String::from("No Message :("),
1625 /// _hello: hello,
1626 /// }
1627 /// }
1628 ///
1629 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1630 /// match msg {
1631 /// Msg::OnHello(Err(error)) => {
1632 /// tracing::error!("Request error: {:?}", error);
1633 /// false
1634 /// }
1635 /// Msg::OnHello(Ok(packet)) => {
1636 /// if let Ok(response) = packet.decode() {
1637 /// self.message = response.message.to_owned();
1638 /// }
1639 ///
1640 /// true
1641 /// }
1642 /// }
1643 /// }
1644 ///
1645 /// fn view(&self, ctx: &Context<Self>) -> Html {
1646 /// html! {
1647 /// <div>
1648 /// <h1>{"WebSocket Example"}</h1>
1649 /// <p>{format!("Message: {}", self.message)}</p>
1650 /// </div>
1651 /// }
1652 /// }
1653 /// }
1654 /// ```
1655 pub fn request(&self) -> RequestBuilder<'_, H, EmptyBody, EmptyCallback> {
1656 RequestBuilder {
1657 shared: &self.shared,
1658 body: EmptyBody,
1659 callback: EmptyCallback,
1660 }
1661 }
1662
1663 /// Listen for broadcasts of type `T`.
1664 ///
1665 /// Returns a handle for the listener that will cancel the listener if
1666 /// dropped.
1667 ///
1668 /// # Examples
1669 ///
1670 /// ```
1671 /// # extern crate yew021 as yew;
1672 /// use yew::prelude::*;
1673 /// use musli_web::web03::prelude::*;
1674 ///
1675 /// mod api {
1676 /// use musli::{Decode, Encode};
1677 /// use musli_web::api;
1678 ///
1679 /// #[derive(Encode, Decode)]
1680 /// pub struct TickEvent<'de> {
1681 /// pub message: &'de str,
1682 /// pub tick: u32,
1683 /// }
1684 ///
1685 /// api::define! {
1686 /// pub type Tick;
1687 ///
1688 /// impl Broadcast for Tick {
1689 /// impl<'de> Event for TickEvent<'de>;
1690 /// }
1691 /// }
1692 /// }
1693 ///
1694 /// enum Msg {
1695 /// Tick(Result<ws::Packet<api::Tick>, ws::Error>),
1696 /// }
1697 ///
1698 /// #[derive(Properties, PartialEq)]
1699 /// struct Props {
1700 /// ws: ws::Handle,
1701 /// }
1702 ///
1703 /// struct App {
1704 /// tick: u32,
1705 /// _listen: ws::Listener,
1706 /// }
1707 ///
1708 /// impl Component for App {
1709 /// type Message = Msg;
1710 /// type Properties = Props;
1711 ///
1712 /// fn create(ctx: &Context<Self>) -> Self {
1713 /// let listen = ctx.props().ws.on_broadcast(ctx.link().callback(Msg::Tick));
1714 ///
1715 /// Self {
1716 /// tick: 0,
1717 /// _listen: listen,
1718 /// }
1719 /// }
1720 ///
1721 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1722 /// match msg {
1723 /// Msg::Tick(Err(error)) => {
1724 /// tracing::error!("Tick error: {error}");
1725 /// false
1726 /// }
1727 /// Msg::Tick(Ok(packet)) => {
1728 /// if let Ok(tick) = packet.decode_event() {
1729 /// self.tick = tick.tick;
1730 /// }
1731 ///
1732 /// true
1733 /// }
1734 /// }
1735 /// }
1736 ///
1737 /// fn view(&self, ctx: &Context<Self>) -> Html {
1738 /// html! {
1739 /// <div>
1740 /// <h1>{"WebSocket Example"}</h1>
1741 /// <p>{format!("Tick: {}", self.tick)}</p>
1742 /// </div>
1743 /// }
1744 /// }
1745 /// }
1746 /// ```
1747 pub fn on_broadcast<T>(&self, callback: impl Callback<Result<Packet<T>>>) -> Listener
1748 where
1749 T: api::Broadcast,
1750 {
1751 self.on_raw_broadcast::<T>(move |result| match result {
1752 Ok(packet) => callback.call(Ok(Packet::new(packet))),
1753 Err(error) => callback.call(Err(error)),
1754 })
1755 }
1756
/// Listen for broadcasts of type `T`, receiving each one as an untyped [`RawPacket`].
1758 ///
1759 /// Returns a handle for the listener that will cancel the listener if
1760 /// dropped.
1761 ///
1762 /// # Examples
1763 ///
1764 /// ```
1765 /// # extern crate yew021 as yew;
1766 /// use yew::prelude::*;
1767 /// use musli_web::web03::prelude::*;
1768 ///
1769 /// mod api {
1770 /// use musli::{Decode, Encode};
1771 /// use musli_web::api;
1772 ///
1773 /// #[derive(Encode, Decode)]
1774 /// pub struct TickEvent<'de> {
1775 /// pub message: &'de str,
1776 /// pub tick: u32,
1777 /// }
1778 ///
1779 /// api::define! {
1780 /// pub type Tick;
1781 ///
1782 /// impl Broadcast for Tick {
1783 /// impl<'de> Event for TickEvent<'de>;
1784 /// }
1785 /// }
1786 /// }
1787 ///
1788 /// enum Msg {
1789 /// Tick(Result<ws::RawPacket, ws::Error>),
1790 /// }
1791 ///
1792 /// #[derive(Properties, PartialEq)]
1793 /// struct Props {
1794 /// ws: ws::Handle,
1795 /// }
1796 ///
1797 /// struct App {
1798 /// tick: u32,
1799 /// _listen: ws::Listener,
1800 /// }
1801 ///
1802 /// impl Component for App {
1803 /// type Message = Msg;
1804 /// type Properties = Props;
1805 ///
1806 /// fn create(ctx: &Context<Self>) -> Self {
/// let listen = ctx.props().ws.on_raw_broadcast::<api::Tick>(ctx.link().callback(Msg::Tick));
1809 ///
1810 /// Self {
1811 /// tick: 0,
1812 /// _listen: listen,
1813 /// }
1814 /// }
1815 ///
1816 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1817 /// match msg {
1818 /// Msg::Tick(Err(error)) => {
1819 /// tracing::error!("Tick error: {error}");
1820 /// false
1821 /// }
1822 /// Msg::Tick(Ok(packet)) => {
1823 /// if let Ok(tick) = packet.decode::<api::TickEvent>() {
1824 /// self.tick = tick.tick;
1825 /// }
1826 ///
1827 /// true
1828 /// }
1829 /// }
1830 /// }
1831 ///
1832 /// fn view(&self, ctx: &Context<Self>) -> Html {
1833 /// html! {
1834 /// <div>
1835 /// <h1>{"WebSocket Example"}</h1>
1836 /// <p>{format!("Tick: {}", self.tick)}</p>
1837 /// </div>
1838 /// }
1839 /// }
1840 /// }
1841 /// ```
1842 pub fn on_raw_broadcast<T>(&self, callback: impl Callback<Result<RawPacket>>) -> Listener
1843 where
1844 T: api::Broadcast,
1845 {
1846 let Some(shared) = self.shared.upgrade() else {
1847 return Listener::empty_with_kind(T::ID);
1848 };
1849
1850 let index = {
1851 let mut broadcasts = shared.g.broadcasts.borrow_mut();
1852 let slots = broadcasts.entry(T::ID).or_default();
1853 slots.insert(Rc::new(callback))
1854 };
1855
1856 Listener {
1857 kind: Some(T::ID),
1858 index,
1859 g: Rc::downgrade(&shared.g),
1860 }
1861 }
1862
1863 /// Listen for state changes to the underlying connection.
1864 ///
1865 /// This indicates when the connection is open and ready to receive requests
1866 /// through [`State::Open`], or if it's closed and requests will be queued
1867 /// through [`State::Closed`].
1868 ///
1869 /// Note that if you are connecting through a proxy the reported updates
1870 /// might be volatile. It is always best to send a message over the
1871 /// connection on the server side that once received allows the client to
1872 /// know that it is connected.
1873 ///
1874 /// Dropping the returned handle will cancel the listener.
1875 ///
1876 /// # Examples
1877 ///
1878 /// ```
1879 /// # extern crate yew021 as yew;
1880 /// use yew::prelude::*;
1881 /// use musli_web::web03::prelude::*;
1882 ///
1883 /// enum Msg {
1884 /// StateChange(ws::State),
1885 /// }
1886 ///
1887 /// #[derive(Properties, PartialEq)]
1888 /// struct Props {
1889 /// ws: ws::Handle,
1890 /// }
1891 ///
1892 /// struct App {
1893 /// state: ws::State,
1894 /// _listen: ws::StateListener,
1895 /// }
1896 ///
1897 /// impl Component for App {
1898 /// type Message = Msg;
1899 /// type Properties = Props;
1900 ///
1901 /// fn create(ctx: &Context<Self>) -> Self {
1902 /// let link = ctx.link().clone();
1903 ///
1904 /// let (state, listen) = ctx.props().ws.on_state_change(move |state| {
1905 /// link.send_message(Msg::StateChange(state));
1906 /// });
1907 ///
1908 /// Self {
1909 /// state,
1910 /// _listen: listen,
1911 /// }
1912 /// }
1913 ///
1914 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1915 /// match msg {
1916 /// Msg::StateChange(state) => {
1917 /// self.state = state;
1918 /// true
1919 /// }
1920 /// }
1921 /// }
1922 ///
1923 /// fn view(&self, ctx: &Context<Self>) -> Html {
1924 /// html! {
1925 /// <div>
1926 /// <h1>{"WebSocket Example"}</h1>
1927 /// <p>{format!("State: {:?}", self.state)}</p>
1928 /// </div>
1929 /// }
1930 /// }
/// }
/// ```
1932 pub fn on_state_change(&self, callback: impl Callback<State>) -> (State, StateListener) {
1933 let Some(shared) = self.shared.upgrade() else {
1934 return (
1935 State::Closed,
1936 StateListener {
1937 index: 0,
1938 g: Weak::new(),
1939 },
1940 );
1941 };
1942
1943 let (state, index) = {
1944 let index = shared
1945 .g
1946 .state_listeners
1947 .borrow_mut()
1948 .insert(Rc::new(callback));
1949 (shared.state.get(), index)
1950 };
1951
1952 let listener = StateListener {
1953 index,
1954 g: Rc::downgrade(&shared.g),
1955 };
1956
1957 (state, listener)
1958 }
1959}
1960
/// All handles compare equal.
///
/// This allows a handle to be a field of types deriving `PartialEq`, such as
/// the `Props` structs in the examples on [`Handle`], without the handle
/// ever causing them to compare unequal.
impl<H> PartialEq for Handle<H>
where
    H: WebImpl,
{
    #[inline]
    fn eq(&self, _: &Self) -> bool {
        // NOTE(review): equality ignores which service the handle refers to;
        // presumably intentional, confirm before relying on it.
        true
    }
}
1970
/// A pending request together with the callback stored for it.
///
/// `C` may be unsized, which allows a pending request to be stored behind a
/// `Box` as a trait object.
struct Pending<C>
where
    C: ?Sized,
{
    /// The kind of message the request is for.
    kind: MessageId,
    /// Serial number of the request.
    // NOTE(review): presumably the same value used as the key in `Requests`;
    // confirm against the code inserting into it.
    serial: u32,
    /// Callback stored with the request. It has to be the last field for `C`
    /// to be allowed to be unsized.
    callback: C,
}
1979
1980impl<C> fmt::Debug for Pending<C> {
1981 #[inline]
1982 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1983 f.debug_struct("Pending")
1984 .field("serial", &self.serial)
1985 .field("kind", &self.kind)
1986 .finish_non_exhaustive()
1987 }
1988}
/// Display adapter which writes the wrapped string with exactly one leading
/// occurrence of the given prefix character.
///
/// The prefix is written first, followed by the string with every leading
/// occurrence of the prefix stripped.
struct ForcePrefix<'a>(&'a str, char);

impl fmt::Display for ForcePrefix<'_> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ForcePrefix(text, prefix) = *self;

        fmt::Display::fmt(&prefix, f)?;
        fmt::Display::fmt(text.trim_start_matches(prefix), f)
    }
}