musli_web/web.rs
1//! The generic web implementation.
2//!
3//! This is specialized over the `H` parameter through modules such as:
4//!
5//! * [`web03`] for `web-sys` `0.3.x`.
6//!
7//! [`web03`]: crate::web03
8
9use core::cell::{Cell, RefCell};
10use core::fmt;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Deref;
14use core::ptr::NonNull;
15use std::collections::VecDeque;
16
17use alloc::boxed::Box;
18use alloc::format;
19use alloc::rc::Rc;
20use alloc::rc::Weak;
21use alloc::string::{String, ToString};
22use alloc::vec::Vec;
23
24use std::collections::hash_map::{Entry, HashMap};
25
26use musli::Decode;
27use musli::alloc::Global;
28use musli::mode::Binary;
29use musli::reader::SliceReader;
30use musli::storage;
31
32use slab::Slab;
33
34use crate::api::{self, Event, MessageId};
35
36const MAX_CAPACITY: usize = 1048576;
37
38/// An empty request body.
39#[non_exhaustive]
40pub struct EmptyBody;
41
42/// An empty callback.
43#[non_exhaustive]
44pub struct EmptyCallback;
45
46/// Slab of state listeners.
47type StateListeners = Slab<Rc<dyn Callback<State>>>;
48/// Slab of broadcast listeners.
49type Broadcasts = HashMap<MessageId, Slab<Rc<dyn Callback<Result<RawPacket>>>>>;
50/// Queue of recycled buffers.
51type Buffers = VecDeque<Box<BufData>>;
52/// Pending requests.
53type Requests = HashMap<u32, Box<Pending<dyn Callback<Result<RawPacket>>>>>;
54
55/// Location information for websocket implementation.
56#[doc(hidden)]
57pub struct Location {
58 pub(crate) protocol: String,
59 pub(crate) host: String,
60 pub(crate) port: String,
61}
62
63pub(crate) mod sealed_socket {
64 pub trait Sealed {}
65}
66
67pub(crate) trait SocketImpl
68where
69 Self: Sized + self::sealed_socket::Sealed,
70{
71 #[doc(hidden)]
72 type Handles;
73
74 #[doc(hidden)]
75 fn new(url: &str, handles: &Self::Handles) -> Result<Self, Error>;
76
77 #[doc(hidden)]
78 fn send(&self, data: &[u8]) -> Result<(), Error>;
79
80 #[doc(hidden)]
81 fn close(self) -> Result<(), Error>;
82}
83
84pub(crate) mod sealed_performance {
85 pub trait Sealed {}
86}
87
88pub trait PerformanceImpl
89where
90 Self: Sized + self::sealed_performance::Sealed,
91{
92 #[doc(hidden)]
93 fn now(&self) -> f64;
94}
95
96pub(crate) mod sealed_window {
97 pub trait Sealed {}
98}
99
100pub(crate) trait WindowImpl
101where
102 Self: Sized + self::sealed_window::Sealed,
103{
104 #[doc(hidden)]
105 type Performance: PerformanceImpl;
106
107 #[doc(hidden)]
108 type Timeout;
109
110 #[doc(hidden)]
111 fn new() -> Result<Self, Error>;
112
113 #[doc(hidden)]
114 fn performance(&self) -> Result<Self::Performance, Error>;
115
116 #[doc(hidden)]
117 fn location(&self) -> Result<Location, Error>;
118
119 #[doc(hidden)]
120 fn set_timeout(
121 &self,
122 millis: u32,
123 callback: impl FnOnce() + 'static,
124 ) -> Result<Self::Timeout, Error>;
125}
126
127pub(crate) mod sealed_web {
128 pub trait Sealed {}
129}
130
131/// Central trait for web integration.
132///
133/// Since web integration is currently unstable, this requires multiple
134/// different implementations, each time an ecosystem breaking change is
135/// released.
136///
137/// The crate in focus here is `web-sys`, and the corresponding modules provide
138/// integrations:
139///
140/// * [web03] for `web-sys` `0.3.x`.
141///
142/// [web03]: crate::web03
143pub trait WebImpl
144where
145 Self: 'static + Copy + Sized + self::sealed_web::Sealed,
146{
147 #[doc(hidden)]
148 #[allow(private_bounds)]
149 type Window: WindowImpl;
150
151 #[doc(hidden)]
152 type Handles;
153
154 #[doc(hidden)]
155 #[allow(private_bounds)]
156 type Socket: SocketImpl<Handles = Self::Handles>;
157
158 #[doc(hidden)]
159 #[allow(private_interfaces)]
160 fn handles(shared: &Weak<Shared<Self>>) -> Self::Handles;
161
162 #[doc(hidden)]
163 fn random(range: u32) -> u32;
164}
165
166/// Construct a new [`ServiceBuilder`] associated with the given [`Connect`]
167/// strategy.
168pub fn connect<H>(connect: Connect) -> ServiceBuilder<H, EmptyCallback>
169where
170 H: WebImpl,
171{
172 ServiceBuilder {
173 connect,
174 on_error: EmptyCallback,
175 _marker: PhantomData,
176 }
177}
178
179/// The state of the connection.
180///
181/// A listener for state changes can be set up through for example
182/// [`Handle::on_state_change`].
183#[derive(Debug, PartialEq, Eq, Clone, Copy)]
184#[non_exhaustive]
185pub enum State {
186 /// The connection is open.
187 Open,
188 /// The connection is closed.
189 Closed,
190}
191
192/// Error type for the WebSocket service.
193#[derive(Debug)]
194pub struct Error {
195 kind: ErrorKind,
196}
197
198impl Error {
199 /// Check if the error is caused by an empty packet.
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use musli_web::web::{Error, RawPacket};
205 ///
206 /// let packet = RawPacket::empty();
207 /// let e = packet.decode::<u32>().unwrap_err();
208 ///
209 /// assert!(e.is_empty_packet());
210 /// ```
211 pub fn is_empty_packet(&self) -> bool {
212 matches!(self.kind, ErrorKind::EmptyPacket)
213 }
214}
215
216#[derive(Debug)]
217enum ErrorKind {
218 EmptyPacket,
219 Message(String),
220 Storage(storage::Error),
221 Overflow(usize, usize),
222}
223
224impl Error {
225 #[inline]
226 fn new(kind: ErrorKind) -> Self {
227 Self { kind }
228 }
229
230 #[inline]
231 pub(crate) fn msg(message: impl fmt::Display) -> Self {
232 Self::new(ErrorKind::Message(message.to_string()))
233 }
234}
235
236impl fmt::Display for Error {
237 #[inline]
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 match &self.kind {
240 ErrorKind::EmptyPacket => write!(f, "Packet is empty"),
241 ErrorKind::Message(message) => write!(f, "{message}"),
242 ErrorKind::Storage(error) => write!(f, "Encoding error: {error}"),
243 ErrorKind::Overflow(at, len) => {
244 write!(f, "Internal packet overflow, {at} not in range 0-{len}")
245 }
246 }
247 }
248}
249
250impl core::error::Error for Error {
251 #[inline]
252 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
253 match &self.kind {
254 ErrorKind::Storage(error) => Some(error),
255 _ => None,
256 }
257 }
258}
259
260impl From<storage::Error> for Error {
261 #[inline]
262 fn from(error: storage::Error) -> Self {
263 Self::new(ErrorKind::Storage(error))
264 }
265}
266
267#[cfg(feature = "wasm_bindgen02")]
268impl From<wasm_bindgen02::JsValue> for Error {
269 #[inline]
270 fn from(error: wasm_bindgen02::JsValue) -> Self {
271 Self::new(ErrorKind::Message(format!("{error:?}")))
272 }
273}
274
275type Result<T, E = Error> = core::result::Result<T, E>;
276
277const INITIAL_TIMEOUT: u32 = 250;
278const MAX_TIMEOUT: u32 = 4000;
279
280/// How to connect to the websocket.
281enum ConnectKind {
282 Location { path: String },
283 Url { url: String },
284}
285
286/// A specification for how to connect a websocket.
287pub struct Connect {
288 kind: ConnectKind,
289}
290
291impl Connect {
292 /// Connect to the same location with a custom path.
293 ///
294 /// Note that any number of `/` prefixes are ignored, the canonical
295 /// representation always ignores them and the path is relative to the
296 /// current location.
297 #[inline]
298 pub fn location(path: impl AsRef<str>) -> Self {
299 Self {
300 kind: ConnectKind::Location {
301 path: String::from(path.as_ref()),
302 },
303 }
304 }
305
306 /// Connect to the specified URL.
307 #[inline]
308 pub fn url(url: String) -> Self {
309 Self {
310 kind: ConnectKind::Url { url },
311 }
312 }
313}
314
315/// Generic but shared fields which do not depend on specialization over `H`.
316struct Generic {
317 state_listeners: RefCell<StateListeners>,
318 requests: RefCell<Requests>,
319 broadcasts: RefCell<Broadcasts>,
320 buffers: RefCell<Buffers>,
321}
322
323/// Shared implementation details for websocket implementations.
324pub(crate) struct Shared<H>
325where
326 H: WebImpl,
327{
328 connect: Connect,
329 pub(crate) on_error: Box<dyn Callback<Error>>,
330 window: H::Window,
331 performance: <H::Window as WindowImpl>::Performance,
332 handles: H::Handles,
333 state: Cell<State>,
334 opened: Cell<Option<f64>>,
335 serial: Cell<u32>,
336 defer_broadcasts: RefCell<VecDeque<Weak<dyn Callback<Result<RawPacket>>>>>,
337 defer_state_listeners: RefCell<VecDeque<Weak<dyn Callback<State>>>>,
338 pub(crate) socket: RefCell<Option<H::Socket>>,
339 output: RefCell<Vec<u8>>,
340 current_timeout: Cell<u32>,
341 reconnect_timeout: RefCell<Option<<H::Window as WindowImpl>::Timeout>>,
342 g: Rc<Generic>,
343}
344
345impl<H> Drop for Shared<H>
346where
347 H: WebImpl,
348{
349 fn drop(&mut self) {
350 if let Some(s) = self.socket.take()
351 && let Err(e) = s.close()
352 {
353 self.on_error.call(e);
354 }
355
356 // We don't need to worry about mutable borrows here, since we only have
357 // weak references to Shared and by virtue of this being dropped they
358 // are all invalid.
359 let state_listeners = mem::take(&mut *self.g.state_listeners.borrow_mut());
360 let mut requests = self.g.requests.borrow_mut();
361
362 for (_, listener) in state_listeners {
363 listener.call(State::Closed);
364 }
365
366 for (_, p) in requests.drain() {
367 p.callback.call(Err(Error::msg("Websocket service closed")));
368 }
369 }
370}
371
372/// Builder of a service.
373pub struct ServiceBuilder<H, E>
374where
375 H: WebImpl,
376{
377 connect: Connect,
378 on_error: E,
379 _marker: PhantomData<H>,
380}
381
382impl<H, E> ServiceBuilder<H, E>
383where
384 H: WebImpl,
385 E: Callback<Error>,
386{
387 /// Set the error handler to use for the service.
388 pub fn on_error<U>(self, on_error: U) -> ServiceBuilder<H, U>
389 where
390 U: Callback<Error>,
391 {
392 ServiceBuilder {
393 connect: self.connect,
394 on_error,
395 _marker: self._marker,
396 }
397 }
398
399 /// Build a new service and handle.
400 pub fn build(self) -> Service<H> {
401 let window = match H::Window::new() {
402 Ok(window) => window,
403 Err(error) => {
404 panic!("{error}")
405 }
406 };
407
408 let performance = match WindowImpl::performance(&window) {
409 Ok(performance) => performance,
410 Err(error) => {
411 panic!("{error}")
412 }
413 };
414
415 let shared = Rc::<Shared<H>>::new_cyclic(move |shared| Shared {
416 connect: self.connect,
417 on_error: Box::new(self.on_error),
418 window,
419 performance,
420 handles: H::handles(shared),
421 state: Cell::new(State::Closed),
422 opened: Cell::new(None),
423 serial: Cell::new(0),
424 defer_broadcasts: RefCell::new(VecDeque::new()),
425 defer_state_listeners: RefCell::new(VecDeque::new()),
426 socket: RefCell::new(None),
427 output: RefCell::new(Vec::new()),
428 current_timeout: Cell::new(INITIAL_TIMEOUT),
429 reconnect_timeout: RefCell::new(None),
430 g: Rc::new(Generic {
431 state_listeners: RefCell::new(Slab::new()),
432 broadcasts: RefCell::new(HashMap::new()),
433 requests: RefCell::new(Requests::new()),
434 buffers: RefCell::new(VecDeque::new()),
435 }),
436 });
437
438 let handle = Handle {
439 shared: Rc::downgrade(&shared),
440 };
441
442 Service { shared, handle }
443 }
444}
445
446/// The service handle.
447///
448/// Once dropped this will cause the service to be disconnected and all requests
449/// to be cancelled.
450pub struct Service<H>
451where
452 H: WebImpl,
453{
454 shared: Rc<Shared<H>>,
455 handle: Handle<H>,
456}
457
458impl<H> Service<H>
459where
460 H: WebImpl,
461{
462 /// Attempt to establish a websocket connection.
463 pub fn connect(&self) {
464 self.shared.connect()
465 }
466
467 /// Build a handle to the service.
468 pub fn handle(&self) -> &Handle<H> {
469 &self.handle
470 }
471}
472
473impl<H> Shared<H>
474where
475 H: WebImpl,
476{
477 /// Send a client message.
478 fn send_client_request<T>(&self, serial: u32, body: &T) -> Result<()>
479 where
480 T: api::Request,
481 {
482 let Some(ref socket) = *self.socket.borrow() else {
483 return Err(Error::msg("Socket is not connected"));
484 };
485
486 let header = api::RequestHeader {
487 serial,
488 id: <T::Endpoint as api::Endpoint>::ID.get(),
489 };
490
491 let out = &mut *self.output.borrow_mut();
492
493 storage::to_writer(&mut *out, &header)?;
494 storage::to_writer(&mut *out, &body)?;
495
496 tracing::debug!(
497 header.serial,
498 ?header.id,
499 len = out.len(),
500 "Sending request"
501 );
502
503 socket.send(out.as_slice())?;
504
505 out.clear();
506 out.shrink_to(MAX_CAPACITY);
507 Ok(())
508 }
509
510 pub(crate) fn next_buffer(self: &Rc<Self>, needed: usize) -> Box<BufData> {
511 match self.g.buffers.borrow_mut().pop_front() {
512 Some(mut buf) => {
513 if buf.data.capacity() < needed {
514 buf.data.reserve(needed - buf.data.len());
515 }
516
517 buf
518 }
519 None => Box::new(BufData::with_capacity(Rc::downgrade(&self.g), needed)),
520 }
521 }
522
523 pub(crate) fn message(self: &Rc<Self>, buf: Box<BufData>) -> Result<()> {
524 // Wrap the buffer in a simple shared reference-counted container.
525 let buf = BufRc::new(buf);
526 let mut reader = SliceReader::new(&buf);
527
528 let header: api::ResponseHeader = storage::decode(&mut reader)?;
529
530 if let Some(broadcast) = MessageId::new(header.broadcast) {
531 tracing::debug!(?header.broadcast, "Got broadcast");
532
533 if !self.prepare_broadcast(broadcast) {
534 return Ok(());
535 };
536
537 if let Some(id) = MessageId::new(header.error) {
538 let error = if id == MessageId::ERROR_MESSAGE {
539 storage::decode(&mut reader)?
540 } else {
541 api::ErrorMessage {
542 message: "Unknown error",
543 }
544 };
545
546 while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
547 if let Some(callback) = callback.upgrade() {
548 callback.call(Err(Error::msg(error.message)));
549 }
550 }
551
552 return Ok(());
553 }
554
555 let at = buf.len().saturating_sub(reader.remaining());
556
557 let packet = RawPacket {
558 buf: Some(buf.clone()),
559 at: Cell::new(at),
560 id: broadcast,
561 };
562
563 while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
564 if let Some(callback) = callback.upgrade() {
565 callback.call(Ok(packet.clone()));
566 }
567 }
568 } else {
569 tracing::debug!(header.serial, "Got response");
570
571 let p = {
572 let mut requests = self.g.requests.borrow_mut();
573
574 let Some(p) = requests.remove(&header.serial) else {
575 tracing::debug!(header.serial, "Missing request");
576 return Ok(());
577 };
578
579 p
580 };
581
582 if let Some(id) = MessageId::new(header.error) {
583 let error = if id == MessageId::ERROR_MESSAGE {
584 storage::decode(&mut reader)?
585 } else {
586 api::ErrorMessage {
587 message: "Unknown error",
588 }
589 };
590
591 p.callback.call(Err(Error::msg(error.message)));
592 return Ok(());
593 }
594
595 let at = buf.len().saturating_sub(reader.remaining());
596
597 let packet = RawPacket {
598 id: p.kind,
599 buf: Some(buf),
600 at: Cell::new(at),
601 };
602
603 p.callback.call(Ok(packet));
604 }
605
606 Ok(())
607 }
608
609 fn prepare_broadcast(self: &Rc<Self>, kind: MessageId) -> bool {
610 // Note: We need to defer this, since the outcome of calling
611 // the broadcast callback might be that the broadcast
612 // listener is modified, which could require mutable access
613 // to broadcasts.
614 let mut defer = self.defer_broadcasts.borrow_mut();
615
616 let broadcasts = self.g.broadcasts.borrow();
617
618 let Some(broadcasts) = broadcasts.get(&kind) else {
619 return false;
620 };
621
622 for (_, callback) in broadcasts.iter() {
623 defer.push_back(Rc::downgrade(callback));
624 }
625
626 !defer.is_empty()
627 }
628
629 pub(crate) fn set_open(&self) {
630 tracing::debug!("Set open");
631 self.opened
632 .set(Some(PerformanceImpl::now(&self.performance)));
633 self.emit_state_change(State::Open);
634 }
635
636 fn is_open_for_a_while(&self) -> bool {
637 let Some(at) = self.opened.get() else {
638 return false;
639 };
640
641 let now = PerformanceImpl::now(&self.performance);
642 (now - at) >= 250.0
643 }
644
645 pub(crate) fn close(self: &Rc<Self>) -> Result<(), Error> {
646 tracing::debug!("Close connection");
647
648 // We need a weak reference back to shared state to handle the timeout.
649 let shared = Rc::downgrade(self);
650
651 tracing::debug!(
652 "Set closed timeout={}, opened={:?}",
653 self.current_timeout.get(),
654 self.opened.get(),
655 );
656
657 if !self.is_open_for_a_while() {
658 let current_timeout = self.current_timeout.get();
659
660 if current_timeout < MAX_TIMEOUT {
661 let fuzz = H::random(50);
662
663 self.current_timeout.set(
664 current_timeout
665 .saturating_mul(2)
666 .saturating_add(fuzz)
667 .min(MAX_TIMEOUT),
668 );
669 }
670 } else {
671 self.current_timeout.set(INITIAL_TIMEOUT);
672 }
673
674 self.opened.set(None);
675 self.emit_state_change(State::Closed);
676
677 if let Some(s) = self.socket.take() {
678 s.close()?;
679 }
680
681 self.close_pending();
682
683 let timeout = self
684 .window
685 .set_timeout(self.current_timeout.get(), move || {
686 if let Some(shared) = shared.upgrade() {
687 Self::connect(&shared);
688 }
689 })?;
690
691 drop(self.reconnect_timeout.borrow_mut().replace(timeout));
692 Ok(())
693 }
694
695 /// Close an pending requests with an error, since there is no chance they
696 /// will be responded to any more.
697 fn close_pending(self: &Rc<Self>) {
698 loop {
699 let Some(serial) = self.g.requests.borrow().keys().next().copied() else {
700 break;
701 };
702
703 let p = {
704 let mut requests = self.g.requests.borrow_mut();
705
706 let Some(p) = requests.remove(&serial) else {
707 break;
708 };
709
710 p
711 };
712
713 p.callback.call(Err(Error::msg("Connection closed")));
714 }
715 }
716
717 fn emit_state_change(&self, state: State) {
718 if self.state.get() == state {
719 return;
720 }
721
722 self.state.set(state);
723
724 {
725 // We need to collect callbacks to avoid the callback recursively
726 // borrowing state listeners, which it would if it modifies any
727 // existing state listeners.
728 let mut defer = self.defer_state_listeners.borrow_mut();
729
730 for (_, callback) in self.g.state_listeners.borrow().iter() {
731 defer.push_back(Rc::downgrade(callback));
732 }
733
734 if defer.is_empty() {
735 return;
736 }
737 }
738
739 while let Some(callback) = self.defer_state_listeners.borrow_mut().pop_front() {
740 if let Some(callback) = callback.upgrade() {
741 callback.call(state);
742 }
743 }
744 }
745
746 fn is_closed(&self) -> bool {
747 self.opened.get().is_none()
748 }
749
750 fn connect(self: &Rc<Self>) {
751 tracing::debug!("Connect");
752
753 if let Err(e) = self.build() {
754 self.on_error.call(e);
755 } else {
756 return;
757 }
758
759 if let Err(e) = self.close() {
760 self.on_error.call(e);
761 }
762 }
763
764 /// Build a websocket connection.
765 fn build(self: &Rc<Self>) -> Result<()> {
766 let url = match &self.connect.kind {
767 ConnectKind::Location { path } => {
768 let Location {
769 protocol,
770 host,
771 port,
772 } = WindowImpl::location(&self.window)?;
773
774 let protocol = match protocol.as_str() {
775 "https:" => "wss:",
776 "http:" => "ws:",
777 other => {
778 return Err(Error::msg(format_args!(
779 "Same host connection is not supported for protocol `{other}`"
780 )));
781 }
782 };
783
784 let path = ForcePrefix(path, '/');
785 format!("{protocol}//{host}:{port}{path}")
786 }
787 ConnectKind::Url { url } => url.clone(),
788 };
789
790 let ws = SocketImpl::new(&url, &self.handles)?;
791
792 let old = self.socket.borrow_mut().replace(ws);
793
794 if let Some(old) = old {
795 old.close()?;
796 }
797
798 Ok(())
799 }
800}
801
802/// Trait governing how callbacks are called.
803pub trait Callback<I>
804where
805 Self: 'static,
806{
807 /// Call the callback.
808 fn call(&self, input: I);
809}
810
811impl<I> Callback<I> for EmptyCallback {
812 #[inline]
813 fn call(&self, _: I) {}
814}
815
816impl<F, I> Callback<I> for F
817where
818 F: 'static + Fn(I),
819{
820 #[inline]
821 fn call(&self, input: I) {
822 self(input)
823 }
824}
825
826/// A request builder .
827///
828/// Associate the callback to be used by using either
829/// [`RequestBuilder::on_packet`] or [`RequestBuilder::on_raw_packet`] depending
830/// on your needs.
831///
832/// Send the request with [`RequestBuilder::send`].
833pub struct RequestBuilder<'a, H, B, C>
834where
835 H: WebImpl,
836{
837 shared: &'a Weak<Shared<H>>,
838 body: B,
839 callback: C,
840}
841
842impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
843where
844 H: WebImpl,
845{
846 /// Set the body of the request.
847 #[inline]
848 pub fn body<U>(self, body: U) -> RequestBuilder<'a, H, U, C>
849 where
850 U: api::Request,
851 {
852 RequestBuilder {
853 shared: self.shared,
854 body,
855 callback: self.callback,
856 }
857 }
858
859 /// Handle the response using the specified callback.
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// # extern crate yew021 as yew;
865 /// use yew::prelude::*;
866 /// use musli_web::web03::prelude::*;
867 ///
868 /// mod api {
869 /// use musli::{Decode, Encode};
870 /// use musli_web::api;
871 ///
872 /// #[derive(Encode, Decode)]
873 /// pub struct HelloRequest<'de> {
874 /// pub message: &'de str,
875 /// }
876 ///
877 /// #[derive(Encode, Decode)]
878 /// pub struct HelloResponse<'de> {
879 /// pub message: &'de str,
880 /// }
881 ///
882 /// api::define! {
883 /// pub type Hello;
884 ///
885 /// impl Endpoint for Hello {
886 /// impl<'de> Request for HelloRequest<'de>;
887 /// type Response<'de> = HelloResponse<'de>;
888 /// }
889 /// }
890 /// }
891 ///
892 /// enum Msg {
893 /// OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
894 /// }
895 ///
896 /// #[derive(Properties, PartialEq)]
897 /// struct Props {
898 /// ws: ws::Handle,
899 /// }
900 ///
901 /// struct App {
902 /// message: String,
903 /// _hello: ws::Request,
904 /// }
905 ///
906 /// impl Component for App {
907 /// type Message = Msg;
908 /// type Properties = Props;
909 ///
910 /// fn create(ctx: &Context<Self>) -> Self {
911 /// let hello = ctx.props().ws
912 /// .request()
913 /// .body(api::HelloRequest { message: "Hello!"})
914 /// .on_packet(ctx.link().callback(Msg::OnHello))
915 /// .send();
916 ///
917 /// Self {
918 /// message: String::from("No Message :("),
919 /// _hello: hello,
920 /// }
921 /// }
922 ///
923 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
924 /// match msg {
925 /// Msg::OnHello(Err(error)) => {
926 /// tracing::error!("Request error: {:?}", error);
927 /// false
928 /// }
929 /// Msg::OnHello(Ok(packet)) => {
930 /// if let Ok(response) = packet.decode() {
931 /// self.message = response.message.to_owned();
932 /// }
933 ///
934 /// true
935 /// }
936 /// }
937 /// }
938 ///
939 /// fn view(&self, ctx: &Context<Self>) -> Html {
940 /// html! {
941 /// <div>
942 /// <h1>{"WebSocket Example"}</h1>
943 /// <p>{format!("Message: {}", self.message)}</p>
944 /// </div>
945 /// }
946 /// }
947 /// }
948 /// ```
949 pub fn on_packet<E>(
950 self,
951 callback: impl Callback<Result<Packet<E>>>,
952 ) -> RequestBuilder<'a, H, B, impl Callback<Result<RawPacket>>>
953 where
954 E: api::Endpoint,
955 {
956 self.on_raw_packet(move |result: Result<RawPacket>| match result {
957 Ok(ok) => callback.call(Ok(Packet::new(ok))),
958 Err(err) => callback.call(Err(err)),
959 })
960 }
961
962 /// Handle the response using the specified callback.
963 ///
964 /// # Examples
965 ///
966 /// ```
967 /// # extern crate yew021 as yew;
968 /// use yew::prelude::*;
969 /// use musli_web::web03::prelude::*;
970 ///
971 /// mod api {
972 /// use musli::{Decode, Encode};
973 /// use musli_web::api;
974 ///
975 /// #[derive(Encode, Decode)]
976 /// pub struct HelloRequest<'de> {
977 /// pub message: &'de str,
978 /// }
979 ///
980 /// #[derive(Encode, Decode)]
981 /// pub struct HelloResponse<'de> {
982 /// pub message: &'de str,
983 /// }
984 ///
985 /// api::define! {
986 /// pub type Hello;
987 ///
988 /// impl Endpoint for Hello {
989 /// impl<'de> Request for HelloRequest<'de>;
990 /// type Response<'de> = HelloResponse<'de>;
991 /// }
992 /// }
993 /// }
994 ///
995 /// enum Msg {
996 /// OnHello(Result<ws::RawPacket, ws::Error>),
997 /// }
998 ///
999 /// #[derive(Properties, PartialEq)]
1000 /// struct Props {
1001 /// ws: ws::Handle,
1002 /// }
1003 ///
1004 /// struct App {
1005 /// message: String,
1006 /// _hello: ws::Request,
1007 /// }
1008 ///
1009 /// impl Component for App {
1010 /// type Message = Msg;
1011 /// type Properties = Props;
1012 ///
1013 /// fn create(ctx: &Context<Self>) -> Self {
1014 /// let link = ctx.link().clone();
1015 ///
1016 /// let hello = ctx.props().ws
1017 /// .request()
1018 /// .body(api::HelloRequest { message: "Hello!"})
1019 /// .on_raw_packet(move |packet| link.send_message(Msg::OnHello(packet)))
1020 /// .send();
1021 ///
1022 /// Self {
1023 /// message: String::from("No Message :("),
1024 /// _hello: hello,
1025 /// }
1026 /// }
1027 ///
1028 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1029 /// match msg {
1030 /// Msg::OnHello(Err(error)) => {
1031 /// tracing::error!("Request error: {:?}", error);
1032 /// false
1033 /// }
1034 /// Msg::OnHello(Ok(packet)) => {
1035 /// if let Ok(response) = packet.decode::<api::HelloResponse>() {
1036 /// self.message = response.message.to_owned();
1037 /// }
1038 ///
1039 /// true
1040 /// }
1041 /// }
1042 /// }
1043 ///
1044 /// fn view(&self, ctx: &Context<Self>) -> Html {
1045 /// html! {
1046 /// <div>
1047 /// <h1>{"WebSocket Example"}</h1>
1048 /// <p>{format!("Message: {}", self.message)}</p>
1049 /// </div>
1050 /// }
1051 /// }
1052 /// }
1053 /// ```
1054 pub fn on_raw_packet<U>(self, callback: U) -> RequestBuilder<'a, H, B, U>
1055 where
1056 U: Callback<Result<RawPacket, Error>>,
1057 {
1058 RequestBuilder {
1059 shared: self.shared,
1060 body: self.body,
1061 callback,
1062 }
1063 }
1064}
1065
1066impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
1067where
1068 B: api::Request,
1069 C: Callback<Result<RawPacket>>,
1070 H: WebImpl,
1071{
1072 /// Send the request.
1073 ///
1074 /// This requires that a body has been set using [`RequestBuilder::body`].
1075 pub fn send(self) -> Request {
1076 let Some(shared) = self.shared.upgrade() else {
1077 self.callback
1078 .call(Err(Error::msg("WebSocket service is down")));
1079 return Request::new();
1080 };
1081
1082 if shared.is_closed() {
1083 self.callback
1084 .call(Err(Error::msg("WebSocket is not connected")));
1085 return Request::new();
1086 }
1087
1088 let serial = shared.serial.get();
1089
1090 if let Err(error) = shared.send_client_request(serial, &self.body) {
1091 shared.on_error.call(error);
1092 return Request::new();
1093 }
1094
1095 shared.serial.set(serial.wrapping_add(1));
1096
1097 let pending = Pending {
1098 kind: <B::Endpoint as api::Endpoint>::ID,
1099 serial,
1100 callback: self.callback,
1101 };
1102
1103 let existing = shared
1104 .g
1105 .requests
1106 .borrow_mut()
1107 .insert(serial, Box::new(pending));
1108
1109 if let Some(p) = existing {
1110 p.callback.call(Err(Error::msg("Request cancelled")));
1111 }
1112
1113 Request {
1114 serial,
1115 g: Rc::downgrade(&shared.g),
1116 }
1117 }
1118}
1119
1120/// The handle for a pending request.
1121///
1122/// Dropping or [`clear()`] this handle will cancel the request.
1123///
1124/// [`clear()`]: Self::clear
1125pub struct Request {
1126 serial: u32,
1127 g: Weak<Generic>,
1128}
1129
1130impl Request {
1131 /// An empty request handler.
1132 #[inline]
1133 pub const fn new() -> Self {
1134 Self {
1135 serial: 0,
1136 g: Weak::new(),
1137 }
1138 }
1139
1140 /// Clear the request handle without dropping it, cancelling any pending
1141 /// requests.
1142 pub fn clear(&mut self) {
1143 let removed = {
1144 let serial = mem::take(&mut self.serial);
1145
1146 let Some(g) = self.g.upgrade() else {
1147 return;
1148 };
1149
1150 self.g = Weak::new();
1151
1152 let Some(p) = g.requests.borrow_mut().remove(&serial) else {
1153 return;
1154 };
1155
1156 p
1157 };
1158
1159 drop(removed);
1160 }
1161}
1162
1163impl Default for Request {
1164 #[inline]
1165 fn default() -> Self {
1166 Self::new()
1167 }
1168}
1169
1170impl Drop for Request {
1171 #[inline]
1172 fn drop(&mut self) {
1173 self.clear();
1174 }
1175}
1176
1177/// The handle for a pending request.
1178///
1179/// Dropping or calling [`clear()`] on this handle remove the listener.
1180///
1181/// [`clear()`]: Self::clear
1182pub struct Listener {
1183 kind: Option<MessageId>,
1184 index: usize,
1185 g: Weak<Generic>,
1186}
1187
1188impl Listener {
1189 /// Construct an empty listener.
1190 #[inline]
1191 pub const fn new() -> Self {
1192 Self {
1193 kind: None,
1194 index: 0,
1195 g: Weak::new(),
1196 }
1197 }
1198
1199 /// Build up an empty listener with the specified kind.
1200 #[inline]
1201 pub(crate) const fn empty_with_kind(kind: MessageId) -> Self {
1202 Self {
1203 kind: Some(kind),
1204 index: 0,
1205 g: Weak::new(),
1206 }
1207 }
1208
1209 /// Clear the listener without dropping it.
1210 ///
1211 /// This will remove the associated broadcast listener from being notified.
1212 pub fn clear(&mut self) {
1213 // Gather values here to drop them outside of the upgrade block.
1214 let removed;
1215 let removed_value;
1216
1217 {
1218 let Some(g) = self.g.upgrade() else {
1219 return;
1220 };
1221
1222 self.g = Weak::new();
1223 let index = mem::take(&mut self.index);
1224
1225 let Some(kind) = self.kind.take() else {
1226 return;
1227 };
1228
1229 let mut broadcasts = g.broadcasts.borrow_mut();
1230
1231 let Entry::Occupied(mut e) = broadcasts.entry(kind) else {
1232 return;
1233 };
1234
1235 removed = e.get_mut().try_remove(index);
1236
1237 if e.get().is_empty() {
1238 removed_value = Some(e.remove());
1239 } else {
1240 removed_value = None;
1241 }
1242 }
1243
1244 // Drop here, to avoid invoking any destructors which might borrow
1245 // shared mutably earlier.
1246 drop(removed);
1247 drop(removed_value);
1248 }
1249}
1250
1251impl Default for Listener {
1252 #[inline]
1253 fn default() -> Self {
1254 Self::new()
1255 }
1256}
1257
1258impl Drop for Listener {
1259 #[inline]
1260 fn drop(&mut self) {
1261 self.clear();
1262 }
1263}
1264
1265/// The handle for state change listening.
1266///
1267/// Dropping or calling [`clear()`] on this handle will remove the associated
1268/// callback from being notified.
1269///
1270/// [`clear()`]: Self::clear
1271pub struct StateListener {
1272 index: usize,
1273 g: Weak<Generic>,
1274}
1275
1276impl StateListener {
1277 /// Construct an empty state listener.
1278 #[inline]
1279 pub const fn new() -> Self {
1280 Self {
1281 index: 0,
1282 g: Weak::new(),
1283 }
1284 }
1285
1286 /// Clear the state listener without dropping it.
1287 ///
1288 /// This will remove the associated callback from being notified.
1289 pub fn clear(&mut self) {
1290 let removed = {
1291 let Some(g) = self.g.upgrade() else {
1292 return;
1293 };
1294
1295 self.g = Weak::new();
1296
1297 g.state_listeners.borrow_mut().try_remove(self.index)
1298 };
1299
1300 drop(removed);
1301 }
1302}
1303
1304impl Default for StateListener {
1305 #[inline]
1306 fn default() -> Self {
1307 Self::new()
1308 }
1309}
1310
1311impl Drop for StateListener {
1312 #[inline]
1313 fn drop(&mut self) {
1314 self.clear();
1315 }
1316}
1317
1318pub(crate) struct BufData {
1319 /// Buffer being used.
1320 pub(crate) data: Vec<u8>,
1321 /// Number of strong references to this buffer.
1322 strong: Cell<usize>,
1323 /// Reference to shared state where the buffer will be recycled to.
1324 g: Weak<Generic>,
1325}
1326
1327impl BufData {
1328 fn with_capacity(g: Weak<Generic>, capacity: usize) -> Self {
1329 Self {
1330 data: Vec::with_capacity(capacity),
1331 strong: Cell::new(0),
1332 g,
1333 }
1334 }
1335
1336 unsafe fn dec(ptr: NonNull<BufData>) {
1337 unsafe {
1338 let count = ptr.as_ref().strong.get().wrapping_sub(1);
1339 ptr.as_ref().strong.set(count);
1340
1341 if count > 0 {
1342 return;
1343 }
1344
1345 let mut buf = Box::from_raw(ptr.as_ptr());
1346
1347 // Try to recycle the buffer if shared is available, else let it be
1348 // dropped and free here.
1349 let Some(g) = buf.as_ref().g.upgrade() else {
1350 return;
1351 };
1352
1353 let mut buffers = g.buffers.borrow_mut();
1354
1355 // Set the length of the recycled buffer.
1356 buf.data.set_len(buf.data.len().min(MAX_CAPACITY));
1357
1358 // We size our buffers to some max capacity to avod overuse in case
1359 // we infrequently need to handle some massive message. If we don't
1360 // shrink the allocation, then memory use can run away over time.
1361 buf.data.shrink_to(MAX_CAPACITY);
1362
1363 buffers.push_back(buf);
1364 }
1365 }
1366
1367 unsafe fn inc(ptr: NonNull<BufData>) {
1368 unsafe {
1369 let count = ptr.as_ref().strong.get().wrapping_add(1);
1370
1371 if count == 0 {
1372 std::process::abort();
1373 }
1374
1375 ptr.as_ref().strong.set(count);
1376 }
1377 }
1378}
1379
1380/// A shared buffer of data that is recycled when dropped.
1381struct BufRc {
1382 data: NonNull<BufData>,
1383}
1384
1385impl BufRc {
1386 fn new(data: Box<BufData>) -> Self {
1387 let data = NonNull::from(Box::leak(data));
1388
1389 unsafe {
1390 BufData::inc(data);
1391 }
1392
1393 Self { data }
1394 }
1395}
1396
1397impl Deref for BufRc {
1398 type Target = [u8];
1399
1400 fn deref(&self) -> &Self::Target {
1401 unsafe { &(*self.data.as_ptr()).data }
1402 }
1403}
1404
1405impl Clone for BufRc {
1406 fn clone(&self) -> Self {
1407 unsafe {
1408 BufData::inc(self.data);
1409 }
1410
1411 Self { data: self.data }
1412 }
1413}
1414
1415impl Drop for BufRc {
1416 fn drop(&mut self) {
1417 unsafe {
1418 BufData::dec(self.data);
1419 }
1420 }
1421}
1422
1423/// A raw packet of data.
1424#[derive(Clone)]
1425pub struct RawPacket {
1426 id: MessageId,
1427 buf: Option<BufRc>,
1428 at: Cell<usize>,
1429}
1430
1431impl RawPacket {
1432 /// Construct an empty raw packet.
1433 ///
1434 /// # Examples
1435 ///
1436 /// ```
1437 /// use musli_web::api::MessageId;
1438 /// use musli_web::web::RawPacket;
1439 ///
1440 /// let packet = RawPacket::empty();
1441 ///
1442 /// assert!(packet.is_empty());
1443 /// assert_eq!(packet.id(), MessageId::EMPTY);
1444 /// ```
1445 pub const fn empty() -> Self {
1446 Self {
1447 id: MessageId::EMPTY,
1448 buf: None,
1449 at: Cell::new(0),
1450 }
1451 }
1452
1453 /// Decode the contents of a raw packet.
1454 ///
1455 /// This can be called multiple times if there are multiple payloads in
1456 /// sequence of the response.
1457 ///
1458 /// You can check if the packet is empty using [`RawPacket::is_empty`].
1459 pub fn decode<'this, T>(&'this self) -> Result<T>
1460 where
1461 T: Decode<'this, Binary, Global>,
1462 {
1463 let at = self.at.get();
1464
1465 if self.id == MessageId::EMPTY {
1466 return Err(Error::new(ErrorKind::EmptyPacket));
1467 }
1468
1469 let Some(bytes) = self.as_slice().get(at..) else {
1470 return Err(Error::new(ErrorKind::Overflow(at, self.as_slice().len())));
1471 };
1472
1473 let mut reader = SliceReader::new(bytes);
1474
1475 match storage::decode(&mut reader) {
1476 Ok(value) => {
1477 self.at.set(at + bytes.len() - reader.remaining());
1478 Ok(value)
1479 }
1480 Err(error) => {
1481 self.at.set(self.len());
1482 Err(Error::from(error))
1483 }
1484 }
1485 }
1486
1487 /// Get the underlying byte slice of the packet.
1488 ///
1489 /// # Examples
1490 ///
1491 /// ```
1492 /// use musli_web::web::RawPacket;
1493 ///
1494 /// let packet = RawPacket::empty();
1495 /// assert_eq!(packet.as_slice(), &[]);
1496 /// ```
1497 pub fn as_slice(&self) -> &[u8] {
1498 match &self.buf {
1499 Some(buf) => buf.as_ref(),
1500 None => &[],
1501 }
1502 }
1503
1504 /// Get the length of the packet.
1505 ///
1506 /// # Examples
1507 ////
1508 /// ```
1509 /// use musli_web::web::RawPacket;
1510 ///
1511 /// let packet = RawPacket::empty();
1512 /// assert_eq!(packet.len(), 0);
1513 /// ```
1514 pub fn len(&self) -> usize {
1515 match &self.buf {
1516 Some(buf) => buf.len(),
1517 None => 0,
1518 }
1519 }
1520
1521 /// Check if the packet is empty.
1522 ///
1523 /// # Examples
1524 ///
1525 /// ```
1526 /// use musli_web::web::RawPacket;
1527 ///
1528 /// let packet = RawPacket::empty();
1529 /// assert!(packet.is_empty());
1530 /// ```
1531 pub fn is_empty(&self) -> bool {
1532 self.at.get() >= self.len()
1533 }
1534
1535 /// The id of the packet this is a response to as specified by
1536 /// [`Endpoint::ID`] or [`Broadcast::ID`].
1537 ///
1538 /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1539 /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1540 pub fn id(&self) -> MessageId {
1541 self.id
1542 }
1543}
1544
1545/// A typed packet of data.
1546#[derive(Clone)]
1547pub struct Packet<T> {
1548 raw: RawPacket,
1549 _marker: PhantomData<T>,
1550}
1551
1552impl<T> Packet<T> {
1553 /// Construct an empty package.
1554 ///
1555 /// # Examples
1556 ///
1557 /// ```
1558 /// use musli_web::api::MessageId;
1559 /// use musli_web::web::Packet;
1560 ///
1561 /// let packet = Packet::<()>::empty();
1562 ///
1563 /// assert!(packet.is_empty());
1564 /// assert_eq!(packet.id(), MessageId::EMPTY);
1565 /// ```
1566 pub const fn empty() -> Self {
1567 Self {
1568 raw: RawPacket::empty(),
1569 _marker: PhantomData,
1570 }
1571 }
1572
1573 /// Construct a new typed package from a raw one.
1574 ///
1575 /// Note that this does not guarantee that the typed package is correct, but
1576 /// the `T` parameter becomes associated with it allowing it to be used
1577 /// automatically with methods such as [`Packet::decode`].
1578 #[inline]
1579 pub fn new(raw: RawPacket) -> Self {
1580 Self {
1581 raw,
1582 _marker: PhantomData,
1583 }
1584 }
1585
1586 /// Convert a packet into a raw packet.
1587 ///
1588 /// To determine which endpoint or broadcast it belongs to the
1589 /// [`RawPacket::id`] method can be used.
1590 pub fn into_raw(self) -> RawPacket {
1591 self.raw
1592 }
1593
1594 /// Check if the packet is empty.
1595 ///
1596 /// # Examples
1597 ///
1598 /// ```
1599 /// use musli_web::web::Packet;
1600 ///
1601 /// let packet = Packet::<()>::empty();
1602 /// assert!(packet.is_empty());
1603 /// ```
1604 pub fn is_empty(&self) -> bool {
1605 self.raw.is_empty()
1606 }
1607
1608 /// The id of the packet this is a response to as specified by
1609 /// [`Endpoint::ID`] or [`Broadcast::ID`].
1610 ///
1611 /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1612 /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1613 pub fn id(&self) -> MessageId {
1614 self.raw.id()
1615 }
1616}
1617
1618impl<T> Packet<T>
1619where
1620 T: api::Decodable,
1621{
1622 /// Decode the contents of a packet.
1623 ///
1624 /// This can be called multiple times if there are multiple payloads in
1625 /// sequence of the response.
1626 ///
1627 /// You can check if the packet is empty using [`Packet::is_empty`].
1628 pub fn decode(&self) -> Result<T::Type<'_>> {
1629 self.decode_any()
1630 }
1631
1632 /// Decode any contents of a packet.
1633 ///
1634 /// This can be called multiple times if there are multiple payloads in
1635 /// sequence of the response.
1636 ///
1637 /// You can check if the packet is empty using [`Packet::is_empty`].
1638 pub fn decode_any<'de, R>(&'de self) -> Result<R>
1639 where
1640 R: Decode<'de, Binary, Global>,
1641 {
1642 self.raw.decode()
1643 }
1644}
1645
1646impl<T> Packet<T>
1647where
1648 T: api::Endpoint,
1649{
1650 /// Decode the contents of a packet.
1651 ///
1652 /// This can be called multiple times if there are multiple payloads in
1653 /// sequence of the response.
1654 ///
1655 /// You can check if the packet is empty using [`Packet::is_empty`].
1656 pub fn decode_response(&self) -> Result<T::Response<'_>> {
1657 self.decode_any_response()
1658 }
1659
1660 /// Decode any contents of a packet.
1661 ///
1662 /// This can be called multiple times if there are multiple payloads in
1663 /// sequence of the response.
1664 ///
1665 /// You can check if the packet is empty using [`Packet::is_empty`].
1666 pub fn decode_any_response<'de, R>(&'de self) -> Result<R>
1667 where
1668 R: Decode<'de, Binary, Global>,
1669 {
1670 self.raw.decode()
1671 }
1672}
1673
1674impl<T> Packet<T>
1675where
1676 T: api::Broadcast,
1677{
1678 /// Decode the primary event related to a broadcast.
1679 pub fn decode_event<'de>(&'de self) -> Result<T::Event<'de>>
1680 where
1681 T: api::BroadcastWithEvent,
1682 {
1683 self.decode_event_any()
1684 }
1685
1686 /// Decode any event related to a broadcast.
1687 pub fn decode_event_any<'de, E>(&'de self) -> Result<E>
1688 where
1689 E: Event<Broadcast = T> + Decode<'de, Binary, Global>,
1690 {
1691 self.raw.decode()
1692 }
1693}
1694
1695/// A handle to the WebSocket service.
1696#[derive(Clone)]
1697#[repr(transparent)]
1698pub struct Handle<H>
1699where
1700 H: WebImpl,
1701{
1702 shared: Weak<Shared<H>>,
1703}
1704
1705impl<H> Handle<H>
1706where
1707 H: WebImpl,
1708{
1709 /// Send a request of type `T`.
1710 ///
1711 /// Returns a handle for the request.
1712 ///
1713 /// If the handle is dropped, the request is cancelled.
1714 ///
1715 /// # Examples
1716 ///
1717 /// ```
1718 /// # extern crate yew021 as yew;
1719 /// use yew::prelude::*;
1720 /// use musli_web::web03::prelude::*;
1721 ///
1722 /// mod api {
1723 /// use musli::{Decode, Encode};
1724 /// use musli_web::api;
1725 ///
1726 /// #[derive(Encode, Decode)]
1727 /// pub struct HelloRequest<'de> {
1728 /// pub message: &'de str,
1729 /// }
1730 ///
1731 /// #[derive(Encode, Decode)]
1732 /// pub struct HelloResponse<'de> {
1733 /// pub message: &'de str,
1734 /// }
1735 ///
1736 /// api::define! {
1737 /// pub type Hello;
1738 ///
1739 /// impl Endpoint for Hello {
1740 /// impl<'de> Request for HelloRequest<'de>;
1741 /// type Response<'de> = HelloResponse<'de>;
1742 /// }
1743 /// }
1744 /// }
1745 ///
1746 /// enum Msg {
1747 /// OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
1748 /// }
1749 ///
1750 /// #[derive(Properties, PartialEq)]
1751 /// struct Props {
1752 /// ws: ws::Handle,
1753 /// }
1754 ///
1755 /// struct App {
1756 /// message: String,
1757 /// _hello: ws::Request,
1758 /// }
1759 ///
1760 /// impl Component for App {
1761 /// type Message = Msg;
1762 /// type Properties = Props;
1763 ///
1764 /// fn create(ctx: &Context<Self>) -> Self {
1765 /// let hello = ctx.props().ws
1766 /// .request()
1767 /// .body(api::HelloRequest { message: "Hello!"})
1768 /// .on_packet(ctx.link().callback(Msg::OnHello))
1769 /// .send();
1770 ///
1771 /// Self {
1772 /// message: String::from("No Message :("),
1773 /// _hello: hello,
1774 /// }
1775 /// }
1776 ///
1777 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1778 /// match msg {
1779 /// Msg::OnHello(Err(error)) => {
1780 /// tracing::error!("Request error: {:?}", error);
1781 /// false
1782 /// }
1783 /// Msg::OnHello(Ok(packet)) => {
1784 /// if let Ok(response) = packet.decode() {
1785 /// self.message = response.message.to_owned();
1786 /// }
1787 ///
1788 /// true
1789 /// }
1790 /// }
1791 /// }
1792 ///
1793 /// fn view(&self, ctx: &Context<Self>) -> Html {
1794 /// html! {
1795 /// <div>
1796 /// <h1>{"WebSocket Example"}</h1>
1797 /// <p>{format!("Message: {}", self.message)}</p>
1798 /// </div>
1799 /// }
1800 /// }
1801 /// }
1802 /// ```
1803 pub fn request(&self) -> RequestBuilder<'_, H, EmptyBody, EmptyCallback> {
1804 RequestBuilder {
1805 shared: &self.shared,
1806 body: EmptyBody,
1807 callback: EmptyCallback,
1808 }
1809 }
1810
1811 /// Listen for broadcasts of type `T`.
1812 ///
1813 /// Returns a handle for the listener that will cancel the listener if
1814 /// dropped.
1815 ///
1816 /// # Examples
1817 ///
1818 /// ```
1819 /// # extern crate yew021 as yew;
1820 /// use yew::prelude::*;
1821 /// use musli_web::web03::prelude::*;
1822 ///
1823 /// mod api {
1824 /// use musli::{Decode, Encode};
1825 /// use musli_web::api;
1826 ///
1827 /// #[derive(Encode, Decode)]
1828 /// pub struct TickEvent<'de> {
1829 /// pub message: &'de str,
1830 /// pub tick: u32,
1831 /// }
1832 ///
1833 /// api::define! {
1834 /// pub type Tick;
1835 ///
1836 /// impl Broadcast for Tick {
1837 /// impl<'de> Event for TickEvent<'de>;
1838 /// }
1839 /// }
1840 /// }
1841 ///
1842 /// enum Msg {
1843 /// Tick(Result<ws::Packet<api::Tick>, ws::Error>),
1844 /// }
1845 ///
1846 /// #[derive(Properties, PartialEq)]
1847 /// struct Props {
1848 /// ws: ws::Handle,
1849 /// }
1850 ///
1851 /// struct App {
1852 /// tick: u32,
1853 /// _listen: ws::Listener,
1854 /// }
1855 ///
1856 /// impl Component for App {
1857 /// type Message = Msg;
1858 /// type Properties = Props;
1859 ///
1860 /// fn create(ctx: &Context<Self>) -> Self {
1861 /// let listen = ctx.props().ws.on_broadcast(ctx.link().callback(Msg::Tick));
1862 ///
1863 /// Self {
1864 /// tick: 0,
1865 /// _listen: listen,
1866 /// }
1867 /// }
1868 ///
1869 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1870 /// match msg {
1871 /// Msg::Tick(Err(error)) => {
1872 /// tracing::error!("Tick error: {error}");
1873 /// false
1874 /// }
1875 /// Msg::Tick(Ok(packet)) => {
1876 /// if let Ok(tick) = packet.decode_event() {
1877 /// self.tick = tick.tick;
1878 /// }
1879 ///
1880 /// true
1881 /// }
1882 /// }
1883 /// }
1884 ///
1885 /// fn view(&self, ctx: &Context<Self>) -> Html {
1886 /// html! {
1887 /// <div>
1888 /// <h1>{"WebSocket Example"}</h1>
1889 /// <p>{format!("Tick: {}", self.tick)}</p>
1890 /// </div>
1891 /// }
1892 /// }
1893 /// }
1894 /// ```
1895 pub fn on_broadcast<T>(&self, callback: impl Callback<Result<Packet<T>>>) -> Listener
1896 where
1897 T: api::Broadcast,
1898 {
1899 self.on_raw_broadcast::<T>(move |result| match result {
1900 Ok(packet) => callback.call(Ok(Packet::new(packet))),
1901 Err(error) => callback.call(Err(error)),
1902 })
1903 }
1904
1905 /// Listen for broadcasts of type `T`.
1906 ///
1907 /// Returns a handle for the listener that will cancel the listener if
1908 /// dropped.
1909 ///
1910 /// # Examples
1911 ///
1912 /// ```
1913 /// # extern crate yew021 as yew;
1914 /// use yew::prelude::*;
1915 /// use musli_web::web03::prelude::*;
1916 ///
1917 /// mod api {
1918 /// use musli::{Decode, Encode};
1919 /// use musli_web::api;
1920 ///
1921 /// #[derive(Encode, Decode)]
1922 /// pub struct TickEvent<'de> {
1923 /// pub message: &'de str,
1924 /// pub tick: u32,
1925 /// }
1926 ///
1927 /// api::define! {
1928 /// pub type Tick;
1929 ///
1930 /// impl Broadcast for Tick {
1931 /// impl<'de> Event for TickEvent<'de>;
1932 /// }
1933 /// }
1934 /// }
1935 ///
1936 /// enum Msg {
1937 /// Tick(Result<ws::RawPacket, ws::Error>),
1938 /// }
1939 ///
1940 /// #[derive(Properties, PartialEq)]
1941 /// struct Props {
1942 /// ws: ws::Handle,
1943 /// }
1944 ///
1945 /// struct App {
1946 /// tick: u32,
1947 /// _listen: ws::Listener,
1948 /// }
1949 ///
1950 /// impl Component for App {
1951 /// type Message = Msg;
1952 /// type Properties = Props;
1953 ///
1954 /// fn create(ctx: &Context<Self>) -> Self {
1955 /// let link = ctx.link().clone();
1956 /// let listen = ctx.props().ws.on_raw_broadcast::<api::Tick>(ctx.link().callback(Msg::Tick));
1957 ///
1958 /// Self {
1959 /// tick: 0,
1960 /// _listen: listen,
1961 /// }
1962 /// }
1963 ///
1964 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1965 /// match msg {
1966 /// Msg::Tick(Err(error)) => {
1967 /// tracing::error!("Tick error: {error}");
1968 /// false
1969 /// }
1970 /// Msg::Tick(Ok(packet)) => {
1971 /// if let Ok(tick) = packet.decode::<api::TickEvent>() {
1972 /// self.tick = tick.tick;
1973 /// }
1974 ///
1975 /// true
1976 /// }
1977 /// }
1978 /// }
1979 ///
1980 /// fn view(&self, ctx: &Context<Self>) -> Html {
1981 /// html! {
1982 /// <div>
1983 /// <h1>{"WebSocket Example"}</h1>
1984 /// <p>{format!("Tick: {}", self.tick)}</p>
1985 /// </div>
1986 /// }
1987 /// }
1988 /// }
1989 /// ```
1990 pub fn on_raw_broadcast<T>(&self, callback: impl Callback<Result<RawPacket>>) -> Listener
1991 where
1992 T: api::Broadcast,
1993 {
1994 let Some(shared) = self.shared.upgrade() else {
1995 return Listener::empty_with_kind(T::ID);
1996 };
1997
1998 let index = {
1999 let mut broadcasts = shared.g.broadcasts.borrow_mut();
2000 let slots = broadcasts.entry(T::ID).or_default();
2001 slots.insert(Rc::new(callback))
2002 };
2003
2004 Listener {
2005 kind: Some(T::ID),
2006 index,
2007 g: Rc::downgrade(&shared.g),
2008 }
2009 }
2010
2011 /// Listen for state changes to the underlying connection.
2012 ///
2013 /// This indicates when the connection is open and ready to receive requests
2014 /// through [`State::Open`], or if it's closed and requests will be queued
2015 /// through [`State::Closed`].
2016 ///
2017 /// Note that if you are connecting through a proxy the reported updates
2018 /// might be volatile. It is always best to send a message over the
2019 /// connection on the server side that once received allows the client to
2020 /// know that it is connected.
2021 ///
2022 /// Dropping the returned handle will cancel the listener.
2023 ///
2024 /// # Examples
2025 ///
2026 /// ```
2027 /// # extern crate yew021 as yew;
2028 /// use yew::prelude::*;
2029 /// use musli_web::web03::prelude::*;
2030 ///
2031 /// enum Msg {
2032 /// StateChange(ws::State),
2033 /// }
2034 ///
2035 /// #[derive(Properties, PartialEq)]
2036 /// struct Props {
2037 /// ws: ws::Handle,
2038 /// }
2039 ///
2040 /// struct App {
2041 /// state: ws::State,
2042 /// _listen: ws::StateListener,
2043 /// }
2044 ///
2045 /// impl Component for App {
2046 /// type Message = Msg;
2047 /// type Properties = Props;
2048 ///
2049 /// fn create(ctx: &Context<Self>) -> Self {
2050 /// let link = ctx.link().clone();
2051 ///
2052 /// let (state, listen) = ctx.props().ws.on_state_change(move |state| {
2053 /// link.send_message(Msg::StateChange(state));
2054 /// });
2055 ///
2056 /// Self {
2057 /// state,
2058 /// _listen: listen,
2059 /// }
2060 /// }
2061 ///
2062 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
2063 /// match msg {
2064 /// Msg::StateChange(state) => {
2065 /// self.state = state;
2066 /// true
2067 /// }
2068 /// }
2069 /// }
2070 ///
2071 /// fn view(&self, ctx: &Context<Self>) -> Html {
2072 /// html! {
2073 /// <div>
2074 /// <h1>{"WebSocket Example"}</h1>
2075 /// <p>{format!("State: {:?}", self.state)}</p>
2076 /// </div>
2077 /// }
2078 /// }
2079 /// }
2080 pub fn on_state_change(&self, callback: impl Callback<State>) -> (State, StateListener) {
2081 let Some(shared) = self.shared.upgrade() else {
2082 return (
2083 State::Closed,
2084 StateListener {
2085 index: 0,
2086 g: Weak::new(),
2087 },
2088 );
2089 };
2090
2091 let (state, index) = {
2092 let index = shared
2093 .g
2094 .state_listeners
2095 .borrow_mut()
2096 .insert(Rc::new(callback));
2097 (shared.state.get(), index)
2098 };
2099
2100 let listener = StateListener {
2101 index,
2102 g: Rc::downgrade(&shared.g),
2103 };
2104
2105 (state, listener)
2106 }
2107}
2108
2109impl<H> PartialEq for Handle<H>
2110where
2111 H: WebImpl,
2112{
2113 #[inline]
2114 fn eq(&self, _: &Self) -> bool {
2115 true
2116 }
2117}
2118
2119struct Pending<C>
2120where
2121 C: ?Sized,
2122{
2123 kind: MessageId,
2124 serial: u32,
2125 callback: C,
2126}
2127
2128impl<C> fmt::Debug for Pending<C> {
2129 #[inline]
2130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2131 f.debug_struct("Pending")
2132 .field("serial", &self.serial)
2133 .field("kind", &self.kind)
2134 .finish_non_exhaustive()
2135 }
2136}
2137struct ForcePrefix<'a>(&'a str, char);
2138
2139impl fmt::Display for ForcePrefix<'_> {
2140 #[inline]
2141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2142 let Self(string, prefix) = *self;
2143 prefix.fmt(f)?;
2144 string.trim_start_matches(prefix).fmt(f)?;
2145 Ok(())
2146 }
2147}