musli_web/web.rs
1//! The generic web implementation.
2//!
3//! This is specialized over the `H` parameter through modules such as:
4//!
5//! * [`web03`] for `web-sys` `0.3.x`.
6//!
7//! [`web03`]: crate::web03
8
9use core::cell::{Cell, RefCell};
10use core::fmt;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Deref;
14use core::ptr::NonNull;
15use std::collections::VecDeque;
16
17use alloc::boxed::Box;
18use alloc::format;
19use alloc::rc::Rc;
20use alloc::rc::Weak;
21use alloc::string::{String, ToString};
22use alloc::vec::Vec;
23
24use std::collections::hash_map::{Entry, HashMap};
25
26use musli::Decode;
27use musli::alloc::Global;
28use musli::mode::Binary;
29use musli::reader::SliceReader;
30use musli::storage;
31
32use slab::Slab;
33
34use crate::api::{self, Event, MessageId};
35
/// Upper bound, in bytes, that reusable buffers are shrunk to after use.
///
/// Passed to `shrink_to` for both the outgoing request buffer and recycled
/// receive buffers, so that one oversized message does not keep a large
/// allocation alive.
const MAX_CAPACITY: usize = 1048576;
37
/// An empty request body.
///
/// Since this unit struct is `#[non_exhaustive]`, it cannot be constructed
/// outside of this crate.
#[non_exhaustive]
pub struct EmptyBody;
41
/// An empty callback.
///
/// Implements [`Callback`] for every input type by ignoring the input. It is
/// the initial error handler of the [`ServiceBuilder`] returned by
/// [`connect`].
#[non_exhaustive]
pub struct EmptyCallback;
45
/// Slab of state listeners.
type StateListeners = Slab<Rc<dyn Callback<State>>>;
/// Broadcast listeners, grouped by the [`MessageId`] of the broadcast they
/// listen to.
type Broadcasts = HashMap<MessageId, Slab<Rc<dyn Callback<Result<RawPacket>>>>>;
/// Queue of recycled buffers.
type Buffers = VecDeque<Box<BufData>>;
/// Pending requests, keyed by the serial number they were sent with.
type Requests = HashMap<u32, Box<Pending<dyn Callback<Result<RawPacket>>>>>;
54
/// Location information for websocket implementation.
///
/// The fields are combined as `{protocol}//{host}:{port}{path}` when building
/// the websocket URL for a same-location connection.
#[doc(hidden)]
pub struct Location {
    /// Protocol including the trailing colon. Only `"https:"` and `"http:"`
    /// are accepted when building the websocket URL.
    pub(crate) protocol: String,
    /// Host placed between `//` and `:` in the websocket URL.
    pub(crate) host: String,
    /// Port placed after the `:` in the websocket URL.
    pub(crate) port: String,
}
62
/// Module holding the marker trait that [`SocketImpl`] requires.
pub(crate) mod sealed_socket {
    /// Marker trait which every [`SocketImpl`](super::SocketImpl)
    /// implementation must also implement.
    pub trait Sealed {}
}

/// Abstraction over the websocket used by the service.
pub(crate) trait SocketImpl
where
    Self: Sized + self::sealed_socket::Sealed,
{
    /// Implementation-specific handles passed to [`SocketImpl::new`].
    #[doc(hidden)]
    type Handles;

    /// Construct a socket for `url` using the given `handles`.
    #[doc(hidden)]
    fn new(url: &str, handles: &Self::Handles) -> Result<Self, Error>;

    /// Send `data` over the socket.
    #[doc(hidden)]
    fn send(&self, data: &[u8]) -> Result<(), Error>;

    /// Close the socket, consuming it.
    #[doc(hidden)]
    fn close(self) -> Result<(), Error>;
}
83
/// Module holding the marker trait that [`PerformanceImpl`] requires.
pub(crate) mod sealed_performance {
    /// Marker trait which every [`PerformanceImpl`](super::PerformanceImpl)
    /// implementation must also implement.
    pub trait Sealed {}
}

/// Abstraction over a clock used to time how long a connection has been open.
pub trait PerformanceImpl
where
    Self: Sized + self::sealed_performance::Sealed,
{
    /// Get the current timestamp.
    ///
    /// NOTE(review): presumably milliseconds, since differences are compared
    /// against `250.0` next to millisecond timeouts — confirm against the
    /// implementation.
    #[doc(hidden)]
    fn now(&self) -> f64;
}
95
/// Module holding the marker trait that [`WindowImpl`] requires.
pub(crate) mod sealed_window {
    /// Marker trait which every [`WindowImpl`](super::WindowImpl)
    /// implementation must also implement.
    pub trait Sealed {}
}

/// Abstraction over the window the service runs in.
pub(crate) trait WindowImpl
where
    Self: Sized + self::sealed_window::Sealed,
{
    /// Clock type returned by [`WindowImpl::performance`].
    #[doc(hidden)]
    type Performance: PerformanceImpl;

    /// Handle for a timeout registered with [`WindowImpl::set_timeout`].
    #[doc(hidden)]
    type Timeout;

    /// Get access to the window.
    #[doc(hidden)]
    fn new() -> Result<Self, Error>;

    /// Get the clock associated with the window.
    #[doc(hidden)]
    fn performance(&self) -> Result<Self::Performance, Error>;

    /// Get the current location, used to build same-location websocket URLs.
    #[doc(hidden)]
    fn location(&self) -> Result<Location, Error>;

    /// Register `callback` to be called once after `millis` milliseconds.
    #[doc(hidden)]
    fn set_timeout(
        &self,
        millis: u32,
        callback: impl FnOnce() + 'static,
    ) -> Result<Self::Timeout, Error>;
}
126
/// Module holding the marker trait that [`WebImpl`] requires.
pub(crate) mod sealed_web {
    /// Marker trait which every [`WebImpl`](super::WebImpl) implementation
    /// must also implement.
    pub trait Sealed {}
}

/// Central trait for web integration.
///
/// Since web integration is currently unstable, this requires multiple
/// different implementations, each time an ecosystem breaking change is
/// released.
///
/// The crate in focus here is `web-sys`, and the corresponding modules provide
/// integrations:
///
/// * [web03] for `web-sys` `0.3.x`.
///
/// [web03]: crate::web03
pub trait WebImpl
where
    Self: 'static + Copy + Sized + self::sealed_web::Sealed,
{
    /// Window implementation used for timing, location and timeouts.
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Window: WindowImpl;

    /// Handles built by [`WebImpl::handles`] and passed to the socket when it
    /// is constructed.
    #[doc(hidden)]
    type Handles;

    /// Socket implementation used for the connection.
    #[doc(hidden)]
    #[allow(private_bounds)]
    type Socket: SocketImpl<Handles = Self::Handles>;

    /// Build the handles from a weak reference to the shared state.
    #[doc(hidden)]
    #[allow(private_interfaces)]
    fn handles(shared: &Weak<Shared<Self>>) -> Self::Handles;

    /// Produce a random number used to fuzz the reconnect timeout.
    ///
    /// NOTE(review): presumably in the range `0..range` — confirm against the
    /// implementation.
    #[doc(hidden)]
    fn random(range: u32) -> u32;
}
165
/// Construct a new [`ServiceBuilder`] associated with the given [`Connect`]
/// strategy.
///
/// The builder starts out with [`EmptyCallback`] as its error handler, which
/// ignores every error. Use [`ServiceBuilder::on_error`] to replace it.
pub fn connect<H>(connect: Connect) -> ServiceBuilder<H, EmptyCallback>
where
    H: WebImpl,
{
    ServiceBuilder {
        connect,
        on_error: EmptyCallback,
        _marker: PhantomData,
    }
}
178
/// The state of the connection.
///
/// A listener for state changes can be set up through for example
/// [`Handle::on_state_change`].
///
/// A newly built service starts out as [`State::Closed`].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[non_exhaustive]
pub enum State {
    /// The connection is open.
    Open,
    /// The connection is closed.
    Closed,
}
191
/// Error type for the WebSocket service.
///
/// The concrete cause is kept private. Use the [`Display`](fmt::Display)
/// implementation for a description, or predicates such as
/// [`Error::is_empty_packet`] to test for specific causes.
#[derive(Debug)]
pub struct Error {
    /// What caused the error.
    kind: ErrorKind,
}

impl Error {
    /// Check if the error is caused by an empty packet.
    ///
    /// # Examples
    ///
    /// ```
    /// use musli_web::web::{Error, RawPacket};
    ///
    /// let packet = RawPacket::empty();
    /// let e = packet.decode::<u32>().unwrap_err();
    ///
    /// assert!(e.is_empty_packet());
    /// ```
    pub fn is_empty_packet(&self) -> bool {
        matches!(self.kind, ErrorKind::EmptyPacket)
    }
}
215
/// The cause of an [`Error`].
#[derive(Debug)]
enum ErrorKind {
    /// The packet is empty.
    EmptyPacket,
    /// A free-form error message.
    Message(String),
    /// An error raised by the storage encoding.
    Storage(storage::Error),
    /// An offset was outside of a packet; the fields are reported as the
    /// offending offset and the packet length respectively.
    Overflow(usize, usize),
}
223
224impl Error {
225 #[inline]
226 fn new(kind: ErrorKind) -> Self {
227 Self { kind }
228 }
229
230 #[inline]
231 pub(crate) fn msg(message: impl fmt::Display) -> Self {
232 Self::new(ErrorKind::Message(message.to_string()))
233 }
234}
235
236impl fmt::Display for Error {
237 #[inline]
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 match &self.kind {
240 ErrorKind::EmptyPacket => write!(f, "Packet is empty"),
241 ErrorKind::Message(message) => write!(f, "{message}"),
242 ErrorKind::Storage(error) => write!(f, "Encoding error: {error}"),
243 ErrorKind::Overflow(at, len) => {
244 write!(f, "Internal packet overflow, {at} not in range 0-{len}")
245 }
246 }
247 }
248}
249
250impl core::error::Error for Error {
251 #[inline]
252 fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
253 match &self.kind {
254 ErrorKind::Storage(error) => Some(error),
255 _ => None,
256 }
257 }
258}
259
260impl From<storage::Error> for Error {
261 #[inline]
262 fn from(error: storage::Error) -> Self {
263 Self::new(ErrorKind::Storage(error))
264 }
265}
266
#[cfg(feature = "wasm_bindgen02")]
impl From<wasm_bindgen02::JsValue> for Error {
    /// Convert a JavaScript value into an error by storing its debug
    /// representation as the message.
    #[inline]
    fn from(error: wasm_bindgen02::JsValue) -> Self {
        let message = format!("{error:?}");

        Self {
            kind: ErrorKind::Message(message),
        }
    }
}
274
/// Result alias which defaults the error type to [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;

/// The reconnect timeout, in milliseconds, that a service starts out with and
/// that is restored once a connection has stayed open for a while.
const INITIAL_TIMEOUT: u32 = 250;
/// The largest reconnect timeout, in milliseconds, that backing off can reach.
const MAX_TIMEOUT: u32 = 4000;
279
/// How to connect to the websocket.
enum ConnectKind {
    /// Connect to the location of the current window, using the given path.
    Location { path: String },
    /// Connect to the given URL as-is.
    Url { url: String },
}

/// A specification for how to connect a websocket.
///
/// Constructed through [`Connect::location`] or [`Connect::url`].
pub struct Connect {
    /// The selected connection strategy.
    kind: ConnectKind,
}
290
291impl Connect {
292 /// Connect to the same location with a custom path.
293 ///
294 /// Note that any number of `/` prefixes are ignored, the canonical
295 /// representation always ignores them and the path is relative to the
296 /// current location.
297 #[inline]
298 pub fn location(path: impl AsRef<str>) -> Self {
299 Self {
300 kind: ConnectKind::Location {
301 path: String::from(path.as_ref()),
302 },
303 }
304 }
305
306 /// Connect to the specified URL.
307 #[inline]
308 pub fn url(url: String) -> Self {
309 Self {
310 kind: ConnectKind::Url { url },
311 }
312 }
313}
314
/// Generic but shared fields which do not depend on specialization over `H`.
///
/// Handles such as [`Request`], [`Listener`] and [`StateListener`] keep a weak
/// reference to this in order to deregister themselves.
struct Generic {
    /// Registered state change listeners.
    state_listeners: RefCell<StateListeners>,
    /// Requests which have been sent but not yet responded to.
    requests: RefCell<Requests>,
    /// Registered broadcast listeners.
    broadcasts: RefCell<Broadcasts>,
    /// Buffers available for reuse.
    buffers: RefCell<Buffers>,
}
322
/// Shared implementation details for websocket implementations.
pub(crate) struct Shared<H>
where
    H: WebImpl,
{
    /// How the websocket URL is determined when connecting.
    connect: Connect,
    /// Callback which receives errors raised by the service.
    pub(crate) on_error: Box<dyn Callback<Error>>,
    /// Window used for location lookups and reconnect timeouts.
    window: H::Window,
    /// Clock used to time how long the connection has been open.
    performance: <H::Window as WindowImpl>::Performance,
    /// Handles passed to every socket that is constructed.
    handles: H::Handles,
    /// The state most recently emitted to state listeners.
    state: Cell<State>,
    /// Timestamp of when the connection was opened, `None` while closed.
    opened: Cell<Option<f64>>,
    /// Serial number to use for the next request.
    serial: Cell<u32>,
    /// Broadcast callbacks queued up to be called.
    defer_broadcasts: RefCell<VecDeque<Weak<dyn Callback<Result<RawPacket>>>>>,
    /// State callbacks queued up to be called.
    defer_state_listeners: RefCell<VecDeque<Weak<dyn Callback<State>>>>,
    /// The current socket, if any.
    pub(crate) socket: RefCell<Option<H::Socket>>,
    /// Reusable buffer which outgoing requests are encoded into.
    output: RefCell<Vec<u8>>,
    /// Delay in milliseconds used for the next reconnect attempt.
    current_timeout: Cell<u32>,
    /// Handle of the most recently scheduled reconnect timeout.
    reconnect_timeout: RefCell<Option<<H::Window as WindowImpl>::Timeout>>,
    /// State which does not depend on `H`.
    g: Rc<Generic>,
}
344
345impl<H> Drop for Shared<H>
346where
347 H: WebImpl,
348{
349 fn drop(&mut self) {
350 if let Some(s) = self.socket.take()
351 && let Err(e) = s.close()
352 {
353 self.on_error.call(e);
354 }
355
356 // We don't need to worry about mutable borrows here, since we only have
357 // weak references to Shared and by virtue of this being dropped they
358 // are all invalid.
359 let state_listeners = mem::take(&mut *self.g.state_listeners.borrow_mut());
360 let mut requests = self.g.requests.borrow_mut();
361
362 for (_, listener) in state_listeners {
363 listener.call(State::Closed);
364 }
365
366 for (_, p) in requests.drain() {
367 p.callback.call(Err(Error::msg("Websocket service closed")));
368 }
369 }
370}
371
/// Builder of a service.
///
/// Constructed through [`connect`].
pub struct ServiceBuilder<H, E>
where
    H: WebImpl,
{
    /// How the service should connect.
    connect: Connect,
    /// Callback which receives errors raised by the service.
    on_error: E,
    /// Ties the builder to the web implementation `H`.
    _marker: PhantomData<H>,
}
381
382impl<H, E> ServiceBuilder<H, E>
383where
384 H: WebImpl,
385 E: Callback<Error>,
386{
387 /// Set the error handler to use for the service.
388 pub fn on_error<U>(self, on_error: U) -> ServiceBuilder<H, U>
389 where
390 U: Callback<Error>,
391 {
392 ServiceBuilder {
393 connect: self.connect,
394 on_error,
395 _marker: self._marker,
396 }
397 }
398
399 /// Build a new service and handle.
400 pub fn build(self) -> Service<H> {
401 let window = match H::Window::new() {
402 Ok(window) => window,
403 Err(error) => {
404 panic!("{error}")
405 }
406 };
407
408 let performance = match WindowImpl::performance(&window) {
409 Ok(performance) => performance,
410 Err(error) => {
411 panic!("{error}")
412 }
413 };
414
415 let shared = Rc::<Shared<H>>::new_cyclic(move |shared| Shared {
416 connect: self.connect,
417 on_error: Box::new(self.on_error),
418 window,
419 performance,
420 handles: H::handles(shared),
421 state: Cell::new(State::Closed),
422 opened: Cell::new(None),
423 serial: Cell::new(0),
424 defer_broadcasts: RefCell::new(VecDeque::new()),
425 defer_state_listeners: RefCell::new(VecDeque::new()),
426 socket: RefCell::new(None),
427 output: RefCell::new(Vec::new()),
428 current_timeout: Cell::new(INITIAL_TIMEOUT),
429 reconnect_timeout: RefCell::new(None),
430 g: Rc::new(Generic {
431 state_listeners: RefCell::new(Slab::new()),
432 broadcasts: RefCell::new(HashMap::new()),
433 requests: RefCell::new(Requests::new()),
434 buffers: RefCell::new(VecDeque::new()),
435 }),
436 });
437
438 let handle = Handle {
439 shared: Rc::downgrade(&shared),
440 };
441
442 Service { shared, handle }
443 }
444}
445
/// The service handle.
///
/// Once dropped this will cause the service to be disconnected and all requests
/// to be cancelled.
pub struct Service<H>
where
    H: WebImpl,
{
    /// The only strong reference to the shared state.
    shared: Rc<Shared<H>>,
    /// Handle holding a weak reference to the shared state.
    handle: Handle<H>,
}

impl<H> Service<H>
where
    H: WebImpl,
{
    /// Attempt to establish a websocket connection.
    ///
    /// Failures are reported to the error handler of the service, after which
    /// the connection is closed and a reconnect is scheduled.
    pub fn connect(&self) {
        self.shared.connect()
    }

    /// Build a handle to the service.
    pub fn handle(&self) -> &Handle<H> {
        &self.handle
    }
}
472
473impl<H> Shared<H>
474where
475 H: WebImpl,
476{
477 /// Send a client message.
478 fn send_client_request<T>(&self, serial: u32, body: &T) -> Result<()>
479 where
480 T: api::Request,
481 {
482 let Some(ref socket) = *self.socket.borrow() else {
483 return Err(Error::msg("Socket is not connected"));
484 };
485
486 let header = api::RequestHeader {
487 serial,
488 id: <T::Endpoint as api::Endpoint>::ID.get(),
489 };
490
491 let out = &mut *self.output.borrow_mut();
492
493 storage::to_writer(&mut *out, &header)?;
494 storage::to_writer(&mut *out, &body)?;
495
496 tracing::debug!(
497 header.serial,
498 ?header.id,
499 len = out.len(),
500 "Sending request"
501 );
502
503 socket.send(out.as_slice())?;
504
505 out.clear();
506 out.shrink_to(MAX_CAPACITY);
507 Ok(())
508 }
509
510 pub(crate) fn next_buffer(self: &Rc<Self>, needed: usize) -> Box<BufData> {
511 match self.g.buffers.borrow_mut().pop_front() {
512 Some(mut buf) => {
513 if buf.data.capacity() < needed {
514 buf.data.reserve(needed - buf.data.len());
515 }
516
517 buf
518 }
519 None => Box::new(BufData::with_capacity(Rc::downgrade(&self.g), needed)),
520 }
521 }
522
523 pub(crate) fn message(self: &Rc<Self>, buf: Box<BufData>) -> Result<()> {
524 // Wrap the buffer in a simple shared reference-counted container.
525 let buf = BufRc::new(buf);
526 let mut reader = SliceReader::new(&buf);
527
528 let header: api::ResponseHeader = storage::decode(&mut reader)?;
529
530 if let Some(broadcast) = MessageId::new(header.broadcast) {
531 tracing::debug!(?header, "Got broadcast");
532
533 if !self.prepare_broadcast(broadcast) {
534 return Ok(());
535 };
536
537 if let Some(id) = MessageId::new(header.error) {
538 let error = if id == MessageId::ERROR_MESSAGE {
539 storage::decode(&mut reader)?
540 } else {
541 api::ErrorMessage {
542 message: "Unknown error",
543 }
544 };
545
546 while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
547 if let Some(callback) = callback.upgrade() {
548 callback.call(Err(Error::msg(error.message)));
549 }
550 }
551
552 return Ok(());
553 }
554
555 let at = buf.len().saturating_sub(reader.remaining());
556
557 let packet = RawPacket {
558 buf: Some(buf.clone()),
559 at: Cell::new(at),
560 id: broadcast,
561 };
562
563 while let Some(callback) = self.defer_broadcasts.borrow_mut().pop_front() {
564 if let Some(callback) = callback.upgrade() {
565 callback.call(Ok(packet.clone()));
566 }
567 }
568 } else {
569 tracing::debug!(?header, "Got response");
570
571 let p = {
572 let mut requests = self.g.requests.borrow_mut();
573
574 let Some(p) = requests.remove(&header.serial) else {
575 tracing::debug!(?header, "Missing request");
576 return Ok(());
577 };
578
579 p
580 };
581
582 if let Some(id) = MessageId::new(header.error) {
583 let error = if id == MessageId::ERROR_MESSAGE {
584 storage::decode(&mut reader)?
585 } else {
586 api::ErrorMessage {
587 message: "Unknown error",
588 }
589 };
590
591 p.callback.call(Err(Error::msg(error.message)));
592 return Ok(());
593 }
594
595 let at = buf.len().saturating_sub(reader.remaining());
596
597 let packet = RawPacket {
598 id: p.kind,
599 buf: Some(buf),
600 at: Cell::new(at),
601 };
602
603 p.callback.call(Ok(packet));
604 }
605
606 Ok(())
607 }
608
609 fn prepare_broadcast(self: &Rc<Self>, kind: MessageId) -> bool {
610 // Note: We need to defer this, since the outcome of calling
611 // the broadcast callback might be that the broadcast
612 // listener is modified, which could require mutable access
613 // to broadcasts.
614 let mut defer = self.defer_broadcasts.borrow_mut();
615
616 let broadcasts = self.g.broadcasts.borrow();
617
618 let Some(broadcasts) = broadcasts.get(&kind) else {
619 return false;
620 };
621
622 for (_, callback) in broadcasts.iter() {
623 defer.push_back(Rc::downgrade(callback));
624 }
625
626 !defer.is_empty()
627 }
628
/// Mark the connection as open.
///
/// Records the current timestamp as the time the connection was opened and
/// notifies state listeners about [`State::Open`].
pub(crate) fn set_open(&self) {
    tracing::debug!("Set open");
    self.opened
        .set(Some(PerformanceImpl::now(&self.performance)));
    self.emit_state_change(State::Open);
}
635
636 fn is_open_for_a_while(&self) -> bool {
637 let Some(at) = self.opened.get() else {
638 return false;
639 };
640
641 let now = PerformanceImpl::now(&self.performance);
642 (now - at) >= 250.0
643 }
644
645 pub(crate) fn close(self: &Rc<Self>) -> Result<(), Error> {
646 tracing::debug!("Close connection");
647
648 // We need a weak reference back to shared state to handle the timeout.
649 let shared = Rc::downgrade(self);
650
651 tracing::debug!(
652 "Set closed timeout={}, opened={:?}",
653 self.current_timeout.get(),
654 self.opened.get(),
655 );
656
657 if !self.is_open_for_a_while() {
658 let current_timeout = self.current_timeout.get();
659
660 if current_timeout < MAX_TIMEOUT {
661 let fuzz = H::random(50);
662
663 self.current_timeout.set(
664 current_timeout
665 .saturating_mul(2)
666 .saturating_add(fuzz)
667 .min(MAX_TIMEOUT),
668 );
669 }
670 } else {
671 self.current_timeout.set(INITIAL_TIMEOUT);
672 }
673
674 self.opened.set(None);
675 self.emit_state_change(State::Closed);
676
677 if let Some(s) = self.socket.take() {
678 s.close()?;
679 }
680
681 self.close_pending();
682
683 let timeout = self
684 .window
685 .set_timeout(self.current_timeout.get(), move || {
686 if let Some(shared) = shared.upgrade() {
687 Self::connect(&shared);
688 }
689 })?;
690
691 drop(self.reconnect_timeout.borrow_mut().replace(timeout));
692 Ok(())
693 }
694
695 /// Close an pending requests with an error, since there is no chance they
696 /// will be responded to any more.
697 fn close_pending(self: &Rc<Self>) {
698 loop {
699 let Some(serial) = self.g.requests.borrow().keys().next().copied() else {
700 break;
701 };
702
703 let p = {
704 let mut requests = self.g.requests.borrow_mut();
705
706 let Some(p) = requests.remove(&serial) else {
707 break;
708 };
709
710 p
711 };
712
713 p.callback.call(Err(Error::msg("Connection closed")));
714 }
715 }
716
717 fn emit_state_change(&self, state: State) {
718 if self.state.get() == state {
719 return;
720 }
721
722 self.state.set(state);
723
724 {
725 // We need to collect callbacks to avoid the callback recursively
726 // borrowing state listeners, which it would if it modifies any
727 // existing state listeners.
728 let mut defer = self.defer_state_listeners.borrow_mut();
729
730 for (_, callback) in self.g.state_listeners.borrow().iter() {
731 defer.push_back(Rc::downgrade(callback));
732 }
733
734 if defer.is_empty() {
735 return;
736 }
737 }
738
739 while let Some(callback) = self.defer_state_listeners.borrow_mut().pop_front() {
740 if let Some(callback) = callback.upgrade() {
741 callback.call(state);
742 }
743 }
744 }
745
/// Test if the connection is closed, which is the case whenever there is no
/// recorded time for when it was opened.
fn is_closed(&self) -> bool {
    self.opened.get().is_none()
}
749
750 fn connect(self: &Rc<Self>) {
751 tracing::debug!("Connect");
752
753 if let Err(e) = self.build() {
754 self.on_error.call(e);
755 } else {
756 return;
757 }
758
759 if let Err(e) = self.close() {
760 self.on_error.call(e);
761 }
762 }
763
764 /// Build a websocket connection.
765 fn build(self: &Rc<Self>) -> Result<()> {
766 let url = match &self.connect.kind {
767 ConnectKind::Location { path } => {
768 let Location {
769 protocol,
770 host,
771 port,
772 } = WindowImpl::location(&self.window)?;
773
774 let protocol = match protocol.as_str() {
775 "https:" => "wss:",
776 "http:" => "ws:",
777 other => {
778 return Err(Error::msg(format_args!(
779 "Same host connection is not supported for protocol `{other}`"
780 )));
781 }
782 };
783
784 let path = ForcePrefix(path, '/');
785 format!("{protocol}//{host}:{port}{path}")
786 }
787 ConnectKind::Url { url } => url.clone(),
788 };
789
790 let ws = SocketImpl::new(&url, &self.handles)?;
791
792 let old = self.socket.borrow_mut().replace(ws);
793
794 if let Some(old) = old {
795 old.close()?;
796 }
797
798 Ok(())
799 }
800}
801
/// Trait governing how callbacks are called.
///
/// This is implemented for every `'static` closure or function which
/// implements `Fn(I)`, and for [`EmptyCallback`] which ignores its input.
pub trait Callback<I>
where
    Self: 'static,
{
    /// Call the callback.
    fn call(&self, input: I);
}

/// Ignores the input.
impl<I> Callback<I> for EmptyCallback {
    #[inline]
    fn call(&self, _: I) {}
}

/// Calls the function with the input.
impl<F, I> Callback<I> for F
where
    F: 'static + Fn(I),
{
    #[inline]
    fn call(&self, input: I) {
        self(input)
    }
}
825
/// A request builder.
///
/// Associate the callback to be used by using either
/// [`RequestBuilder::on_packet`] or [`RequestBuilder::on_raw_packet`] depending
/// on your needs.
///
/// Send the request with [`RequestBuilder::send`].
pub struct RequestBuilder<'a, H, B, C>
where
    H: WebImpl,
{
    /// Weak reference to the shared state of the service.
    shared: &'a Weak<Shared<H>>,
    /// The body of the request.
    body: B,
    /// The callback which receives the response.
    callback: C,
}
841
842impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
843where
844 H: WebImpl,
845{
846 /// Set the body of the request.
847 #[inline]
848 pub fn body<U>(self, body: U) -> RequestBuilder<'a, H, U, C>
849 where
850 U: api::Request,
851 {
852 RequestBuilder {
853 shared: self.shared,
854 body,
855 callback: self.callback,
856 }
857 }
858
859 /// Handle the response using the specified callback.
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// # extern crate yew021 as yew;
865 /// use yew::prelude::*;
866 /// use musli_web::web03::prelude::*;
867 ///
868 /// mod api {
869 /// use musli::{Decode, Encode};
870 /// use musli_web::api;
871 ///
872 /// #[derive(Encode, Decode)]
873 /// pub struct HelloRequest<'de> {
874 /// pub message: &'de str,
875 /// }
876 ///
877 /// #[derive(Encode, Decode)]
878 /// pub struct HelloResponse<'de> {
879 /// pub message: &'de str,
880 /// }
881 ///
882 /// api::define! {
883 /// pub type Hello;
884 ///
885 /// impl Endpoint for Hello {
886 /// impl<'de> Request for HelloRequest<'de>;
887 /// type Response<'de> = HelloResponse<'de>;
888 /// }
889 /// }
890 /// }
891 ///
892 /// enum Msg {
893 /// OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
894 /// }
895 ///
896 /// #[derive(Properties, PartialEq)]
897 /// struct Props {
898 /// ws: ws::Handle,
899 /// }
900 ///
901 /// struct App {
902 /// message: String,
903 /// _hello: ws::Request,
904 /// }
905 ///
906 /// impl Component for App {
907 /// type Message = Msg;
908 /// type Properties = Props;
909 ///
910 /// fn create(ctx: &Context<Self>) -> Self {
911 /// let hello = ctx.props().ws
912 /// .request()
913 /// .body(api::HelloRequest { message: "Hello!"})
914 /// .on_packet(ctx.link().callback(Msg::OnHello))
915 /// .send();
916 ///
917 /// Self {
918 /// message: String::from("No Message :("),
919 /// _hello: hello,
920 /// }
921 /// }
922 ///
923 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
924 /// match msg {
925 /// Msg::OnHello(Err(error)) => {
926 /// tracing::error!("Request error: {:?}", error);
927 /// false
928 /// }
929 /// Msg::OnHello(Ok(packet)) => {
930 /// if let Ok(response) = packet.decode() {
931 /// self.message = response.message.to_owned();
932 /// }
933 ///
934 /// true
935 /// }
936 /// }
937 /// }
938 ///
939 /// fn view(&self, ctx: &Context<Self>) -> Html {
940 /// html! {
941 /// <div>
942 /// <h1>{"WebSocket Example"}</h1>
943 /// <p>{format!("Message: {}", self.message)}</p>
944 /// </div>
945 /// }
946 /// }
947 /// }
948 /// ```
949 pub fn on_packet<E>(
950 self,
951 callback: impl Callback<Result<Packet<E>>>,
952 ) -> RequestBuilder<'a, H, B, impl Callback<Result<RawPacket>>>
953 where
954 E: api::Endpoint,
955 {
956 self.on_raw_packet(move |result: Result<RawPacket>| match result {
957 Ok(ok) => callback.call(Ok(Packet::new(ok))),
958 Err(err) => callback.call(Err(err)),
959 })
960 }
961
962 /// Handle the response using the specified callback.
963 ///
964 /// # Examples
965 ///
966 /// ```
967 /// # extern crate yew021 as yew;
968 /// use yew::prelude::*;
969 /// use musli_web::web03::prelude::*;
970 ///
971 /// mod api {
972 /// use musli::{Decode, Encode};
973 /// use musli_web::api;
974 ///
975 /// #[derive(Encode, Decode)]
976 /// pub struct HelloRequest<'de> {
977 /// pub message: &'de str,
978 /// }
979 ///
980 /// #[derive(Encode, Decode)]
981 /// pub struct HelloResponse<'de> {
982 /// pub message: &'de str,
983 /// }
984 ///
985 /// api::define! {
986 /// pub type Hello;
987 ///
988 /// impl Endpoint for Hello {
989 /// impl<'de> Request for HelloRequest<'de>;
990 /// type Response<'de> = HelloResponse<'de>;
991 /// }
992 /// }
993 /// }
994 ///
995 /// enum Msg {
996 /// OnHello(Result<ws::RawPacket, ws::Error>),
997 /// }
998 ///
999 /// #[derive(Properties, PartialEq)]
1000 /// struct Props {
1001 /// ws: ws::Handle,
1002 /// }
1003 ///
1004 /// struct App {
1005 /// message: String,
1006 /// _hello: ws::Request,
1007 /// }
1008 ///
1009 /// impl Component for App {
1010 /// type Message = Msg;
1011 /// type Properties = Props;
1012 ///
1013 /// fn create(ctx: &Context<Self>) -> Self {
1014 /// let link = ctx.link().clone();
1015 ///
1016 /// let hello = ctx.props().ws
1017 /// .request()
1018 /// .body(api::HelloRequest { message: "Hello!"})
1019 /// .on_raw_packet(move |packet| link.send_message(Msg::OnHello(packet)))
1020 /// .send();
1021 ///
1022 /// Self {
1023 /// message: String::from("No Message :("),
1024 /// _hello: hello,
1025 /// }
1026 /// }
1027 ///
1028 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1029 /// match msg {
1030 /// Msg::OnHello(Err(error)) => {
1031 /// tracing::error!("Request error: {:?}", error);
1032 /// false
1033 /// }
1034 /// Msg::OnHello(Ok(packet)) => {
1035 /// if let Ok(response) = packet.decode::<api::HelloResponse>() {
1036 /// self.message = response.message.to_owned();
1037 /// }
1038 ///
1039 /// true
1040 /// }
1041 /// }
1042 /// }
1043 ///
1044 /// fn view(&self, ctx: &Context<Self>) -> Html {
1045 /// html! {
1046 /// <div>
1047 /// <h1>{"WebSocket Example"}</h1>
1048 /// <p>{format!("Message: {}", self.message)}</p>
1049 /// </div>
1050 /// }
1051 /// }
1052 /// }
1053 /// ```
1054 pub fn on_raw_packet<U>(self, callback: U) -> RequestBuilder<'a, H, B, U>
1055 where
1056 U: Callback<Result<RawPacket, Error>>,
1057 {
1058 RequestBuilder {
1059 shared: self.shared,
1060 body: self.body,
1061 callback,
1062 }
1063 }
1064}
1065
1066impl<'a, H, B, C> RequestBuilder<'a, H, B, C>
1067where
1068 B: api::Request,
1069 C: Callback<Result<RawPacket>>,
1070 H: WebImpl,
1071{
1072 /// Send the request.
1073 ///
1074 /// This requires that a body has been set using [`RequestBuilder::body`].
1075 pub fn send(self) -> Request {
1076 let Some(shared) = self.shared.upgrade() else {
1077 self.callback
1078 .call(Err(Error::msg("WebSocket service is down")));
1079 return Request::new();
1080 };
1081
1082 if shared.is_closed() {
1083 self.callback
1084 .call(Err(Error::msg("WebSocket is not connected")));
1085 return Request::new();
1086 }
1087
1088 let serial = shared.serial.get();
1089
1090 if let Err(error) = shared.send_client_request(serial, &self.body) {
1091 shared.on_error.call(error);
1092 return Request::new();
1093 }
1094
1095 shared.serial.set(serial.wrapping_add(1));
1096
1097 let pending = Pending {
1098 kind: <B::Endpoint as api::Endpoint>::ID,
1099 serial,
1100 callback: self.callback,
1101 };
1102
1103 let existing = shared
1104 .g
1105 .requests
1106 .borrow_mut()
1107 .insert(serial, Box::new(pending));
1108
1109 if let Some(p) = existing {
1110 p.callback.call(Err(Error::msg("Request cancelled")));
1111 }
1112
1113 Request {
1114 serial,
1115 g: Rc::downgrade(&shared.g),
1116 }
1117 }
1118}
1119
1120/// The handle for a pending request.
1121///
1122/// Dropping or [`clear()`] this handle will cancel the request.
1123///
1124/// [`clear()`]: Self::clear
1125pub struct Request {
1126 serial: u32,
1127 g: Weak<Generic>,
1128}
1129
1130impl Request {
1131 /// An empty request handler.
1132 #[inline]
1133 pub const fn new() -> Self {
1134 Self {
1135 serial: 0,
1136 g: Weak::new(),
1137 }
1138 }
1139
1140 /// Clear the request handle without dropping it, cancelling any pending
1141 /// requests.
1142 pub fn clear(&mut self) {
1143 let removed = {
1144 let serial = mem::take(&mut self.serial);
1145
1146 let Some(g) = self.g.upgrade() else {
1147 return;
1148 };
1149
1150 self.g = Weak::new();
1151
1152 let Some(p) = g.requests.borrow_mut().remove(&serial) else {
1153 return;
1154 };
1155
1156 p
1157 };
1158
1159 drop(removed);
1160 }
1161
1162 /// Indicate if the update request is pending.
1163 pub fn is_pending(&self) -> bool {
1164 let Some(g) = self.g.upgrade() else {
1165 return false;
1166 };
1167
1168 g.requests.borrow().contains_key(&self.serial)
1169 }
1170}
1171
1172impl Default for Request {
1173 #[inline]
1174 fn default() -> Self {
1175 Self::new()
1176 }
1177}
1178
1179impl Drop for Request {
1180 #[inline]
1181 fn drop(&mut self) {
1182 self.clear();
1183 }
1184}
1185
/// The handle for a broadcast listener.
///
/// Dropping or calling [`clear()`] on this handle will remove the listener.
///
/// [`clear()`]: Self::clear
pub struct Listener {
    /// The kind of broadcast being listened to, if any.
    kind: Option<MessageId>,
    /// Index of the listener in the slab of listeners for `kind`.
    index: usize,
    /// Weak reference to the state holding the broadcast listeners.
    g: Weak<Generic>,
}

impl Listener {
    /// Construct an empty listener.
    #[inline]
    pub const fn new() -> Self {
        Self {
            kind: None,
            index: 0,
            g: Weak::new(),
        }
    }

    /// Build up an empty listener with the specified kind.
    ///
    /// It is not associated with any service, so clearing it does nothing.
    #[inline]
    pub(crate) const fn empty_with_kind(kind: MessageId) -> Self {
        Self {
            kind: Some(kind),
            index: 0,
            g: Weak::new(),
        }
    }

    /// Clear the listener without dropping it.
    ///
    /// This will remove the associated broadcast listener from being notified.
    pub fn clear(&mut self) {
        // Gather values here to drop them outside of the upgrade block.
        let removed;
        let removed_value;

        {
            let Some(g) = self.g.upgrade() else {
                return;
            };

            // Reset the handle, so that clearing it again does nothing.
            self.g = Weak::new();
            let index = mem::take(&mut self.index);

            let Some(kind) = self.kind.take() else {
                return;
            };

            let mut broadcasts = g.broadcasts.borrow_mut();

            let Entry::Occupied(mut e) = broadcasts.entry(kind) else {
                return;
            };

            removed = e.get_mut().try_remove(index);

            // Remove the entry once the last listener of this kind is gone.
            if e.get().is_empty() {
                removed_value = Some(e.remove());
            } else {
                removed_value = None;
            }
        }

        // Drop here, to avoid invoking any destructors which might borrow
        // shared mutably earlier.
        drop(removed);
        drop(removed_value);
    }
}

impl Default for Listener {
    /// Construct an empty listener, same as [`Listener::new`].
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for Listener {
    /// Remove the listener if it is still registered.
    #[inline]
    fn drop(&mut self) {
        self.clear();
    }
}
1273
1274/// The handle for state change listening.
1275///
1276/// Dropping or calling [`clear()`] on this handle will remove the associated
1277/// callback from being notified.
1278///
1279/// [`clear()`]: Self::clear
1280pub struct StateListener {
1281 index: usize,
1282 g: Weak<Generic>,
1283}
1284
1285impl StateListener {
1286 /// Construct an empty state listener.
1287 #[inline]
1288 pub const fn new() -> Self {
1289 Self {
1290 index: 0,
1291 g: Weak::new(),
1292 }
1293 }
1294
1295 /// Clear the state listener without dropping it.
1296 ///
1297 /// This will remove the associated callback from being notified.
1298 pub fn clear(&mut self) {
1299 let removed = {
1300 let Some(g) = self.g.upgrade() else {
1301 return;
1302 };
1303
1304 self.g = Weak::new();
1305
1306 g.state_listeners.borrow_mut().try_remove(self.index)
1307 };
1308
1309 drop(removed);
1310 }
1311}
1312
1313impl Default for StateListener {
1314 #[inline]
1315 fn default() -> Self {
1316 Self::new()
1317 }
1318}
1319
1320impl Drop for StateListener {
1321 #[inline]
1322 fn drop(&mut self) {
1323 self.clear();
1324 }
1325}
1326
/// A reusable buffer with a manually managed reference count.
///
/// The count is maintained through [`BufData::inc`] and [`BufData::dec`],
/// which [`BufRc`] calls when it is constructed, cloned and dropped.
pub(crate) struct BufData {
    /// Buffer being used.
    pub(crate) data: Vec<u8>,
    /// Number of strong references to this buffer.
    strong: Cell<usize>,
    /// Reference to shared state where the buffer will be recycled to.
    g: Weak<Generic>,
}

impl BufData {
    /// Construct an empty buffer with the given `capacity` and no strong
    /// references, which is recycled into `g` if it is still alive.
    fn with_capacity(g: Weak<Generic>, capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
            strong: Cell::new(0),
            g,
        }
    }

    /// Decrement the strong count, recycling or freeing the buffer once it
    /// reaches zero.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live `BufData` which was allocated through a
    /// `Box` and leaked, since ownership is reclaimed with `Box::from_raw`.
    /// The caller must give up one strong reference and must not use `ptr`
    /// afterwards.
    unsafe fn dec(ptr: NonNull<BufData>) {
        unsafe {
            let count = ptr.as_ref().strong.get().wrapping_sub(1);
            ptr.as_ref().strong.set(count);

            // Other references remain, so the buffer has to be kept around.
            if count > 0 {
                return;
            }

            // This was the last reference, so reclaim ownership of the box.
            let mut buf = Box::from_raw(ptr.as_ptr());

            // Try to recycle the buffer if shared is available, else let it be
            // dropped and free here.
            let Some(g) = buf.as_ref().g.upgrade() else {
                return;
            };

            let mut buffers = g.buffers.borrow_mut();

            // Set the length of the recycled buffer. This never grows the
            // length, so no uninitialized data is exposed.
            buf.data.set_len(buf.data.len().min(MAX_CAPACITY));

            // We size our buffers to some max capacity to avoid overuse in case
            // we infrequently need to handle some massive message. If we don't
            // shrink the allocation, then memory use can run away over time.
            buf.data.shrink_to(MAX_CAPACITY);

            buffers.push_back(buf);
        }
    }

    /// Increment the strong count, aborting the process if it overflows.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a live `BufData`.
    unsafe fn inc(ptr: NonNull<BufData>) {
        unsafe {
            let count = ptr.as_ref().strong.get().wrapping_add(1);

            // The count wrapped around, at which point it can no longer be
            // trusted to keep the buffer alive.
            if count == 0 {
                std::process::abort();
            }

            ptr.as_ref().strong.set(count);
        }
    }
}
1388
1389/// A shared buffer of data that is recycled when dropped.
1390struct BufRc {
1391 data: NonNull<BufData>,
1392}
1393
1394impl BufRc {
1395 fn new(data: Box<BufData>) -> Self {
1396 let data = NonNull::from(Box::leak(data));
1397
1398 unsafe {
1399 BufData::inc(data);
1400 }
1401
1402 Self { data }
1403 }
1404}
1405
1406impl Deref for BufRc {
1407 type Target = [u8];
1408
1409 fn deref(&self) -> &Self::Target {
1410 unsafe { &(*self.data.as_ptr()).data }
1411 }
1412}
1413
1414impl Clone for BufRc {
1415 fn clone(&self) -> Self {
1416 unsafe {
1417 BufData::inc(self.data);
1418 }
1419
1420 Self { data: self.data }
1421 }
1422}
1423
1424impl Drop for BufRc {
1425 fn drop(&mut self) {
1426 unsafe {
1427 BufData::dec(self.data);
1428 }
1429 }
1430}
1431
1432/// A raw packet of data.
1433#[derive(Clone)]
1434pub struct RawPacket {
1435 id: MessageId,
1436 buf: Option<BufRc>,
1437 at: Cell<usize>,
1438}
1439
1440impl RawPacket {
1441 /// Construct an empty raw packet.
1442 ///
1443 /// # Examples
1444 ///
1445 /// ```
1446 /// use musli_web::api::MessageId;
1447 /// use musli_web::web::RawPacket;
1448 ///
1449 /// let packet = RawPacket::empty();
1450 ///
1451 /// assert!(packet.is_empty());
1452 /// assert_eq!(packet.id(), MessageId::EMPTY);
1453 /// ```
1454 pub const fn empty() -> Self {
1455 Self {
1456 id: MessageId::EMPTY,
1457 buf: None,
1458 at: Cell::new(0),
1459 }
1460 }
1461
1462 /// Decode the contents of a raw packet.
1463 ///
1464 /// This can be called multiple times if there are multiple payloads in
1465 /// sequence of the response.
1466 ///
1467 /// You can check if the packet is empty using [`RawPacket::is_empty`].
1468 pub fn decode<'this, T>(&'this self) -> Result<T>
1469 where
1470 T: Decode<'this, Binary, Global>,
1471 {
1472 let at = self.at.get();
1473
1474 if self.id == MessageId::EMPTY {
1475 return Err(Error::new(ErrorKind::EmptyPacket));
1476 }
1477
1478 let Some(bytes) = self.as_slice().get(at..) else {
1479 return Err(Error::new(ErrorKind::Overflow(at, self.as_slice().len())));
1480 };
1481
1482 let mut reader = SliceReader::new(bytes);
1483
1484 match storage::decode(&mut reader) {
1485 Ok(value) => {
1486 self.at.set(at + bytes.len() - reader.remaining());
1487 Ok(value)
1488 }
1489 Err(error) => {
1490 self.at.set(self.len());
1491 Err(Error::from(error))
1492 }
1493 }
1494 }
1495
1496 /// Get the underlying byte slice of the packet.
1497 ///
1498 /// # Examples
1499 ///
1500 /// ```
1501 /// use musli_web::web::RawPacket;
1502 ///
1503 /// let packet = RawPacket::empty();
1504 /// assert_eq!(packet.as_slice(), &[]);
1505 /// ```
1506 pub fn as_slice(&self) -> &[u8] {
1507 match &self.buf {
1508 Some(buf) => buf.as_ref(),
1509 None => &[],
1510 }
1511 }
1512
1513 /// Get the length of the packet.
1514 ///
1515 /// # Examples
1516 ////
1517 /// ```
1518 /// use musli_web::web::RawPacket;
1519 ///
1520 /// let packet = RawPacket::empty();
1521 /// assert_eq!(packet.len(), 0);
1522 /// ```
1523 pub fn len(&self) -> usize {
1524 match &self.buf {
1525 Some(buf) => buf.len(),
1526 None => 0,
1527 }
1528 }
1529
1530 /// Check if the packet is empty.
1531 ///
1532 /// # Examples
1533 ///
1534 /// ```
1535 /// use musli_web::web::RawPacket;
1536 ///
1537 /// let packet = RawPacket::empty();
1538 /// assert!(packet.is_empty());
1539 /// ```
1540 pub fn is_empty(&self) -> bool {
1541 self.at.get() >= self.len()
1542 }
1543
1544 /// The id of the packet this is a response to as specified by
1545 /// [`Endpoint::ID`] or [`Broadcast::ID`].
1546 ///
1547 /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1548 /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1549 pub fn id(&self) -> MessageId {
1550 self.id
1551 }
1552}
1553
1554/// A typed packet of data.
1555#[derive(Clone)]
1556pub struct Packet<T> {
1557 raw: RawPacket,
1558 _marker: PhantomData<T>,
1559}
1560
1561impl<T> Packet<T> {
1562 /// Construct an empty package.
1563 ///
1564 /// # Examples
1565 ///
1566 /// ```
1567 /// use musli_web::api::MessageId;
1568 /// use musli_web::web::Packet;
1569 ///
1570 /// let packet = Packet::<()>::empty();
1571 ///
1572 /// assert!(packet.is_empty());
1573 /// assert_eq!(packet.id(), MessageId::EMPTY);
1574 /// ```
1575 pub const fn empty() -> Self {
1576 Self {
1577 raw: RawPacket::empty(),
1578 _marker: PhantomData,
1579 }
1580 }
1581
1582 /// Construct a new typed package from a raw one.
1583 ///
1584 /// Note that this does not guarantee that the typed package is correct, but
1585 /// the `T` parameter becomes associated with it allowing it to be used
1586 /// automatically with methods such as [`Packet::decode`].
1587 #[inline]
1588 pub fn new(raw: RawPacket) -> Self {
1589 Self {
1590 raw,
1591 _marker: PhantomData,
1592 }
1593 }
1594
1595 /// Convert a packet into a raw packet.
1596 ///
1597 /// To determine which endpoint or broadcast it belongs to the
1598 /// [`RawPacket::id`] method can be used.
1599 pub fn into_raw(self) -> RawPacket {
1600 self.raw
1601 }
1602
1603 /// Check if the packet is empty.
1604 ///
1605 /// # Examples
1606 ///
1607 /// ```
1608 /// use musli_web::web::Packet;
1609 ///
1610 /// let packet = Packet::<()>::empty();
1611 /// assert!(packet.is_empty());
1612 /// ```
1613 pub fn is_empty(&self) -> bool {
1614 self.raw.is_empty()
1615 }
1616
1617 /// The id of the packet this is a response to as specified by
1618 /// [`Endpoint::ID`] or [`Broadcast::ID`].
1619 ///
1620 /// [`Endpoint::ID`]: crate::api::Endpoint::ID
1621 /// [`Broadcast::ID`]: crate::api::Broadcast::ID
1622 pub fn id(&self) -> MessageId {
1623 self.raw.id()
1624 }
1625}
1626
1627impl<T> Packet<T>
1628where
1629 T: api::Decodable,
1630{
1631 /// Decode the contents of a packet.
1632 ///
1633 /// This can be called multiple times if there are multiple payloads in
1634 /// sequence of the response.
1635 ///
1636 /// You can check if the packet is empty using [`Packet::is_empty`].
1637 pub fn decode(&self) -> Result<T::Type<'_>> {
1638 self.decode_any()
1639 }
1640
1641 /// Decode any contents of a packet.
1642 ///
1643 /// This can be called multiple times if there are multiple payloads in
1644 /// sequence of the response.
1645 ///
1646 /// You can check if the packet is empty using [`Packet::is_empty`].
1647 pub fn decode_any<'de, R>(&'de self) -> Result<R>
1648 where
1649 R: Decode<'de, Binary, Global>,
1650 {
1651 self.raw.decode()
1652 }
1653}
1654
1655impl<T> Packet<T>
1656where
1657 T: api::Endpoint,
1658{
1659 /// Decode the contents of a packet.
1660 ///
1661 /// This can be called multiple times if there are multiple payloads in
1662 /// sequence of the response.
1663 ///
1664 /// You can check if the packet is empty using [`Packet::is_empty`].
1665 pub fn decode_response(&self) -> Result<T::Response<'_>> {
1666 self.decode_any_response()
1667 }
1668
1669 /// Decode any contents of a packet.
1670 ///
1671 /// This can be called multiple times if there are multiple payloads in
1672 /// sequence of the response.
1673 ///
1674 /// You can check if the packet is empty using [`Packet::is_empty`].
1675 pub fn decode_any_response<'de, R>(&'de self) -> Result<R>
1676 where
1677 R: Decode<'de, Binary, Global>,
1678 {
1679 self.raw.decode()
1680 }
1681}
1682
1683impl<T> Packet<T>
1684where
1685 T: api::Broadcast,
1686{
1687 /// Decode the primary event related to a broadcast.
1688 pub fn decode_event<'de>(&'de self) -> Result<T::Event<'de>>
1689 where
1690 T: api::BroadcastWithEvent,
1691 {
1692 self.decode_event_any()
1693 }
1694
1695 /// Decode any event related to a broadcast.
1696 pub fn decode_event_any<'de, E>(&'de self) -> Result<E>
1697 where
1698 E: Event<Broadcast = T> + Decode<'de, Binary, Global>,
1699 {
1700 self.raw.decode()
1701 }
1702}
1703
1704/// A handle to the WebSocket service.
1705#[derive(Clone)]
1706#[repr(transparent)]
1707pub struct Handle<H>
1708where
1709 H: WebImpl,
1710{
1711 shared: Weak<Shared<H>>,
1712}
1713
1714impl<H> Handle<H>
1715where
1716 H: WebImpl,
1717{
1718 /// Send a request of type `T`.
1719 ///
1720 /// Returns a handle for the request.
1721 ///
1722 /// If the handle is dropped, the request is cancelled.
1723 ///
1724 /// # Examples
1725 ///
1726 /// ```
1727 /// # extern crate yew021 as yew;
1728 /// use yew::prelude::*;
1729 /// use musli_web::web03::prelude::*;
1730 ///
1731 /// mod api {
1732 /// use musli::{Decode, Encode};
1733 /// use musli_web::api;
1734 ///
1735 /// #[derive(Encode, Decode)]
1736 /// pub struct HelloRequest<'de> {
1737 /// pub message: &'de str,
1738 /// }
1739 ///
1740 /// #[derive(Encode, Decode)]
1741 /// pub struct HelloResponse<'de> {
1742 /// pub message: &'de str,
1743 /// }
1744 ///
1745 /// api::define! {
1746 /// pub type Hello;
1747 ///
1748 /// impl Endpoint for Hello {
1749 /// impl<'de> Request for HelloRequest<'de>;
1750 /// type Response<'de> = HelloResponse<'de>;
1751 /// }
1752 /// }
1753 /// }
1754 ///
1755 /// enum Msg {
1756 /// OnHello(Result<ws::Packet<api::Hello>, ws::Error>),
1757 /// }
1758 ///
1759 /// #[derive(Properties, PartialEq)]
1760 /// struct Props {
1761 /// ws: ws::Handle,
1762 /// }
1763 ///
1764 /// struct App {
1765 /// message: String,
1766 /// _hello: ws::Request,
1767 /// }
1768 ///
1769 /// impl Component for App {
1770 /// type Message = Msg;
1771 /// type Properties = Props;
1772 ///
1773 /// fn create(ctx: &Context<Self>) -> Self {
1774 /// let hello = ctx.props().ws
1775 /// .request()
1776 /// .body(api::HelloRequest { message: "Hello!"})
1777 /// .on_packet(ctx.link().callback(Msg::OnHello))
1778 /// .send();
1779 ///
1780 /// Self {
1781 /// message: String::from("No Message :("),
1782 /// _hello: hello,
1783 /// }
1784 /// }
1785 ///
1786 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1787 /// match msg {
1788 /// Msg::OnHello(Err(error)) => {
1789 /// tracing::error!("Request error: {:?}", error);
1790 /// false
1791 /// }
1792 /// Msg::OnHello(Ok(packet)) => {
1793 /// if let Ok(response) = packet.decode() {
1794 /// self.message = response.message.to_owned();
1795 /// }
1796 ///
1797 /// true
1798 /// }
1799 /// }
1800 /// }
1801 ///
1802 /// fn view(&self, ctx: &Context<Self>) -> Html {
1803 /// html! {
1804 /// <div>
1805 /// <h1>{"WebSocket Example"}</h1>
1806 /// <p>{format!("Message: {}", self.message)}</p>
1807 /// </div>
1808 /// }
1809 /// }
1810 /// }
1811 /// ```
1812 pub fn request(&self) -> RequestBuilder<'_, H, EmptyBody, EmptyCallback> {
1813 RequestBuilder {
1814 shared: &self.shared,
1815 body: EmptyBody,
1816 callback: EmptyCallback,
1817 }
1818 }
1819
1820 /// Listen for broadcasts of type `T`.
1821 ///
1822 /// Returns a handle for the listener that will cancel the listener if
1823 /// dropped.
1824 ///
1825 /// # Examples
1826 ///
1827 /// ```
1828 /// # extern crate yew021 as yew;
1829 /// use yew::prelude::*;
1830 /// use musli_web::web03::prelude::*;
1831 ///
1832 /// mod api {
1833 /// use musli::{Decode, Encode};
1834 /// use musli_web::api;
1835 ///
1836 /// #[derive(Encode, Decode)]
1837 /// pub struct TickEvent<'de> {
1838 /// pub message: &'de str,
1839 /// pub tick: u32,
1840 /// }
1841 ///
1842 /// api::define! {
1843 /// pub type Tick;
1844 ///
1845 /// impl Broadcast for Tick {
1846 /// impl<'de> Event for TickEvent<'de>;
1847 /// }
1848 /// }
1849 /// }
1850 ///
1851 /// enum Msg {
1852 /// Tick(Result<ws::Packet<api::Tick>, ws::Error>),
1853 /// }
1854 ///
1855 /// #[derive(Properties, PartialEq)]
1856 /// struct Props {
1857 /// ws: ws::Handle,
1858 /// }
1859 ///
1860 /// struct App {
1861 /// tick: u32,
1862 /// _listen: ws::Listener,
1863 /// }
1864 ///
1865 /// impl Component for App {
1866 /// type Message = Msg;
1867 /// type Properties = Props;
1868 ///
1869 /// fn create(ctx: &Context<Self>) -> Self {
1870 /// let listen = ctx.props().ws.on_broadcast(ctx.link().callback(Msg::Tick));
1871 ///
1872 /// Self {
1873 /// tick: 0,
1874 /// _listen: listen,
1875 /// }
1876 /// }
1877 ///
1878 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1879 /// match msg {
1880 /// Msg::Tick(Err(error)) => {
1881 /// tracing::error!("Tick error: {error}");
1882 /// false
1883 /// }
1884 /// Msg::Tick(Ok(packet)) => {
1885 /// if let Ok(tick) = packet.decode_event() {
1886 /// self.tick = tick.tick;
1887 /// }
1888 ///
1889 /// true
1890 /// }
1891 /// }
1892 /// }
1893 ///
1894 /// fn view(&self, ctx: &Context<Self>) -> Html {
1895 /// html! {
1896 /// <div>
1897 /// <h1>{"WebSocket Example"}</h1>
1898 /// <p>{format!("Tick: {}", self.tick)}</p>
1899 /// </div>
1900 /// }
1901 /// }
1902 /// }
1903 /// ```
1904 pub fn on_broadcast<T>(&self, callback: impl Callback<Result<Packet<T>>>) -> Listener
1905 where
1906 T: api::Broadcast,
1907 {
1908 self.on_raw_broadcast::<T>(move |result| match result {
1909 Ok(packet) => callback.call(Ok(Packet::new(packet))),
1910 Err(error) => callback.call(Err(error)),
1911 })
1912 }
1913
1914 /// Listen for broadcasts of type `T`.
1915 ///
1916 /// Returns a handle for the listener that will cancel the listener if
1917 /// dropped.
1918 ///
1919 /// # Examples
1920 ///
1921 /// ```
1922 /// # extern crate yew021 as yew;
1923 /// use yew::prelude::*;
1924 /// use musli_web::web03::prelude::*;
1925 ///
1926 /// mod api {
1927 /// use musli::{Decode, Encode};
1928 /// use musli_web::api;
1929 ///
1930 /// #[derive(Encode, Decode)]
1931 /// pub struct TickEvent<'de> {
1932 /// pub message: &'de str,
1933 /// pub tick: u32,
1934 /// }
1935 ///
1936 /// api::define! {
1937 /// pub type Tick;
1938 ///
1939 /// impl Broadcast for Tick {
1940 /// impl<'de> Event for TickEvent<'de>;
1941 /// }
1942 /// }
1943 /// }
1944 ///
1945 /// enum Msg {
1946 /// Tick(Result<ws::RawPacket, ws::Error>),
1947 /// }
1948 ///
1949 /// #[derive(Properties, PartialEq)]
1950 /// struct Props {
1951 /// ws: ws::Handle,
1952 /// }
1953 ///
1954 /// struct App {
1955 /// tick: u32,
1956 /// _listen: ws::Listener,
1957 /// }
1958 ///
1959 /// impl Component for App {
1960 /// type Message = Msg;
1961 /// type Properties = Props;
1962 ///
1963 /// fn create(ctx: &Context<Self>) -> Self {
1964 /// let link = ctx.link().clone();
1965 /// let listen = ctx.props().ws.on_raw_broadcast::<api::Tick>(ctx.link().callback(Msg::Tick));
1966 ///
1967 /// Self {
1968 /// tick: 0,
1969 /// _listen: listen,
1970 /// }
1971 /// }
1972 ///
1973 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
1974 /// match msg {
1975 /// Msg::Tick(Err(error)) => {
1976 /// tracing::error!("Tick error: {error}");
1977 /// false
1978 /// }
1979 /// Msg::Tick(Ok(packet)) => {
1980 /// if let Ok(tick) = packet.decode::<api::TickEvent>() {
1981 /// self.tick = tick.tick;
1982 /// }
1983 ///
1984 /// true
1985 /// }
1986 /// }
1987 /// }
1988 ///
1989 /// fn view(&self, ctx: &Context<Self>) -> Html {
1990 /// html! {
1991 /// <div>
1992 /// <h1>{"WebSocket Example"}</h1>
1993 /// <p>{format!("Tick: {}", self.tick)}</p>
1994 /// </div>
1995 /// }
1996 /// }
1997 /// }
1998 /// ```
1999 pub fn on_raw_broadcast<T>(&self, callback: impl Callback<Result<RawPacket>>) -> Listener
2000 where
2001 T: api::Broadcast,
2002 {
2003 let Some(shared) = self.shared.upgrade() else {
2004 return Listener::empty_with_kind(T::ID);
2005 };
2006
2007 let index = {
2008 let mut broadcasts = shared.g.broadcasts.borrow_mut();
2009 let slots = broadcasts.entry(T::ID).or_default();
2010 slots.insert(Rc::new(callback))
2011 };
2012
2013 Listener {
2014 kind: Some(T::ID),
2015 index,
2016 g: Rc::downgrade(&shared.g),
2017 }
2018 }
2019
2020 /// Listen for state changes to the underlying connection.
2021 ///
2022 /// This indicates when the connection is open and ready to receive requests
2023 /// through [`State::Open`], or if it's closed and requests will be queued
2024 /// through [`State::Closed`].
2025 ///
2026 /// Note that if you are connecting through a proxy the reported updates
2027 /// might be volatile. It is always best to send a message over the
2028 /// connection on the server side that once received allows the client to
2029 /// know that it is connected.
2030 ///
2031 /// Dropping the returned handle will cancel the listener.
2032 ///
2033 /// # Examples
2034 ///
2035 /// ```
2036 /// # extern crate yew021 as yew;
2037 /// use yew::prelude::*;
2038 /// use musli_web::web03::prelude::*;
2039 ///
2040 /// enum Msg {
2041 /// StateChange(ws::State),
2042 /// }
2043 ///
2044 /// #[derive(Properties, PartialEq)]
2045 /// struct Props {
2046 /// ws: ws::Handle,
2047 /// }
2048 ///
2049 /// struct App {
2050 /// state: ws::State,
2051 /// _listen: ws::StateListener,
2052 /// }
2053 ///
2054 /// impl Component for App {
2055 /// type Message = Msg;
2056 /// type Properties = Props;
2057 ///
2058 /// fn create(ctx: &Context<Self>) -> Self {
2059 /// let link = ctx.link().clone();
2060 ///
2061 /// let (state, listen) = ctx.props().ws.on_state_change(move |state| {
2062 /// link.send_message(Msg::StateChange(state));
2063 /// });
2064 ///
2065 /// Self {
2066 /// state,
2067 /// _listen: listen,
2068 /// }
2069 /// }
2070 ///
2071 /// fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
2072 /// match msg {
2073 /// Msg::StateChange(state) => {
2074 /// self.state = state;
2075 /// true
2076 /// }
2077 /// }
2078 /// }
2079 ///
2080 /// fn view(&self, ctx: &Context<Self>) -> Html {
2081 /// html! {
2082 /// <div>
2083 /// <h1>{"WebSocket Example"}</h1>
2084 /// <p>{format!("State: {:?}", self.state)}</p>
2085 /// </div>
2086 /// }
2087 /// }
    /// }
    /// ```
2089 pub fn on_state_change(&self, callback: impl Callback<State>) -> (State, StateListener) {
2090 let Some(shared) = self.shared.upgrade() else {
2091 return (
2092 State::Closed,
2093 StateListener {
2094 index: 0,
2095 g: Weak::new(),
2096 },
2097 );
2098 };
2099
2100 let (state, index) = {
2101 let index = shared
2102 .g
2103 .state_listeners
2104 .borrow_mut()
2105 .insert(Rc::new(callback));
2106 (shared.state.get(), index)
2107 };
2108
2109 let listener = StateListener {
2110 index,
2111 g: Rc::downgrade(&shared.g),
2112 };
2113
2114 (state, listener)
2115 }
2116}
2117
2118impl<H> PartialEq for Handle<H>
2119where
2120 H: WebImpl,
2121{
2122 #[inline]
2123 fn eq(&self, _: &Self) -> bool {
2124 true
2125 }
2126}
2127
2128struct Pending<C>
2129where
2130 C: ?Sized,
2131{
2132 kind: MessageId,
2133 serial: u32,
2134 callback: C,
2135}
2136
2137impl<C> fmt::Debug for Pending<C> {
2138 #[inline]
2139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2140 f.debug_struct("Pending")
2141 .field("serial", &self.serial)
2142 .field("kind", &self.kind)
2143 .finish_non_exhaustive()
2144 }
2145}
/// Display adapter which writes the character in `self.1` followed by the
/// string in `self.0` with every leading occurrence of that character
/// removed, so that the output starts with exactly one such character.
struct ForcePrefix<'a>(&'a str, char);

impl fmt::Display for ForcePrefix<'_> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ForcePrefix(text, lead) = *self;
        fmt::Display::fmt(&lead, f)?;
        fmt::Display::fmt(text.trim_start_matches(lead), f)
    }
}