Skip to main content

web_rpc/
lib.rs

1//! Bidirectional RPC for browsing contexts, web workers, and message channels.
2//!
3//! This crate allows you to define a service as a trait and annotate it with
4//! [`#[web_rpc::service]`](macro@service). The macro then produces a `*Client`, a `*Service`,
5//! and a forwarding trait that you can implement on the server side.
6//!
7//! Routing is inferred from each type: anything implementing
8//! [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) is
9//! posted through `postMessage` directly and everything that is serializable is first encoded
10//! via bincode. There is special support for `Option<T>` and `Result<T, E>` to allow Javascript
11//! types to be embedded within these types. This behaviour is recursive.
12//!
13//! # Quickstart
14//! ```rust
15//! #[web_rpc::service]
16//! pub trait Calculator {
17//!     fn add(&self, left: u32, right: u32) -> u32;
18//! }
19//! struct Calc;
20//! impl Calculator for Calc {
21//!     fn add(&self, left: u32, right: u32) -> u32 { left + right }
22//! }
23//! ```
24//! Wire up over a `MessageChannel`, [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html),
//! or any [`MessagePort`](https://docs.rs/web-sys/latest/web_sys/struct.MessagePort.html).
//! Each call to [`Interface::new`] is async because temporary listeners need to detect when
27//! both ends are ready.
28//! ```rust,no_run
29//! # #[web_rpc::service]
30//! # pub trait Calculator { fn add(&self, l: u32, r: u32) -> u32; }
31//! # struct Calc;
32//! # impl Calculator for Calc { fn add(&self, l: u32, r: u32) -> u32 { l + r } }
33//! # async fn run() {
34//! let channel = web_sys::MessageChannel::new().unwrap();
35//! let (server_iface, client_iface) = futures_util::future::join(
36//!     web_rpc::Interface::new(channel.port1()),
37//!     web_rpc::Interface::new(channel.port2()),
38//! ).await;
39//!
40//! let server = web_rpc::Builder::new(server_iface)
41//!     .with_service::<CalculatorService<_>>(Calc)
42//!     .build();
43//! wasm_bindgen_futures::spawn_local(server);
44//!
45//! let client = web_rpc::Builder::new(client_iface)
46//!     .with_client::<CalculatorClient>()
47//!     .build();
48//! assert_eq!(client.add(41, 1).await, 42);
49//! # }
50//! ```
51//!
52//! # Routing
53//! ```rust
54//! #[web_rpc::service]
55//! pub trait Routing {
56//!     // Plain types that implement Serialize go through bincode.
57//!     fn add(&self, l: u32, r: u32) -> u32;
58//!     // Anything `AsRef<JsValue>` is posted through the JS array.
59//!     fn echo(&self, s: js_sys::JsString) -> js_sys::JsString;
60//!     // `Option`/`Result` recurse: Ok(Some(_)) is posted, Ok(None) is one byte,
61//!     // Err carries a bincoded `String`.
62//!     fn lookup(&self, k: u32) -> Result<Option<js_sys::JsString>, String>;
63//!     // `&str` / `&[u8]` deserialize zero-copy on the server.
64//!     fn count(&self, data: &[u8]) -> usize;
65//!     // References to JS types are accepted too and are decoded via JsCast::dyn_ref.
66//!     fn len(&self, s: &js_sys::JsString) -> u32;
67//! }
68//! ```
69//!
70//! # Async, notifications, streaming
71//! ```rust
72//! use futures_core::Stream;
73//!
74//! #[web_rpc::service]
75//! pub trait Misc {
76//!     // `async` here makes the server impl async; the client side is also async because we return a u32.
77//!     async fn slow(&self, ms: u32) -> u32;
78//!     // No return type means the method is a notification.
79//!     fn fire(&self, msg: String);
80//!     // `impl Stream<Item = T>` makes the method a streaming RPC.
81//!     fn items(&self, n: u32) -> impl Stream<Item = u32>;
82//! }
83//! ```
84//! On the client side, RPC methods that have a return type are async and yield a
85//! [`client::RequestFuture<T>`] which you await for the response. Methods without a return type
86//! are sync and act as fire-and-forget notifications. This is independent of whether the trait
87//! method itself is marked `async`, which only affects the server implementation. Dropping the
88//! `RequestFuture` cancels the request, so notifications cannot be cancelled.
89//!
90//! Streaming methods return a [`client::StreamReceiver<T>`] that yields each item the server
91//! produces. Dropping the receiver aborts the stream on the server, while
92//! [`close`](client::StreamReceiver::close) lets buffered items finish arriving instead.
93//! Streaming methods can also be `async` and the items they yield can be wrapper types like
94//! `Result<JsT, E>`.
95//!
96//! # Transfer
97//! Anything that should be transferred to the other side rather than copied with the structured
98//! clone algorithm can be specified inside a `#[transfer(...)]` attribute as a comma-separated
99//! list. The simplest case is to list the parameter that holds the transferable value, but if
100//! that value is wrapped or derived from a parameter, you can use a parameter-name expression
101//! (`name => expr`, evaluated with `name` in scope), a closure with a refutable pattern
102//! (`name => |pat| body`), or a match-block (`name => match { arm, ... }`). The same forms also
103//! work for the return value via `return`.
104//! ```rust
105//! # use wasm_bindgen::JsCast;
106//! #[web_rpc::service]
107//! pub trait Transfer {
108//!     // Bare param + derived expression + return closure.
109//!     #[transfer(
110//!         canvas,
111//!         data => data.buffer(),
112//!         return => |Ok(buf)| buf.buffer(),
113//!     )]
114//!     fn render(
115//!         &self,
116//!         canvas: web_sys::OffscreenCanvas,
117//!         data: js_sys::Uint8Array,
118//!     ) -> Result<js_sys::Uint8Array, String>;
119//!
120//!     // Match-block: useful when several variants need transferring.
121//!     #[transfer(return => match { Some(buf) => buf.buffer(), })]
122//!     fn maybe(&self) -> Option<js_sys::Uint8Array>;
123//! }
124//! ```
125//!
126//! # Bi-directional
127//! Both sides of a channel can be set up to act as both client and server at the same time. To
128//! do this, stack [`with_service`](Builder::with_service) and
129//! [`with_client`](Builder::with_client) on the same [`Builder`] before calling `build()`, which
130//! then returns a `(C, Server)` tuple instead of one or the other.
131//! ```rust,no_run
132//! # #[web_rpc::service]
133//! # pub trait Calculator { fn add(&self, l: u32, r: u32) -> u32; }
134//! # struct Calc;
135//! # impl Calculator for Calc { fn add(&self, l: u32, r: u32) -> u32 { l + r } }
136//! # async fn run() {
137//! # let channel = web_sys::MessageChannel::new().unwrap();
138//! # let (iface, _) = futures_util::future::join(
139//! #     web_rpc::Interface::new(channel.port1()),
140//! #     web_rpc::Interface::new(channel.port2()),
141//! # ).await;
142//! let (client, server) = web_rpc::Builder::new(iface)
143//!     .with_service::<CalculatorService<_>>(Calc)
144//!     .with_client::<CalculatorClient>()
145//!     .build();
146//! # }
147//! ```
148
149use std::{
150    cell::RefCell,
151    marker::PhantomData,
152    pin::Pin,
153    rc::Rc,
154    task::{Context, Poll},
155};
156
157use futures_channel::mpsc;
158use futures_core::{future::LocalBoxFuture, Future};
159use futures_util::{FutureExt, StreamExt};
160use gloo_events::EventListener;
161use js_sys::{ArrayBuffer, Uint8Array};
162use serde::{de::DeserializeOwned, Deserialize, Serialize};
163use wasm_bindgen::JsCast;
164
165#[doc(hidden)]
166pub use bincode;
167#[doc(hidden)]
168pub use futures_channel;
169#[doc(hidden)]
170pub use futures_core;
171#[doc(hidden)]
172pub use futures_util;
173#[doc(hidden)]
174pub use gloo_events;
175#[doc(hidden)]
176pub use js_sys;
177#[doc(hidden)]
178pub use pin_utils;
179#[doc(hidden)]
180pub use serde;
181#[doc(hidden)]
182pub use wasm_bindgen;
183
184pub use web_rpc_macro::service;
185
186pub mod client;
187#[doc(hidden)]
188pub mod codec;
189pub mod interface;
190pub mod port;
191#[doc(hidden)]
192pub mod service;
193
194pub use interface::Interface;
195
/// Header that prefixes every message exchanged over the port.
///
/// The dispatchers in this file read the first `ArrayBuffer` of each incoming message array
/// and decode it with `bincode` into this enum. Every variant carries the sequence id
/// (`usize`) of the request or stream it belongs to.
///
/// NOTE(review): because the header is bincode-encoded, reordering or removing variants
/// presumably changes the wire format — confirm before touching the variant list.
#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub enum MessageHeader {
    /// A request for the server; the server-side dispatcher forwards the sequence id, the
    /// payload bytes and the remaining message array to the service task.
    Request(usize),
    /// Asks the server to abort the request with this sequence id. It is produced by the
    /// client's abort sender and forwarded by the server-side dispatcher to the abort channel.
    Abort(usize),
    /// A response for the client; the client-side dispatcher removes the callback registered
    /// under this sequence id and hands it the decoded response.
    Response(usize),
    /// One item of a streaming response; the client-side dispatcher forwards it to the stream
    /// sender registered under this sequence id, if any.
    StreamItem(usize),
    /// End of a streaming response; the client-side dispatcher removes the stream sender
    /// registered under this sequence id.
    StreamEnd(usize),
}
205
/// This struct allows one to configure the RPC interface prior to creating it.
/// To get an instance of this struct, call [`Builder<C, S>::new`] with
/// an [`Interface`].
///
/// The type parameters record what has been configured so far: `C` is the client type and
/// `S` the service type. Both start out as `()` and are replaced by
/// [`with_client`](Builder::with_client) and [`with_service`](Builder::with_service)
/// respectively; which `build` method is available depends on which of them are still `()`.
pub struct Builder<C, S> {
    // Only the client *type* is recorded here; the client value itself is created in `build`.
    client: PhantomData<C>,
    // The configured service, or `()` while no service has been configured.
    service: S,
    // The interface whose port, listener and message receiver are taken apart in `build`.
    interface: Interface,
}
214
215impl Builder<(), ()> {
216    /// Create a new builder from an [`Interface`]
217    pub fn new(interface: Interface) -> Self {
218        Self {
219            interface,
220            client: PhantomData::<()>,
221            service: (),
222        }
223    }
224}
225
226impl<C> Builder<C, ()> {
227    /// Configure the RPC interface with a service that implements methods
228    /// that can be called from the other side of the channel. To use this method,
229    /// you need to specify the type `S` which is the service type generated by the
230    /// attribute macro [`macro@service`]. The implementation parameter is then an
231    /// instance of something that implements the trait to which to applied the
232    /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
233    /// which you have applied [`macro@service`], you would use this method as follows:
234    /// ```rust,no_run
235    /// # #[web_rpc::service]
236    /// # pub trait Calculator {
237    /// #     fn add(&self, left: u32, right: u32) -> u32;
238    /// # }
239    /// # struct CalculatorServiceImpl;
240    /// # impl Calculator for CalculatorServiceImpl {
241    /// #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
242    /// # }
243    /// # fn example(some_interface: web_rpc::Interface) {
244    /// let server = web_rpc::Builder::new(some_interface)
245    ///     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
246    ///     .build();
247    /// # }
248    /// ```
249    pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
250        let service = implementation.into();
251        let Builder {
252            interface, client, ..
253        } = self;
254        Builder {
255            interface,
256            client,
257            service,
258        }
259    }
260}
261
262impl<S> Builder<(), S> {
263    /// Configure the RPC interface with a client that allows you to execute RPCs on the
264    /// server. The builder will automatically instansiate the client for you, you just
265    /// need to provide the type which is generated via the [`macro@service`] attribute
266    /// macro. For example, if you had a trait `Calculator` to which you applied the
267    /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
268    /// struct which you can use as the `C` in this function.
269    pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
270        let Builder {
271            interface, service, ..
272        } = self;
273        Builder {
274            interface,
275            client: PhantomData::<C>,
276            service,
277        }
278    }
279}
280
/// `Server` is the server that is returned from the [`Builder::build`] method given
/// you configured the RPC interface with a service. Note that `Server` implements future and needs
/// to be polled in order to execute and respond to inbound RPC requests.
///
/// Polling the `Server` simply polls the boxed service task it wraps; its output is `()`.
#[must_use = "Server must be polled in order for RPC requests to be executed"]
pub struct Server {
    // Never read; holding the `Rc` keeps the event listener from being dropped while the
    // server exists.
    _listener: Rc<EventListener>,
    // The boxed service task that is driven by `Future::poll`.
    task: LocalBoxFuture<'static, ()>,
}
289
290impl Future for Server {
291    type Output = ();
292
293    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
294        self.task.poll_unpin(cx)
295    }
296}
297
298impl<C> Builder<C, ()>
299where
300    C: client::Client + From<client::Configuration<C::Response>> + 'static,
301    <C as client::Client>::Response: DeserializeOwned,
302{
303    /// Build function for client-only RPC interfaces.
304    pub fn build(self) -> C {
305        let Builder {
306            interface:
307                Interface {
308                    port,
309                    listener,
310                    mut messages_rx,
311                },
312            ..
313        } = self;
314        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
315        let client_callback_map_cloned = client_callback_map.clone();
316        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
317            Default::default();
318        let stream_callback_map_cloned = stream_callback_map.clone();
319        let dispatcher = async move {
320            while let Some(array) = messages_rx.next().await {
321                let header_bytes =
322                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
323                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
324                match header {
325                    MessageHeader::Response(seq_id) => {
326                        let payload_bytes =
327                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
328                                .to_vec();
329                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
330                        if let Some(callback_tx) =
331                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
332                        {
333                            let _ = callback_tx.send((response, array));
334                        }
335                    }
336                    MessageHeader::StreamItem(seq_id) => {
337                        let payload_bytes =
338                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
339                                .to_vec();
340                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
341                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
342                            let _ = tx.unbounded_send((response, array));
343                        }
344                    }
345                    MessageHeader::StreamEnd(seq_id) => {
346                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
347                    }
348                    _ => panic!("client received a server message"),
349                }
350            }
351        }
352        .boxed_local()
353        .shared();
354        let port_cloned = port.clone();
355        let abort_sender = move |seq_id: usize| {
356            let header = MessageHeader::Abort(seq_id);
357            let header_bytes = bincode::serialize(&header).unwrap();
358            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
359            let post_args = js_sys::Array::of1(&buffer);
360            let transfer_args = js_sys::Array::of1(&buffer);
361            port_cloned
362                .post_message(&post_args, &transfer_args)
363                .unwrap();
364        };
365        C::from((
366            client_callback_map,
367            stream_callback_map,
368            port,
369            Rc::new(listener),
370            dispatcher,
371            Rc::new(abort_sender),
372        ))
373    }
374}
375
376impl<S> Builder<(), S>
377where
378    S: service::Service + 'static,
379    <S as service::Service>::Response: Serialize,
380{
381    /// Build function for server-only RPC interfaces.
382    pub fn build(self) -> Server {
383        let Builder {
384            service,
385            interface:
386                Interface {
387                    port,
388                    listener,
389                    mut messages_rx,
390                },
391            ..
392        } = self;
393        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
394        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
395        let dispatcher = async move {
396            while let Some(array) = messages_rx.next().await {
397                let header_bytes =
398                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
399                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
400                match header {
401                    MessageHeader::Request(seq_id) => {
402                        let payload =
403                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
404                                .to_vec();
405                        server_requests_tx
406                            .unbounded_send((seq_id, payload, array))
407                            .unwrap();
408                    }
409                    MessageHeader::Abort(seq_id) => {
410                        abort_requests_tx.unbounded_send(seq_id).unwrap();
411                    }
412                    _ => panic!("server received a client message"),
413                }
414            }
415        }
416        .boxed_local()
417        .shared();
418        Server {
419            _listener: Rc::new(listener),
420            task: service::task::<S>(
421                service,
422                port,
423                dispatcher,
424                server_requests_rx,
425                abort_requests_rx,
426            )
427            .boxed_local(),
428        }
429    }
430}
431
432impl<C, S> Builder<C, S>
433where
434    C: client::Client + From<client::Configuration<C::Response>> + 'static,
435    S: service::Service + 'static,
436    <S as service::Service>::Response: Serialize,
437    <C as client::Client>::Response: DeserializeOwned,
438{
439    /// Build function for client-server RPC interfaces.
440    pub fn build(self) -> (C, Server) {
441        let Builder {
442            service: server,
443            interface:
444                Interface {
445                    port,
446                    listener,
447                    mut messages_rx,
448                },
449            ..
450        } = self;
451        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
452        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
453            Default::default();
454        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
455        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
456        let client_callback_map_cloned = client_callback_map.clone();
457        let stream_callback_map_cloned = stream_callback_map.clone();
458        let dispatcher = async move {
459            while let Some(array) = messages_rx.next().await {
460                let header_bytes =
461                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
462                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
463                match header {
464                    MessageHeader::Response(seq_id) => {
465                        let payload_bytes =
466                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
467                                .to_vec();
468                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
469                        if let Some(callback_tx) =
470                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
471                        {
472                            let _ = callback_tx.send((response, array));
473                        }
474                    }
475                    MessageHeader::StreamItem(seq_id) => {
476                        let payload_bytes =
477                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
478                                .to_vec();
479                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
480                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
481                            let _ = tx.unbounded_send((response, array));
482                        }
483                    }
484                    MessageHeader::StreamEnd(seq_id) => {
485                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
486                    }
487                    MessageHeader::Request(seq_id) => {
488                        let payload =
489                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
490                                .to_vec();
491                        server_requests_tx
492                            .unbounded_send((seq_id, payload, array))
493                            .unwrap();
494                    }
495                    MessageHeader::Abort(seq_id) => {
496                        abort_requests_tx.unbounded_send(seq_id).unwrap();
497                    }
498                }
499            }
500        }
501        .boxed_local()
502        .shared();
503        let port_cloned = port.clone();
504        let abort_sender = move |seq_id: usize| {
505            let header = MessageHeader::Abort(seq_id);
506            let header_bytes = bincode::serialize(&header).unwrap();
507            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
508            let post_args = js_sys::Array::of1(&buffer);
509            let transfer_args = js_sys::Array::of1(&buffer);
510            port_cloned
511                .post_message(&post_args, &transfer_args)
512                .unwrap();
513        };
514        let listener = Rc::new(listener);
515        let client = C::from((
516            client_callback_map,
517            stream_callback_map,
518            port.clone(),
519            listener.clone(),
520            dispatcher.clone(),
521            Rc::new(abort_sender),
522        ));
523        let server = Server {
524            _listener: listener,
525            task: service::task::<S>(
526                server,
527                port,
528                dispatcher,
529                server_requests_rx,
530                abort_requests_rx,
531            )
532            .boxed_local(),
533        };
534        (client, server)
535    }
536}